#!/usr/bin/env python3
"""
Refresh the May lesson records: query learning-status data from PostgreSQL
and backfill column D of Sheet1.
"""
import json
import subprocess
import sys
import os
from datetime import datetime

# ========== 1. Load target rows ==========
with open('/tmp/target_rows.json') as f:
    target_rows = json.load(f)
print(f"[1/4] 读取目标行: {len(target_rows)} 条")

# Distinct account ids (column index 1 of each target row), sorted numerically.
# key=int raises ValueError on non-numeric ids, so only numeric text is
# interpolated into the SQL IN (...) lists below.
uid_set = sorted(set(str(r[1]) for r in target_rows if r[1]), key=int)
print(f" 去重用户数: {len(uid_set)}")
if not uid_set:
    # "IN ()" is invalid SQL, and there is nothing to refresh anyway.
    print(" 没有需要刷新的用户,退出")
    sys.exit(0)
uid_csv = ','.join(uid_set)

# ========== 2. Database queries ==========
print(f"[2/4] 查询学情数据...")

# Read the DB password from secrets.env (KEY=VALUE lines, '#' comments,
# optional surrounding quotes on the value).
secrets = {}
with open('/root/.openclaw/workspace/secrets.env') as f:
    for line in f:
        line = line.strip()
        if line and not line.startswith('#') and '=' in line:
            k, v = line.split('=', 1)
            secrets[k] = v.strip('"').strip("'")
pg_pass = secrets.get('PG_ONLINE_PASSWORD', '')
pg_host = 'bj-postgres-16pob4sg.sql.tencentcdb.com'
pg_port = '28591'

# 2a. Course info + character ids for every target account.
print(" 查询课程信息...")
course_sql = f"""
SELECT a.id AS account_id, c.id AS user_id, d.course_level,
       CASE WHEN d.expire_time IS NOT NULL THEN '正式课' ELSE '体验课' END AS course_type
FROM bi_vala_app_account a
JOIN bi_vala_app_character c ON c.account_id = a.id AND c.deleted_at IS NULL
LEFT JOIN bi_user_course_detail d ON d.user_id = c.id AND d.deleted_at IS NULL
WHERE a.id IN ({uid_csv}) AND a.status = 1 AND a.deleted_at IS NULL
ORDER BY a.id, c.id, d.course_level
"""
# -t -A -F '\t': tuples only, unaligned, tab-separated output.
# ON_ERROR_STOP=1 makes psql exit non-zero on an SQL error in the script; without
# it psql reports success and the failure looks like "no rows".
result = subprocess.run(
    ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
     '-t', '-A', '-F', '\t', '-v', 'ON_ERROR_STOP=1'],
    input=course_sql, capture_output=True, text=True,
    env={**os.environ, 'PGPASSWORD': pg_pass},
)
if result.returncode != 0:
    sys.exit(f"课程信息查询失败: {result.stderr.strip()}")

# Filled from `result` in the next step:
# account_id -> [{'user_id': ..., 'level': ..., 'type': ...}, ...]
user_courses = {}
# Parse the course query output: one tab-separated row per
# (account, character, course) as selected in step 2a.
for line in result.stdout.strip().split('\n'):
    if not line:
        continue
    parts = line.split('\t')
    if len(parts) >= 4:
        acc_id, user_id, level, ctype = parts[:4]
        user_courses.setdefault(acc_id, []).append({
            'user_id': user_id,
            'level': level,
            'type': ctype,
        })
print(f" 课程信息: {len(user_courses)} 个账号有角色")

# 2b. Most recent finished lesson per character. The play-record table is
# split into 8 shards; a user_id lives in shard user_id % 8.
print(" 查询最近行课记录...")

all_user_ids = {c['user_id'] for chars in user_courses.values() for c in chars}

play_records = {}  # user_id -> {'recent_date': ..., 'chapter_id': ..., 'chapter_unique_id': ...}
for shard in range(8):
    table = f'bi_user_chapter_play_record_{shard}'
    uid_subset = [u for u in all_user_ids if int(u) % 8 == shard]
    if not uid_subset:
        continue
    uid_csv_sub = ','.join(uid_subset)
    # DISTINCT ON + ORDER BY created_at DESC keeps only the latest row per user.
    sql = f"""
SELECT DISTINCT ON (user_id) user_id, created_at::date AS recent_date, chapter_id, chapter_unique_id
FROM {table}
WHERE user_id IN ({uid_csv_sub}) AND play_status = 1
ORDER BY user_id, created_at DESC
"""
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         '-t', '-A', '-F', '\t', '-v', 'ON_ERROR_STOP=1'],
        input=sql, capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass},
    )
    # Abort instead of silently treating a failed shard as "no lessons";
    # otherwise those users would be written back as having no records.
    if result.returncode != 0:
        sys.exit(f"行课记录查询失败 ({table}): {result.stderr.strip()}")
    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 4:
            play_records[parts[0]] = {
                'recent_date': parts[1],
                'chapter_id': parts[2],
                'chapter_unique_id': parts[3],
            }
print(f" 行课记录: {len(play_records)} 个角色有行课")
# 2c. Chapter structure mapping (chapter_id -> course name)
print(" 查询课程结构...")
# psql prints NULL as an empty string by default. Skip those ids so the
# IN (...) list does not become "IN (1,,2)", which would fail the whole query.
chapter_ids = {r['chapter_id'] for r in play_records.values() if r['chapter_id']}
chapter_map = {}  # chapter_id -> "level-season-unit-lesson"
if chapter_ids:
    ch_csv = ','.join(chapter_ids)
    ch_sql = f"""
SELECT id, course_level, course_season, course_unit, course_lesson
FROM bi_level_unit_lesson
WHERE id IN ({ch_csv})
"""
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         '-t', '-A', '-F', '\t', '-v', 'ON_ERROR_STOP=1'],
        input=ch_sql, capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass},
    )
    if result.returncode != 0:
        sys.exit(f"课程结构查询失败: {result.stderr.strip()}")
    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 5:
            chapter_map[parts[0]] = f"{parts[1]}-{parts[2]}-{parts[3]}-{parts[4]}"

# 2d. Total study time per character, summed over the 8 component-record
# shards (a user_id lives in shard user_id % 8). The query divides
# SUM(interval_time) by 60000, i.e. it presumes interval_time is in
# milliseconds -- TODO confirm against the schema.
print(" 查询学习时长...")
study_times = {}  # user_id -> total minutes (float)
for shard in range(8):
    table = f'bi_user_component_play_record_{shard}'
    uid_subset = [u for u in all_user_ids if int(u) % 8 == shard]
    if not uid_subset:
        continue
    uid_csv_sub = ','.join(uid_subset)
    sql = f"""
SELECT user_id, COALESCE(SUM(interval_time), 0) / 60000.0 AS total_min
FROM {table}
WHERE user_id IN ({uid_csv_sub})
GROUP BY user_id
"""
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         '-t', '-A', '-F', '\t', '-v', 'ON_ERROR_STOP=1'],
        input=sql, capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass},
    )
    # A failed shard would otherwise show up as "学习0min" in the sheet.
    if result.returncode != 0:
        sys.exit(f"学习时长查询失败 ({table}): {result.stderr.strip()}")
    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 2:
            study_times[parts[0]] = float(parts[1])
# 2e. Payment status: accounts with at least one successful order
# (pay_success_date set and order_status = 3).
print(" 查询付费状态...")
pay_sql = f"""
SELECT account_id, CASE WHEN COUNT(*) > 0 THEN '已付费' ELSE '未付费' END AS pay_status
FROM bi_vala_order
WHERE account_id IN ({uid_csv}) AND pay_success_date IS NOT NULL AND order_status = 3
GROUP BY account_id
"""
result = subprocess.run(
    ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
     '-t', '-A', '-F', '\t', '-v', 'ON_ERROR_STOP=1'],
    input=pay_sql, capture_output=True, text=True,
    env={**os.environ, 'PGPASSWORD': pg_pass},
)
# Without this check a failed query would mark every account as unpaid.
if result.returncode != 0:
    sys.exit(f"付费状态查询失败: {result.stderr.strip()}")
paid_users = set()
for line in result.stdout.strip().split('\n'):
    if not line:
        continue
    parts = line.split('\t')
    if len(parts) >= 2 and parts[1] == '已付费':
        paid_users.add(parts[0])
print(f" 付费用户: {len(paid_users)}")

# ========== 3. Assemble learning-status data ==========
print(f"[3/4] 组装学情数据...")


def get_course_info(acc_id):
    """Return ``(current, recent_date, study_minutes)`` for one account.

    Among the account's characters in ``user_courses``, the one with the
    latest ``recent_date`` in ``play_records`` is chosen (the first one wins
    on ties). If no character has a play record, the first character row is
    used.

    ``current`` is ``"<level><type>-<chapter>"``. The chapter part is ``?``
    when the chapter is not in ``chapter_map``, and the level is ``?`` when
    it is empty. ``recent_date`` is ``'无记录'`` when the chosen character has
    no play record. Returns ``(None, None, None)`` when the account has no
    characters at all.
    """
    chars = user_courses.get(acc_id, [])
    if not chars:
        return None, None, None

    played = [c for c in chars if c['user_id'] in play_records]
    if played:
        # max() returns the first maximal element, i.e. the earliest
        # character among those sharing the latest date.
        best_char = max(played, key=lambda c: play_records[c['user_id']]['recent_date'])
    else:
        best_char = chars[0]  # fall back to the first character

    uid = best_char['user_id']
    level = best_char['level'] or '?'
    ctype = best_char['type'] or '体验课'

    pr = play_records.get(uid)
    if pr and pr['chapter_id'] in chapter_map:
        current = f"{level}{ctype}-{chapter_map[pr['chapter_id']]}"
    else:
        current = f"{level}{ctype}-?"
    recent_date = pr['recent_date'] if pr else '无记录'
    return current, recent_date, int(study_times.get(uid, 0))


# Per-row facts, kept per row so each row's text uses its own values.
# Each entry: (row_num, acc_id, current, recent_date, study_min, pay_status).
row_infos = []
for row_num, acc_id, _phone in target_rows:
    acc_id = str(acc_id)
    current, recent_date, study_min = get_course_info(acc_id)
    pay_status = '已付费' if acc_id in paid_users else '未付费'
    row_infos.append((row_num, acc_id, current, recent_date, study_min, pay_status))
print(f" 生成 {len(row_infos)} 条更新")

# ========== Read column A (sales name) for the target rows ==========
print(" 补读销售名列...")
import urllib.request

# Obtain a tenant access token (TAT).
with open('/root/.openclaw/credentials/xiaoxi/config.json') as f:
    config = json.load(f)
app = config['apps'][0]
tat_data = json.dumps({"app_id": app['appId'], "app_secret": app['appSecret']}).encode()
tat_req = urllib.request.Request(
    'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
    data=tat_data,
    headers={'Content-Type': 'application/json; charset=utf-8'})
with urllib.request.urlopen(tat_req, timeout=30) as tat_http:
    tat_payload = json.loads(tat_http.read())
tat = tat_payload.get('tenant_access_token')
if not tat:
    sys.exit(f"获取 tenant_access_token 失败: code={tat_payload.get('code')} msg={tat_payload.get('msg')}")

TOKEN = 'RFIJsXT8FhGHhctY4RwczcOfnac'
SHEET = '55b0eb'

# Sort numerically so each batch covers as tight a row range as possible.
rows_sorted = sorted(row_infos, key=lambda x: int(x[0]))
sales_map = {}  # int row number -> sales name

# Read column A in batches. Each request reads the contiguous range
# A<first>:A<last>, so returned value j belongs to row first + j.
batch_size = 200
for i in range(0, len(rows_sorted), batch_size):
    batch = rows_sorted[i:i + batch_size]
    batch_rows = [int(r[0]) for r in batch]
    first_row, last_row = min(batch_rows), max(batch_rows)
    range_str = f"{SHEET}!A{first_row}:A{last_row}"
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{TOKEN}/values/{range_str}"
    req = urllib.request.Request(url)
    req.add_header('Authorization', f'Bearer {tat}')
    try:
        with urllib.request.urlopen(req, timeout=30) as http_resp:
            payload = json.loads(http_resp.read())
    except (OSError, ValueError) as e:
        # Best effort: URLError/HTTPError/timeouts are OSError subclasses and
        # bad JSON is ValueError. Rows in this batch fall back to '?'.
        print(f" Error reading A col: {e}")
        continue
    if payload.get('code') != 0:
        print(f" Error reading A col: code={payload.get('code')} msg={payload.get('msg')}")
        continue
    values = ((payload.get('data') or {}).get('valueRange') or {}).get('values') or []
    for offset, cells in enumerate(values):
        # Empty cells would otherwise be rendered as "None" in the D text.
        if cells and cells[0] not in (None, ''):
            sales_map[first_row + offset] = cells[0]
print(f" 销售名: {len(sales_map)} 条")

# Build the final column-D text (with sales name) for every row, in row order.
updates = []  # [(row_num, d_text)]
for row_num, acc_id, current, recent_date, study_min, pay_status in rows_sorted:
    sales = sales_map.get(int(row_num), '?')
    if current:
        d_text = f"销售:{sales} | 用户:{acc_id} | 当前:{current} | 最近行课:{recent_date} | 学习{study_min}min | {pay_status}"
    else:
        d_text = f"销售:{sales} | 用户:{acc_id} | 未匹配到课程信息 | 最近行课:无记录 | 学习0min | {pay_status}"
    updates.append((row_num, d_text))
# ========== 4. Batch write-back ==========
print(f"[4/4] 批量回填 {len(updates)} 条...")

# The target rows are not contiguous (many skipped rows), so every cell is
# written with its own PUT request. The batches below only pace the
# progress output.


def update_cell(tat, token, sheet, row, col, value, timeout=30):
    """Write ``value`` into the single cell ``{sheet}!{col}{row}``.

    Args:
        tat: tenant access token used as the Bearer credential.
        token: spreadsheet token.
        sheet: sheet id.
        row: 1-based row number.
        col: column letter, e.g. ``'D'``.
        value: cell value.
        timeout: socket timeout in seconds for the HTTP request.

    Returns:
        True when the API answers with ``code == 0``. On a network/HTTP error,
        an undecodable response, or a non-zero code, it prints the reason and
        returns False, so one bad cell does not abort the whole run.
    """
    range_str = f"{sheet}!{col}{row}:{col}{row}"
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{token}/values"
    body = json.dumps({
        "valueRange": {
            "range": range_str,
            "values": [[value]]
        }
    }).encode()
    req = urllib.request.Request(url, data=body, method='PUT')
    req.add_header('Authorization', f'Bearer {tat}')
    req.add_header('Content-Type', 'application/json; charset=utf-8')
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.loads(resp.read())
    except (OSError, ValueError) as e:
        # URLError/HTTPError and socket timeouts are OSError subclasses;
        # JSON decode errors are ValueError.
        print(f" 写入失败 {range_str}: {e}")
        return False
    if payload.get('code') != 0:
        print(f" 写入失败 {range_str}: code={payload.get('code')} msg={payload.get('msg')}")
        return False
    return True


batch_size = 50
success_d = 0
success_c = 0
for i in range(0, len(updates), batch_size):
    for row_num, d_text in updates[i:i + batch_size]:
        if update_cell(tat, TOKEN, SHEET, row_num, 'D', d_text):
            success_d += 1
            # Mark column C as "returned" only once column D was actually written.
            if update_cell(tat, TOKEN, SHEET, row_num, 'C', '已返回'):
                success_c += 1
    print(f" 进度: {min(i+batch_size, len(updates))}/{len(updates)} (D:{success_d}, C:{success_c})")

print(f"\n✅ 完成!D列更新: {success_d}/{len(updates)},C列更新: {success_c}/{len(updates)}")