#!/usr/bin/env python3
"""
刷新 5 月行课记录 v2:高效批量查询 + 回填

Refresh the May lesson records (v2): batch-query learning status from
PostgreSQL and write the summaries back to a Feishu spreadsheet.
"""

import json
import os
import subprocess
import urllib.error
import urllib.request
from datetime import datetime


# ===== 0. Setup =====
# Load KEY=VALUE pairs from the workspace secrets file. Blank lines, '#'
# comments and lines without '=' are skipped; whitespace around keys/values
# and surrounding quote characters are stripped from each entry.
SECRETS_PATH = '/root/.openclaw/workspace/secrets.env'

secrets = {}
with open(SECRETS_PATH, encoding='utf-8') as f:
    for line in f:
        line = line.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        k, v = line.split('=', 1)
        # Strip whitespace before quotes so `KEY = "value"` still loses its quotes.
        secrets[k.strip()] = v.strip().strip('"').strip("'")

try:
    PG_PASS = secrets['PG_ONLINE_PASSWORD']
except KeyError:
    raise SystemExit(f"PG_ONLINE_PASSWORD missing from {SECRETS_PATH}") from None
PG_HOST = 'bj-postgres-16pob4sg.sql.tencentcdb.com'
PG_PORT = '28591'


def pg_query(sql):
    """Run *sql* through the ``psql`` CLI against the ``vala_bi`` database.

    Output is requested tuples-only (``-t``), unaligned (``-A``) and
    tab-separated, so each result row is one line of tab-separated fields.
    NULL values print as empty fields.

    Args:
        sql: SQL text fed to psql on stdin.

    Returns:
        psql's stdout with leading/trailing newlines removed ('' when the
        query returns no rows). Tabs are deliberately NOT stripped, so a row
        whose last column is NULL keeps its full field count.

    Raises:
        RuntimeError: if psql exits non-zero. Connection failures cause this,
            and so does any SQL error, because of ON_ERROR_STOP. Previously the
            error was swallowed and returned as an empty result.
    """
    r = subprocess.run(
        ['psql', '-h', PG_HOST, '-p', PG_PORT, '-U', 'ai_member',
         '-d', 'vala_bi', '-t', '-A', '-F', '\t',
         # Without this psql exits 0 even when a statement read from stdin
         # fails; the error would only appear on the (ignored) stderr.
         '-v', 'ON_ERROR_STOP=1'],
        input=sql, capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': PG_PASS})
    if r.returncode != 0:
        raise RuntimeError(f"psql failed (exit {r.returncode}): {r.stderr.strip()}")
    return r.stdout.strip('\n')


def get_tat():
    """Fetch a Feishu tenant access token for the first app in the xiaoxi config.

    Reads ``apps[0].appId`` / ``apps[0].appSecret`` from the local
    credentials file and exchanges them at the internal-app
    ``tenant_access_token`` endpoint.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: if the endpoint answers with a non-zero ``code`` or
            without a token (previously an opaque KeyError).
    """
    with open('/root/.openclaw/credentials/xiaoxi/config.json', encoding='utf-8') as fh:
        config = json.load(fh)
    app = config['apps'][0]
    data = json.dumps({"app_id": app['appId'], "app_secret": app['appSecret']}).encode()
    req = urllib.request.Request(
        'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
        data=data, headers={'Content-Type': 'application/json; charset=utf-8'})
    with urllib.request.urlopen(req, timeout=30) as resp:
        body = json.loads(resp.read())
    if body.get('code') != 0 or 'tenant_access_token' not in body:
        raise RuntimeError(
            f"Feishu auth failed: code={body.get('code')} msg={body.get('msg')}")
    return body['tenant_access_token']


# Target Feishu spreadsheet token and the sheet id inside it.
TOKEN = 'RFIJsXT8FhGHhctY4RwczcOfnac'
SHEET = '55b0eb'

# Tenant access token, sent as the Bearer credential on every sheets call below.
TAT = get_tat()


# ===== 1. Load target rows + sales names =====
print("[1/4] 读取目标行数据...")
# Each entry is unpacked below as (sheet_row_number, account_id, phone).
with open('/tmp/target_rows.json', encoding='utf-8') as fh:
    target_rows = json.load(fh)
if not target_rows:
    # min()/max() below and the write-back batching need at least one row.
    raise SystemExit("no target rows in /tmp/target_rows.json, nothing to do")

# De-duplicated account ids, sorted numerically (key=int also rejects
# non-numeric ids, which matters because they are interpolated into SQL).
uid_set = sorted(set(str(r[1]) for r in target_rows if r[1]), key=int)
uid_csv = ','.join(uid_set)
print(f" {len(target_rows)} 条记录, {len(uid_set)} 去重用户")

# Read column A (sales name) for the whole [min_row, max_row] span in one API call.
min_row = min(r[0] for r in target_rows)
max_row = max(r[0] for r in target_rows)
url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{TOKEN}/values/{SHEET}!A{min_row}:A{max_row}"
req = urllib.request.Request(url)
req.add_header('Authorization', f'Bearer {TAT}')
with urllib.request.urlopen(req, timeout=30) as http_resp:
    resp = json.loads(http_resp.read())
if resp.get('code') != 0:
    # Continuing would write "销售:?" into every row; stop instead.
    raise SystemExit(
        f"failed to read column A: code={resp.get('code')} msg={resp.get('msg')}")

a_values = resp.get('data', {}).get('valueRange', {}).get('values') or []
sales_map = {}  # row_num -> sales_name
for i, v in enumerate(a_values):
    # Empty cells can come back as [None]; leave them out so they fall back to '?'.
    if v and v[0] is not None:
        sales_map[min_row + i] = v[0]

# row_num -> {'acc_id': account id as str, 'sales': sales name or '?'}
row_data = {
    row_num: {'acc_id': str(acc_id), 'sales': sales_map.get(row_num, '?')}
    for row_num, acc_id, _phone in target_rows
}


# ===== 2. Batch database queries =====
print("[2/4] 批量查询学情...")

# 2a. Courses + characters: one row per (account, character, course detail).
# course_type is '正式课' when the course detail has an expire_time, else '体验课'.
print(" 课程信息...")
if uid_csv:
    course_rows = pg_query(f"""
SELECT a.id AS account_id, c.id AS user_id, d.course_level,
CASE WHEN d.expire_time IS NOT NULL THEN '正式课' ELSE '体验课' END AS course_type
FROM bi_vala_app_account a
JOIN bi_vala_app_character c ON c.account_id = a.id AND c.deleted_at IS NULL
LEFT JOIN bi_user_course_detail d ON d.user_id = c.id AND d.deleted_at IS NULL
WHERE a.id IN ({uid_csv}) AND a.status = 1 AND a.deleted_at IS NULL
ORDER BY a.id, c.id, d.course_level
""")
else:
    # No account ids: `IN ()` would be a SQL syntax error, and the answer is "no rows".
    course_rows = ''

user_courses = {}  # acc_id -> [{'user_id': ..., 'level': ..., 'type': ...}]
all_user_ids = set()
for line in course_rows.split('\n'):
    if not line:
        continue
    parts = line.split('\t')
    if len(parts) < 4:
        continue
    acc_id, uid = parts[0], parts[1]
    # A character without a course-detail row (LEFT JOIN) has a NULL level, printed as ''.
    level = parts[2] or '?'
    ctype = parts[3] or '体验课'
    all_user_ids.add(uid)
    user_courses.setdefault(acc_id, []).append({'user_id': uid, 'level': level, 'type': ctype})


# 2b. Latest play record per character. The play-record table is split into
# 8 shards, bi_user_chapter_play_record_{user_id % 8}; only rows with
# play_status = 1 are considered (presumably completed plays — confirm).
print(" 最近行课...")

# Bucket character ids by shard once instead of re-filtering per shard.
shard_users = {}
for u in all_user_ids:
    shard_users.setdefault(int(u) % 8, []).append(u)

play_records = {}  # user_id -> {'date', 'ch_id', 'ch_uid'}
for shard in range(8):
    members = shard_users.get(shard)
    if not members:
        continue
    sql = f"""
SELECT DISTINCT ON (user_id) user_id, created_at::date AS rd, chapter_id, chapter_unique_id
FROM bi_user_chapter_play_record_{shard}
WHERE user_id IN ({','.join(members)}) AND play_status = 1
ORDER BY user_id, created_at DESC
"""
    for row in pg_query(sql).split('\n'):
        cols = row.split('\t')
        if row and len(cols) >= 4:
            play_records[cols[0]] = {'date': cols[1], 'ch_id': cols[2], 'ch_uid': cols[3]}


# 2c. Course structure: chapter id -> "course_level-course_season-course_unit-course_lesson".
print(" 课程结构...")
# A NULL chapter_id prints as '' and would yield `IN (1,,2)`, a syntax error
# that would blank out the whole map; skip those ids.
ch_ids = {r['ch_id'] for r in play_records.values() if r['ch_id']}
ch_map = {}
if ch_ids:
    sql = f"SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson WHERE id IN ({','.join(ch_ids)})"
    for line in pg_query(sql).split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 5:
            ch_map[parts[0]] = '-'.join(parts[1:5])


# 2d. Total study time per character from the 8 component play-record shards:
# SUM(interval_time) / 60000 (interval_time presumably in ms, giving minutes).
print(" 学习时长...")

buckets = {}
for u in all_user_ids:
    buckets.setdefault(int(u) % 8, []).append(u)

study_map = {}  # user_id -> study time as float
for shard in range(8):
    ids = buckets.get(shard)
    if not ids:
        continue
    query = f"SELECT user_id, COALESCE(SUM(interval_time),0)/60000.0 FROM bi_user_component_play_record_{shard} WHERE user_id IN ({','.join(ids)}) GROUP BY user_id"
    for row in pg_query(query).split('\n'):
        if not row:
            continue
        cols = row.split('\t')
        if len(cols) >= 2:
            study_map[cols[0]] = float(cols[1])


# 2e. Payment status: accounts with at least one order that has a
# pay_success_date and order_status = 3 (presumably "paid" — confirm).
print(" 付费状态...")
paid_set = set()
if uid_csv:  # `IN ()` is a SQL syntax error; no ids means nobody paid.
    paid_sql = f"SELECT DISTINCT account_id FROM bi_vala_order WHERE account_id IN ({uid_csv}) AND pay_success_date IS NOT NULL AND order_status = 3"
    paid_set = {ln.strip() for ln in pg_query(paid_sql).split('\n') if ln.strip()}

print(f" 课程:{len(user_courses)} 行课:{len(play_records)} 付费:{len(paid_set)}")


# ===== 3. Build the column-D text =====
print("[3/4] 组装学情文本...")


def get_best_char(acc_id):
    """Pick the account's most recently active character entry and summarise it.

    Among the entries in ``user_courses[acc_id]`` that have a
    ``play_records`` entry, the one with the latest ``date`` wins (the
    earliest entry wins ties). If none has a play record, the first entry is
    used.

    Args:
        acc_id: account id string (key of ``user_courses``).

    Returns:
        ``(current, recent, study)``:

        - ``current`` is ``"<level><type>-<chapter>"``. The chapter is the
          ``ch_map`` label, ``'?'`` if the chapter id is unknown, or
          ``'无记录'`` when there is no play record.
        - ``recent`` is the play record's date, or ``'无记录'``.
        - ``study`` is ``int()`` of the ``study_map`` value (0 if absent).

        Returns ``(None, None, None)`` when the account has no entries.
    """
    entries = user_courses.get(acc_id, [])
    if not entries:
        return None, None, None

    played = [e for e in entries if play_records.get(e['user_id'])]
    # max() returns the first maximal item, i.e. earliest entry on date ties.
    if played:
        chosen = max(played, key=lambda e: play_records[e['user_id']]['date'])
    else:
        chosen = entries[0]

    uid = chosen['user_id']
    rec = play_records.get(uid)
    label = f"{chosen['level']}{chosen['type']}"
    minutes = int(study_map.get(uid, 0))
    if not rec:
        return f"{label}-无记录", '无记录', minutes
    return f"{label}-{ch_map.get(rec['ch_id'], '?')}", rec['date'], minutes


# NOTE(review): now_str is computed but never used in the column-D text; confirm intent.
now_str = datetime.now().strftime('%Y-%m-%d %H:%M')


def _build_d_text(acc_id, sales):
    """Return the column-D summary line for one sheet row."""
    current, recent, study = get_best_char(acc_id)
    pay = '已付费' if acc_id in paid_set else '未付费'
    if not current:
        return f"销售:{sales} | 用户:{acc_id} | 无课程角色 | 最近行课:无记录 | 学习0min | {pay}"
    return f"销售:{sales} | 用户:{acc_id} | 当前:{current} | 最近行课:{recent} | 学习{study}min | {pay}"


# row_num -> column-D text
updates = {
    row_num: _build_d_text(info['acc_id'], info['sales'])
    for row_num, info in row_data.items()
}


# ===== 4. Batch write-back =====
print(f"[4/4] 批量回填 {len(updates)} 条...")


def api_put(tat, token, sheet, range_str, values):
    """Write *values* into ``{sheet}!{range_str}`` of spreadsheet *token*.

    Sends a PUT to the Feishu sheets v2 ``/values`` endpoint.

    Args:
        tat: tenant access token, sent as a Bearer token.
        token: spreadsheet token.
        sheet: sheet id inside the spreadsheet.
        range_str: A1-style range such as ``'D5:D9'``.
        values: list of rows, each a list of cell values.

    Returns:
        True when the API answers ``code == 0``. False on network/HTTP
        errors, unparsable replies or a non-zero ``code``; the reason is
        printed rather than raised so one failed range does not abort the run.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{token}/values"
    body = {"valueRange": {"range": f"{sheet}!{range_str}", "values": values}}
    req = urllib.request.Request(url, data=json.dumps(body).encode(), method='PUT')
    req.add_header('Authorization', f'Bearer {tat}')
    req.add_header('Content-Type', 'application/json; charset=utf-8')
    try:
        with urllib.request.urlopen(req, timeout=30) as http_resp:
            resp = json.loads(http_resp.read())
    except urllib.error.HTTPError as e:
        # Feishu puts its own code/msg in the body of error replies; show it.
        detail = e.read().decode('utf-8', 'replace')[:500]
        print(f" API error: {e} {detail}")
        return False
    except (OSError, ValueError) as e:  # URLError/timeouts are OSError; bad JSON is ValueError
        print(f" API error: {e}")
        return False
    if resp.get('code') != 0:
        # Previously this returned False with no trace of why.
        print(f" API error [{range_str}]: code={resp.get('code')} msg={resp.get('msg')}")
        return False
    return True


# Sort updates by sheet row and split them into runs of consecutive rows, so
# each run is written with one range request per column instead of one per row.
# Runs are also capped at MAX_ROWS_PER_WRITE rows. The original comment
# promised 50-row batches, but the cap was never enforced.
MAX_ROWS_PER_WRITE = 50

sorted_items = sorted(updates.items())  # row numbers are unique, so this sorts by row
success_d = 0
success_c = 0

# Each block is a list of (row_num, d_text) with consecutive row numbers.
# An empty `updates` simply yields no blocks (previously an IndexError).
blocks = []
for item in sorted_items:
    if (blocks
            and item[0] == blocks[-1][-1][0] + 1
            and len(blocks[-1]) < MAX_ROWS_PER_WRITE):
        blocks[-1].append(item)
    else:
        # First item, a gap in row numbers, or the size cap reached.
        blocks.append([item])

print(f" 共 {len(blocks)} 个连续块,批量更新中...")

for bi, block in enumerate(blocks):
    start_row, end_row = block[0][0], block[-1][0]
    d_vals = [[text] for _, text in block]

    # Column D: the summary text.
    if api_put(TAT, TOKEN, SHEET, f'D{start_row}:D{end_row}', d_vals):
        success_d += len(block)
        # Column C: the "已返回" marker. It is written only after D succeeded,
        # so rows whose D write failed are not flagged as returned.
        c_vals = [["已返回"] for _ in block]
        if api_put(TAT, TOKEN, SHEET, f'C{start_row}:C{end_row}', c_vals):
            success_c += len(block)

    if (bi + 1) % 10 == 0:
        print(f" 块 {bi+1}/{len(blocks)}: D={success_d} C={success_c}")

print(f"\n✅ 完成!D列: {success_d}/{len(updates)},C列: {success_c}/{len(updates)}")