ai_member_xiaoxi/scripts/refresh_may_v2.py
2026-05-24 08:00:01 +08:00

244 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
刷新 5 月行课记录 v2高效批量查询 + 回填
"""
import json, subprocess, os, urllib.request
from datetime import datetime
# ===== 0. Setup =====
# Parse KEY=VALUE pairs from the secrets file. Blank lines, `#` comment lines
# and lines without `=` are skipped; surrounding double/single quote
# characters are removed from the value.
secrets = {}
with open('/root/.openclaw/workspace/secrets.env', encoding='utf-8') as f:
    for line in f:
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        k, v = line.split('=', 1)
        # Strip whitespace on both sides of `=` so that `KEY = "value"` is
        # parsed the same way as `KEY="value"`.
        secrets[k.strip()] = v.strip().strip('"').strip("'")

# A missing entry raises KeyError here, before any database or API work.
PG_PASS = secrets['PG_ONLINE_PASSWORD']
PG_HOST = 'bj-postgres-16pob4sg.sql.tencentcdb.com'
PG_PORT = '28591'
def pg_query(sql):
    """Run *sql* through the ``psql`` CLI and return its raw text output.

    psql is invoked in tuples-only, unaligned mode with a tab field
    separator, so the result is one line per row with tab-separated fields.
    The SQL is passed on stdin and the password via ``PGPASSWORD``.

    Args:
        sql: SQL text to execute.

    Returns:
        The query output with leading/trailing newlines removed. An empty
        string means the query returned no rows.

    Raises:
        RuntimeError: if psql exits with a non-zero status.
    """
    result = subprocess.run(
        ['psql',
         # Make psql stop and exit non-zero on the first SQL error; without
         # this a failed statement read from stdin still exits with 0.
         '-v', 'ON_ERROR_STOP=1',
         '-h', PG_HOST, '-p', PG_PORT, '-U', 'ai_member',
         '-d', 'vala_bi', '-t', '-A', '-F', '\t'],
        input=sql, capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': PG_PASS})
    if result.returncode != 0:
        # Fail loudly: treating an error as "no rows" would make callers
        # write wrong data downstream.
        raise RuntimeError(
            f"psql exited with status {result.returncode}: {result.stderr.strip()}")
    # Strip only newlines. A bare strip() would also eat trailing tabs and
    # thereby drop empty (NULL) trailing fields of the last row.
    return result.stdout.strip('\n')
def get_tat():
    """Fetch a Feishu ``tenant_access_token`` for the first configured app.

    Reads ``appId`` / ``appSecret`` of ``apps[0]`` from the local credentials
    JSON file and posts them to the Feishu internal-app token endpoint.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: if the response carries no token (the API's ``code``
            and ``msg`` fields are included in the message).
        OSError: on file or network failures (including the 30 s timeout).
    """
    with open('/root/.openclaw/credentials/xiaoxi/config.json', encoding='utf-8') as f:
        config = json.load(f)
    app = config['apps'][0]
    data = json.dumps({"app_id": app['appId'], "app_secret": app['appSecret']}).encode()
    req = urllib.request.Request(
        'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
        data=data, headers={'Content-Type': 'application/json; charset=utf-8'})
    # Bound the wait and close the connection deterministically.
    with urllib.request.urlopen(req, timeout=30) as http_resp:
        body = json.loads(http_resp.read())
    token = body.get('tenant_access_token')
    if not token:
        # Surface the API's own diagnostics instead of a bare KeyError.
        raise RuntimeError(
            f"Feishu auth failed: code={body.get('code')} msg={body.get('msg')}")
    return token
TAT = get_tat()
TOKEN = 'RFIJsXT8FhGHhctY4RwczcOfnac'
SHEET = '55b0eb'

# ===== 1. Load target rows + sales names =====
print("[1/4] 读取目标行数据...")
# Each entry is unpacked further down as (row_num, account_id, phone).
with open('/tmp/target_rows.json', encoding='utf-8') as f:
    target_rows = json.load(f)
if not target_rows:
    # Without rows there is no sheet range to read (min()/max() would fail).
    raise SystemExit("target_rows.json contains no rows; nothing to refresh")

# De-duplicated account ids as strings. key=int sorts them numerically and
# raises ValueError on any non-numeric id before it is interpolated into SQL.
uid_set = sorted({str(r[1]) for r in target_rows if r[1]}, key=int)
uid_csv = ','.join(uid_set)
# Report the real number of records instead of a hard-coded count.
print(f" {len(target_rows)} 条记录, {len(uid_set)} 去重用户")

# Read column A (sales name) for the whole row span with a single API call.
min_row = min(r[0] for r in target_rows)
max_row = max(r[0] for r in target_rows)
url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{TOKEN}/values/{SHEET}!A{min_row}:A{max_row}"
req = urllib.request.Request(url)
req.add_header('Authorization', f'Bearer {TAT}')
with urllib.request.urlopen(req, timeout=30) as http_resp:
    resp = json.loads(http_resp.read())
if resp.get('code', 0) != 0:
    # An API-level failure would otherwise silently turn every sales name
    # into the '?' placeholder.
    raise RuntimeError(
        f"reading column A failed: code={resp.get('code')} msg={resp.get('msg')}")
a_values = resp.get('data', {}).get('valueRange', {}).get('values', [])

sales_map = {}  # row_num -> sales_name
for offset, cell in enumerate(a_values):
    # Skip empty cells so they fall back to '?' below instead of being
    # rendered as the text "None".
    if cell and cell[0] not in (None, ''):
        sales_map[min_row + offset] = cell[0]

# row_num -> {'acc_id': account id as str, 'sales': sales name or '?'}
row_data = {
    row_num: {'acc_id': str(acc_id), 'sales': sales_map.get(row_num, '?')}
    for row_num, acc_id, _phone in target_rows
}
# ===== 2. Batch database queries =====
print("[2/4] 批量查询学情...")

# The per-user play-record tables are addressed as <table>_<user_id % 8>.
SHARD_COUNT = 8


def _pg_rows(sql, min_cols):
    """Run *sql* via pg_query and yield each row that has >= *min_cols* fields."""
    for raw in pg_query(sql).split('\n'):
        if not raw:
            continue
        fields = raw.split('\t')
        if len(fields) >= min_cols:
            yield fields


# 2a. Courses + characters
print(" 课程信息...")
user_courses = {}  # acc_id -> [{'user_id':..., 'level':..., 'type':...}]
all_user_ids = set()
if uid_csv:  # an empty id list would render as the invalid SQL `IN ()`
    course_sql = f"""
SELECT a.id AS account_id, c.id AS user_id, d.course_level,
CASE WHEN d.expire_time IS NOT NULL THEN '正式课' ELSE '体验课' END AS course_type
FROM bi_vala_app_account a
JOIN bi_vala_app_character c ON c.account_id = a.id AND c.deleted_at IS NULL
LEFT JOIN bi_user_course_detail d ON d.user_id = c.id AND d.deleted_at IS NULL
WHERE a.id IN ({uid_csv}) AND a.status = 1 AND a.deleted_at IS NULL
ORDER BY a.id, c.id, d.course_level
"""
    for parts in _pg_rows(course_sql, 4):
        acc_id, uid = parts[0], parts[1]
        # Empty fields (NULL from the LEFT JOIN) fall back to placeholders.
        level, ctype = parts[2] or '?', parts[3] or '体验课'
        all_user_ids.add(uid)
        user_courses.setdefault(acc_id, []).append(
            {'user_id': uid, 'level': level, 'type': ctype})

# Bucket the character ids by shard once; both sharded queries reuse this.
shards = [[] for _ in range(SHARD_COUNT)]
for uid in all_user_ids:
    shards[int(uid) % SHARD_COUNT].append(uid)

# 2b. Latest play record per character (sharded tables)
print(" 最近行课...")
play_records = {}  # user_id -> {'date':..., 'ch_id':..., 'ch_uid':...}
for i, subset in enumerate(shards):
    if not subset:
        continue
    sql = f"""
SELECT DISTINCT ON (user_id) user_id, created_at::date AS rd, chapter_id, chapter_unique_id
FROM bi_user_chapter_play_record_{i}
WHERE user_id IN ({','.join(subset)}) AND play_status = 1
ORDER BY user_id, created_at DESC
"""
    for parts in _pg_rows(sql, 4):
        play_records[parts[0]] = {'date': parts[1], 'ch_id': parts[2], 'ch_uid': parts[3]}

# 2c. Course structure for the chapters seen above
print(" 课程结构...")
# Drop empty ids (NULL chapter_id) so the IN (...) list stays valid SQL.
ch_ids = set(r['ch_id'] for r in play_records.values() if r['ch_id'])
ch_map = {}  # chapter id -> "level-season-unit-lesson"
if ch_ids:
    for parts in _pg_rows(f"SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson WHERE id IN ({','.join(ch_ids)})", 5):
        ch_map[parts[0]] = f"{parts[1]}-{parts[2]}-{parts[3]}-{parts[4]}"

# 2d. Study time per character: SUM(interval_time) / 60000.0
print(" 学习时长...")
study_map = {}  # user_id -> float
for i, subset in enumerate(shards):
    if not subset:
        continue
    for parts in _pg_rows(f"SELECT user_id, COALESCE(SUM(interval_time),0)/60000.0 FROM bi_user_component_play_record_{i} WHERE user_id IN ({','.join(subset)}) GROUP BY user_id", 2):
        study_map[parts[0]] = float(parts[1])

# 2e. Payment status
print(" 付费状态...")
paid_set = set()  # account ids that have at least one matching order
if uid_csv:
    for line in pg_query(f"SELECT DISTINCT account_id FROM bi_vala_order WHERE account_id IN ({uid_csv}) AND pay_success_date IS NOT NULL AND order_status = 3").split('\n'):
        if line.strip():
            paid_set.add(line.strip())

print(f" 课程:{len(user_courses)} 行课:{len(play_records)} 付费:{len(paid_set)}")
# ===== 3. Assemble the column D text =====
print("[3/4] 组装学情文本...")


def get_best_char(acc_id, *, courses=None, plays=None, chapters=None,
                  study_minutes=None):
    """Summarise the most recently active character of one account.

    Among the account's course entries, the one whose character has the
    latest play-record date wins (dates are compared as strings; on a tie
    the earlier entry is kept). If no character has a play record, the
    first entry is used.

    Args:
        acc_id: Account id, as used for the keys of the courses mapping.
        courses: Optional override for the module-level ``user_courses``
            (acc_id -> list of {'user_id', 'level', 'type'} dicts).
        plays: Optional override for the module-level ``play_records``
            (user_id -> {'date', 'ch_id', ...}).
        chapters: Optional override for the module-level ``ch_map``
            (chapter id -> display string).
        study_minutes: Optional override for the module-level ``study_map``
            (user_id -> number).

    Returns:
        ``(current, recent, study)``: the progress label, the latest play
        date (or '无记录'), and the study value truncated to int. Returns
        ``(None, None, None)`` when the account has no course entries.
    """
    # Fall back to the script's globals only for arguments not supplied, so
    # existing one-argument calls behave exactly as before.
    courses = user_courses if courses is None else courses
    plays = play_records if plays is None else plays
    chapters = ch_map if chapters is None else chapters
    study_minutes = study_map if study_minutes is None else study_minutes

    chars = courses.get(acc_id, [])
    if not chars:
        return None, None, None

    best, best_date = None, None
    for c in chars:
        pr = plays.get(c['user_id'])
        if pr and (best_date is None or pr['date'] > best_date):
            best, best_date = c, pr['date']
    if best is None:
        best = chars[0]

    uid = best['user_id']
    pr = plays.get(uid)
    if not pr:
        position = '无记录'
    elif pr['ch_id'] in chapters:
        position = chapters[pr['ch_id']]
    else:
        # A play record exists but its chapter is not in the chapter map.
        position = '?'
    current = f"{best['level']}{best['type']}-{position}"
    recent = pr['date'] if pr else '无记录'
    study = int(study_minutes.get(uid, 0))
    return current, recent, study
# Timestamp of this run; not referenced in the visible part of the script.
now_str = datetime.now().strftime('%Y-%m-%d %H:%M')

# row_num -> text to write into column D of that row.
updates = {}
for row_num, info in row_data.items():
    account, seller = info['acc_id'], info['sales']
    current, recent, study = get_best_char(account)
    pay = '未付费' if account not in paid_set else '已付费'

    # The text is a ' | '-separated list of segments.
    segments = [f"销售:{seller}", f"用户:{account}"]
    if current:
        segments.extend([f"当前:{current}", f"最近行课:{recent}", f"学习{study}min"])
    else:
        # No course/character entry was found for this account.
        segments.extend(["无课程角色", "最近行课:无记录", "学习0min"])
    segments.append(pay)
    updates[row_num] = " | ".join(segments)

# ===== 4. Batch write-back =====
print(f"[4/4] 批量回填 {len(updates)} 条...")
def api_put(tat, token, sheet, range_str, values):
    """Write *values* into one range of a Feishu spreadsheet (sheets v2 PUT).

    Args:
        tat: Bearer token sent in the Authorization header.
        token: Spreadsheet token used in the URL.
        sheet: Sheet id; combined with *range_str* as ``sheet!range``.
        range_str: Cell range within the sheet, e.g. ``D5:D9``.
        values: Rows of cell values to write.

    Returns:
        True if the API answered with ``code == 0``; False on any transport,
        decoding or API-level error. The reason is printed and no exception
        propagates, so callers can continue with the next range.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{token}/values"
    body = {"valueRange": {"range": f"{sheet}!{range_str}", "values": values}}
    req = urllib.request.Request(url, data=json.dumps(body).encode(), method='PUT')
    req.add_header('Authorization', f'Bearer {tat}')
    req.add_header('Content-Type', 'application/json; charset=utf-8')
    try:
        # Bound the wait so one stalled request cannot hang the whole run,
        # and close the response deterministically.
        with urllib.request.urlopen(req, timeout=30) as http_resp:
            resp = json.loads(http_resp.read())
        code, msg = resp.get('code'), resp.get('msg')
    except Exception as e:
        # Deliberately broad: this is a best-effort boundary; the caller
        # counts failures via the boolean result.
        print(f" API error: {e}")
        return False
    if code != 0:
        # Report API-level rejections too; otherwise a failed write would
        # return False with no explanation.
        print(f" API error: code={code} msg={msg}")
        return False
    return True
# Sort by row number, then merge runs of consecutive rows so that each run is
# written with one range request per column.
# NOTE(review): the original comment mentioned 50-row batches, but no size cap
# is applied; a run is as long as the consecutive rows it contains.
sorted_items = sorted(updates.items(), key=lambda x: x[0])
success_d = 0
success_c = 0


def _contiguous_blocks(items):
    """Split (row, value) pairs, sorted by row, into runs of consecutive rows."""
    groups = []
    for item in items:
        if groups and item[0] == groups[-1][-1][0] + 1:
            groups[-1].append(item)
        else:
            groups.append([item])
    return groups


# Empty input yields no blocks instead of failing on sorted_items[0].
blocks = _contiguous_blocks(sorted_items)
print(f"{len(blocks)} 个连续块,批量更新中...")
for bi, block in enumerate(blocks):
    rows = [r for r, _ in block]
    d_vals = [[v] for _, v in block]
    c_vals = [["已返回"] for _ in block]
    start_row, end_row = rows[0], rows[-1]
    # Write column D (the assembled text) first.
    if api_put(TAT, TOKEN, SHEET, f'D{start_row}:D{end_row}', d_vals):
        success_d += len(block)
        # Only set the column C status once the D text is really in the
        # sheet, so a failed D write is never flagged as done.
        if api_put(TAT, TOKEN, SHEET, f'C{start_row}:C{end_row}', c_vals):
            success_c += len(block)
    if (bi + 1) % 10 == 0:
        print(f"{bi+1}/{len(blocks)}: D={success_d} C={success_c}")
print(f"\n✅ 完成D列: {success_d}/{len(updates)}C列: {success_c}/{len(updates)}")