diff --git a/scripts/batch_update_sheet.py b/scripts/batch_update_sheet.py new file mode 100644 index 0000000..619600c --- /dev/null +++ b/scripts/batch_update_sheet.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python3 +""" +批量更新飞书电子表格:回填用户ID和匹配状态 +用于 2DOxEI 表 R915-R992 行 +""" +import json +import subprocess +import sys + +# 78个手机号(按表格行顺序 915-992) +PHONES = [ + '18898596908', '13104122113', '18616818587', '18600641856', '18527822530', + '15301808320', '15921183656', '18969141986', '18853077186', '19879837192', + '15013730773', '18240307314', '15133168361', '18607715299', '18640248566', + '15229999262', '18615767595', '15880070471', '15210946014', '13416197660', + '18030731125', '13372561305', '13438029626', '13426271919', '13380241801', + '13331090268', '13941957202', '13668236095', '18666339866', '18193473383', + '18719069856', '15092617699', '15602091300', '18906300189', '18823116345', + '16675181845', '15805920790', '13631576638', '13825629898', '18689550023', + '13858852527', '17701557793', '18800105821', '18243588666', '15070812805', + '15906585627', '13818184885', '18609909747', '18501055123', '18781333078', + '13510511993', '13763607518', '19131773001', '13429170125', '13548532992', + '18273336778', '15004066188', '15386183750', '15002087823', '15622866383', + '15236831122', '13980065537', '17351768736', '13752368975', '18988791586', + '13465563287', '18268989827', '18358369704', '13370181982', '15062199752', + '13009161168', '17701717015', '13969392995', '13560010506', '18042928605', + '13603503266', '15776824932', '18131135363', +] + +# 数据库查询结果: 脱敏手机号 -> [account_id列表] +# rn=1 作为首选(最大account_id) +MATCHES = {} +raw_matches = [ + ('188****6908', 26655), ('131****2113', 27442), ('186****8587', 27410), + ('186****1856', 27213), ('185****2530', 27227), ('153****8320', 27417), + ('159****3656', 27316), ('189****1986', 27591), ('188****7186', 27248), + ('198****7192', 27686), ('150****0773', 27464), ('182****7314', 27429), + ('151****8361', 27432), ('186****5299', 
27017), ('186****8566', 27630), + ('152****9262', 27483), ('186****7595', 27467), ('158****0471', 27693), + ('152****6014', 27490), ('134****7660', 27619), ('180****1125', 27618), + ('133****1305', 26920), ('134****9626', 27583), ('134****1919', 27755), + ('133****1801', 27633), ('133****0268', 27983), ('139****7202', 27499), + ('136****6095', 27598), ('186****9866', 27745), ('181****3383', 27576), + ('187****9856', 27585), ('150****7699', 28052), ('156****1300', 27672), + ('189****0189', 27292), ('188****6345', 27836), ('166****1845', 27955), + ('158****0790', 27951), ('136****6638', 5149), ('138****9898', 3612), + ('186****0023', 27674), ('138****2527', 4882), ('177****7793', 27890), + ('188****5821', 27761), ('182****8666', 27812), ('150****2805', 27813), + ('159****5627', 27882), ('138****4885', 27911), ('186****9747', 27809), + ('185****5123', 27776), ('187****3078', 27738), ('135****1993', 27806), + ('137****7518', 27770), ('191****3001', 27638), ('134****0125', 27794), + ('135****2992', 27750), ('182****6778', 27728), ('150****6188', 27803), + ('153****3750', 27790), ('150****7823', 11807), ('156****6383', 28018), + ('152****1122', 27869), ('139****5537', 27947), ('173****8736', 28070), + ('137****8975', 27916), ('189****1586', 27941), ('134****3287', 28079), + ('182****9827', 27860), ('183****9704', 27849), ('133****1982', 4923), + ('150****9752', 28071), ('130****1168', 27936), ('177****7015', 27946), + ('139****2995', 27889), ('135****0506', 28394), ('180****8605', 28011), + ('136****3266', 27965), ('157****4932', 28088), ('181****5363', 27979), +] + +for tel_masked, acc_id in raw_matches: + MATCHES[tel_masked] = acc_id # rn=1 优先,后出现的会覆盖 + +def mask_phone(phone): + """脱敏手机号: 前3位 + **** + 后4位""" + return f"{phone[:3]}****{phone[-4:]}" + +def get_tat(): + """获取 Tenant Access Token""" + import json + config = json.load(open('/root/.openclaw/credentials/xiaoxi/config.json')) + app = config['apps'][0] + import urllib.request + data = json.dumps({"app_id": 
app['appId'], "app_secret": app['appSecret']}).encode() + req = urllib.request.Request( + 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal', + data=data, + headers={'Content-Type': 'application/json; charset=utf-8'} + ) + resp = json.loads(urllib.request.urlopen(req).read()) + return resp['tenant_access_token'] + +def update_sheet_range(tat, token, sheet_id, range_str, values): + """批量更新电子表格范围""" + import urllib.request + url = f'https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{token}/values' + body = json.dumps({ + "valueRange": { + "range": f"{sheet_id}!{range_str}", + "values": values + } + }).encode() + req = urllib.request.Request(url, data=body, method='PUT') + req.add_header('Authorization', f'Bearer {tat}') + req.add_header('Content-Type', 'application/json; charset=utf-8') + resp = json.loads(urllib.request.urlopen(req).read()) + return resp + +if __name__ == '__main__': + # 构建回填数据 + f_values = [] # F列: account_id + g_values = [] # G列: 回填状态 + + matched_count = 0 + multi_count = 0 + + for phone in PHONES: + masked = mask_phone(phone) + acc_id = MATCHES.get(masked) + if acc_id: + f_values.append([str(acc_id)]) + g_values.append(["已匹配"]) + matched_count += 1 + else: + f_values.append(["未匹配"]) + g_values.append(["未匹配"]) + + print(f"Total: {len(PHONES)}, Matched: {matched_count}, Unmatched: {len(PHONES) - matched_count}") + + # 获取 token + tat = get_tat() + token = 'RFIJsXT8FhGHhctY4RwczcOfnac' + sheet_id = '2DOxEI' + + # 更新 F 列 (F915:F992) - 回填用户ID + resp_f = update_sheet_range(tat, token, sheet_id, 'F915:F992', f_values) + print(f"Update F column: code={resp_f.get('code')}, msg={resp_f.get('msg')}") + + # 更新 G 列 (G915:G992) - 回填状态 + resp_g = update_sheet_range(tat, token, sheet_id, 'G915:G992', g_values) + print(f"Update G column: code={resp_g.get('code')}, msg={resp_g.get('msg')}") + + # 更新 E 列 (E915:E992) - 请求状态改为"已返回" + e_values = [["已返回"] for _ in range(len(PHONES))] + resp_e = update_sheet_range(tat, token, sheet_id, 
'E915:E992', e_values) + print(f"Update E column: code={resp_e.get('code')}, msg={resp_e.get('msg')}") + + if resp_f.get('code') == 0 and resp_g.get('code') == 0: + print("\n✅ 所有78条记录已成功回填!") + else: + print("\n⚠️ 部分更新可能失败,请检查。") diff --git a/scripts/may230_refresh.py b/scripts/may230_refresh.py new file mode 100644 index 0000000..527fb7e --- /dev/null +++ b/scripts/may230_refresh.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python3 +"""处理Sheet1 5月230条待查询:查学情+回填D/C列""" +import json, subprocess, os, urllib.request, re +from datetime import datetime + +# ===== 0. 准备 ===== +secrets = {} +with open('/root/.openclaw/workspace/secrets.env') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#') and '=' in line: + k, v = line.split('=', 1) + secrets[k] = v.strip('"').strip("'") +PG_PASS = secrets['PG_ONLINE_PASSWORD'] + +def pg_query(sql): + r = subprocess.run(['psql', '-h', 'bj-postgres-16pob4sg.sql.tencentcdb.com', '-p', '28591', + '-U', 'ai_member', '-d', 'vala_bi', '-t', '-A', '-F', '\t'], + input=sql, capture_output=True, text=True, env={**os.environ, 'PGPASSWORD': PG_PASS}) + return r.stdout.strip() + +config = json.load(open('/root/.openclaw/credentials/xiaoxi/config.json')) +app = config['apps'][0] +data = json.dumps({"app_id": app['appId'], "app_secret": app['appSecret']}).encode() +req = urllib.request.Request('https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal', + data=data, headers={'Content-Type': 'application/json; charset=utf-8'}) +TAT = json.loads(urllib.request.urlopen(req).read())['tenant_access_token'] + +TOKEN = 'RFIJsXT8FhGHhctY4RwczcOfnac' +SHEET = '55b0eb' + +# ===== 1. 读取目标数据 ===== +targets = json.load(open('/tmp/may230_targets.json')) +with_uid = targets['with_uid'] # [{row, uid, sales, phone}] +without_uid = targets['without_uid'] # [{row, uid:'', sales, phone}] +all_rows = with_uid + without_uid +print(f"[1] 目标: {len(all_rows)} 条 (有uid={len(with_uid)}, 无uid={len(without_uid)})") + +# ===== 2. 
# Match phone numbers to account_id (rows without a uid) =====
print("[2] 手机号匹配 account_id...")
phone_to_acc = {}  # plaintext phone -> account_id
if without_uid:
    phones = sorted(set(r['phone'] for r in without_uid if r['phone']))

    # Only well-formed 11-digit numbers take part in matching. The same list
    # drives both the SQL filter and the lookup afterwards, so a malformed
    # number can never inherit the account of a valid number that happens to
    # share its first 3 / last 4 characters.
    valid_phones = [p for p in phones if len(p) == 11 and p.isdigit()]
    conds = [
        f"(LEFT(tel,3)='{p[:3]}' AND RIGHT(tel,4)='{p[-4:]}')"
        for p in valid_phones
    ]

    # Results are keyed by the tel value exactly as the database returns it;
    # the lookup below expects the masked form "3 digits + **** + 4 digits".
    # NOTE(review): different phone numbers with the same first 3 and last 4
    # digits cannot be told apart by this scheme - confirm that is acceptable.
    phone_to_acc_raw = {}  # masked tel -> account_id
    batch_size = 200
    for i in range(0, len(conds), batch_size):
        batch_conds = conds[i:i + batch_size]
        # rn = 1 keeps the largest account id for every tel value
        sql = f"""
        SELECT id, tel FROM (
            SELECT id, tel, ROW_NUMBER() OVER (PARTITION BY tel ORDER BY id DESC) AS rn
            FROM bi_vala_app_account
            WHERE status = 1 AND deleted_at IS NULL
              AND ({' OR '.join(batch_conds)})
        ) t WHERE rn = 1
        """
        for line in pg_query(sql).split('\n'):
            if not line:
                continue
            parts = line.split('\t')
            if len(parts) >= 2:
                phone_to_acc_raw[parts[1]] = parts[0]

    # Translate masked matches back to the plaintext phone used as key
    for p in valid_phones:
        masked = f"{p[:3]}****{p[-4:]}"
        acc = phone_to_acc_raw.get(masked)
        if acc:
            phone_to_acc[p] = acc

    print(f" 匹配: {len(phone_to_acc)}/{len(phones)}")

# Build the row_num -> {acc_id, sales} mapping
row_info = {}
for r in with_uid:
    row_info[r['row']] = {'acc_id': r['uid'], 'sales': r['sales']}
for r in without_uid:
    # Rows whose phone could not be matched keep an empty account id
    row_info[r['row']] = {'acc_id': phone_to_acc.get(r['phone'], ''), 'sales': r['sales']}

matched = sum(1 for v in row_info.values() if v['acc_id'])
print(f" 最终有account_id: {matched}/{len(row_info)}")

# ===== 3. Bulk database queries for learning data =====
print("[3] 批量查询学情...")

acc_ids = sorted(set(v['acc_id'] for v in row_info.values() if v['acc_id']), key=int)
uid_csv = ','.join(acc_ids)
print(f" 去重用户: {len(acc_ids)}")

# 3a.
# Courses + characters
print(" 课程信息...")
# An empty id list would render as "IN ()", which is invalid SQL; skip the
# query in that case instead of sending a statement that can only fail.
course_rows = pg_query(f"""
SELECT a.id, c.id, COALESCE(d.course_level,'?'),
       CASE WHEN d.expire_time IS NOT NULL THEN '正式课' ELSE '体验课' END
FROM bi_vala_app_account a
JOIN bi_vala_app_character c ON c.account_id = a.id AND c.deleted_at IS NULL
LEFT JOIN bi_user_course_detail d ON d.user_id = c.id AND d.deleted_at IS NULL
WHERE a.id IN ({uid_csv}) AND a.status = 1 AND a.deleted_at IS NULL
ORDER BY a.id, c.id, d.course_level
""") if uid_csv else ''

user_courses = {}   # account_id -> [{'user_id', 'level', 'type'}, ...]
all_uids = set()    # every character id seen
for line in course_rows.split('\n'):
    if not line:
        continue
    parts = line.split('\t')
    if len(parts) >= 4:
        acc_id, uid, level, ctype = parts[0], parts[1], parts[2], parts[3]
        all_uids.add(uid)
        user_courses.setdefault(acc_id, []).append({'user_id': uid, 'level': level, 'type': ctype})

# 3b. Most recent lesson per character
print(" 最近行课...")
play_records = {}   # character id -> {'date', 'ch_id', 'ch_uid'}
for i in range(8):
    # Table suffix i holds the characters whose id is congruent to i mod 8
    subset = [u for u in all_uids if int(u) % 8 == i]
    if not subset:
        continue
    # DISTINCT ON + ORDER BY created_at DESC keeps the newest row per character
    for line in pg_query(f"""
        SELECT DISTINCT ON (user_id) user_id, created_at::date, chapter_id, chapter_unique_id
        FROM bi_user_chapter_play_record_{i}
        WHERE user_id IN ({','.join(subset)}) AND play_status = 1
        ORDER BY user_id, created_at DESC
    """).split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 4:
            play_records[parts[0]] = {'date': parts[1], 'ch_id': parts[2], 'ch_uid': parts[3]}

# 3c. Course structure
print(" 课程结构...")
# Drop empty ids: an empty field would turn the list into "IN (1,,2)" and
# make the whole lookup fail.
# NOTE(review): assumes a NULL chapter_id is printed as an empty field - confirm.
ch_ids = set(r['ch_id'] for r in play_records.values() if r['ch_id'])
ch_map = {}         # chapter id -> "level-season-unit-lesson"
if ch_ids:
    for line in pg_query(f"SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson WHERE id IN ({','.join(ch_ids)})").split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 5:
            ch_map[parts[0]] = f"{parts[1]}-{parts[2]}-{parts[3]}-{parts[4]}"

# 3d.
学习时长 +print(" 学习时长...") +study_map = {} +for i in range(8): + subset = [u for u in all_uids if int(u) % 8 == i] + if not subset: continue + for line in pg_query(f"SELECT user_id, COALESCE(SUM(interval_time),0)/60000.0 FROM bi_user_component_play_record_{i} WHERE user_id IN ({','.join(subset)}) GROUP BY user_id").split('\n'): + if not line: continue + parts = line.split('\t') + if len(parts) >= 2: + study_map[parts[0]] = float(parts[1]) + +# 3e. 付费状态 +print(" 付费状态...") +paid_set = set() +for line in pg_query(f"SELECT DISTINCT account_id FROM bi_vala_order WHERE account_id IN ({uid_csv}) AND pay_success_date IS NOT NULL AND order_status = 3").split('\n'): + if line.strip(): + paid_set.add(line.strip()) + +print(f" 课程:{len(user_courses)} 行课:{len(play_records)} 付费:{len(paid_set)}") + +# ===== 4. 组装 D 列文本 ===== +print("[4] 组装学情文本...") + +def get_best_char(acc_id): + chars = user_courses.get(acc_id, []) + if not chars: + return None, None, None + best, best_date = None, None + for c in chars: + pr = play_records.get(c['user_id']) + if pr and (best_date is None or pr['date'] > best_date): + best_date = pr['date'] + best = c + if best is None: + best = chars[0] + uid = best['user_id'] + pr = play_records.get(uid) + level = best['level'].replace('A1','L1').replace('A2','L2') if best['level'] != '?' else '?' + ctype = best['type'] + if pr and pr['ch_id'] in ch_map: + ch_name = ch_map[pr['ch_id']] + # format: "L1体验课-U00-L01" + parts = ch_name.split('-') + if len(parts) >= 5: + current = f"{level}{ctype}-{parts[3]}-{parts[4]}" + else: + current = f"{level}{ctype}-{ch_name}" + elif pr: + current = f"{level}{ctype}-?" 
+ else: + current = f"{level}{ctype}-无记录" + recent = pr['date'] if pr else '无记录' + study = int(study_map.get(uid, 0)) + return current, recent, study + +updates = {} +no_match = [] + +for row_num, info in row_info.items(): + acc_id = info['acc_id'] + sales = info['sales'] + + if not acc_id: + no_match.append(row_num) + d_text = f"销售:{sales} | 用户:未匹配 | 当前:无记录 | 最近行课:无记录 | 学习0min | 未付费" + else: + current, recent, study = get_best_char(acc_id) + pay = '已付费' if acc_id in paid_set else '未付费' + if current: + d_text = f"销售:{sales} | 用户:{acc_id} | 当前:{current} | 最近行课:{recent} | 学习{study}min | {pay}" + else: + d_text = f"销售:{sales} | 用户:{acc_id} | 无课程角色 | 最近行课:无记录 | 学习0min | {pay}" + + updates[row_num] = d_text + +print(f" 未匹配手机号: {len(no_match)} 行") + +# ===== 5. 批量回填 ===== +print(f"[5] 批量回填 {len(updates)} 条...") + +def api_put(tat, token, sheet, range_str, values): + url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{token}/values" + body = {"valueRange": {"range": f"{sheet}!{range_str}", "values": values}} + data = json.dumps(body).encode() + req = urllib.request.Request(url, data=data, method='PUT') + req.add_header('Authorization', f'Bearer {tat}') + req.add_header('Content-Type', 'application/json; charset=utf-8') + try: + resp = json.loads(urllib.request.urlopen(req).read()) + return resp.get('code') == 0 + except Exception as e: + print(f" API error: {e}") + return False + +sorted_items = sorted(updates.items(), key=lambda x: x[0]) + +# 找连续块 +blocks = [] +cur_block = [sorted_items[0]] +for i in range(1, len(sorted_items)): + if sorted_items[i][0] == sorted_items[i-1][0] + 1: + cur_block.append(sorted_items[i]) + else: + blocks.append(cur_block) + cur_block = [sorted_items[i]] +if cur_block: + blocks.append(cur_block) + +print(f" {len(blocks)} 个连续块") + +success_d = 0 +success_c = 0 +for bi, block in enumerate(blocks): + rows = [r for r, _ in block] + d_vals = [[v] for _, v in block] + c_vals = [["已返回"] for _ in block] + start_row, end_row = rows[0], 
rows[-1] + + if api_put(TAT, TOKEN, SHEET, f'D{start_row}:D{end_row}', d_vals): + success_d += len(block) + if api_put(TAT, TOKEN, SHEET, f'C{start_row}:C{end_row}', c_vals): + success_c += len(block) + + if (bi+1) % 20 == 0: + print(f" 块 {bi+1}/{len(blocks)}: D={success_d} C={success_c}") + +print(f"\n✅ 完成: D列={success_d}/230, C列={success_c}/230") +if no_match: + print(f"⚠️ 未匹配手机号 {len(no_match)} 行: {no_match[:20]}") diff --git a/scripts/refresh_may_course_data.py b/scripts/refresh_may_course_data.py new file mode 100644 index 0000000..7db06f8 --- /dev/null +++ b/scripts/refresh_may_course_data.py @@ -0,0 +1,397 @@ +#!/usr/bin/env python3 +""" +刷新 5 月行课记录:查询学情数据并回填 Sheet1 D 列 +""" +import json +import subprocess +import sys +import os +from datetime import datetime + +# ========== 1. 读取目标行 ========== +target_rows = json.load(open('/tmp/target_rows.json')) +print(f"[1/4] 读取目标行: {len(target_rows)} 条") + +# 提取 account_id 列表 +uid_set = sorted(set(str(r[1]) for r in target_rows if r[1]), key=int) +print(f" 去重用户数: {len(uid_set)}") +uid_csv = ','.join(uid_set) + +# ========== 2. 数据库查询 ========== +print(f"[2/4] 查询学情数据...") + +# 从 secrets.env 获取密码 +secrets = {} +with open('/root/.openclaw/workspace/secrets.env') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#') and '=' in line: + k, v = line.split('=', 1) + secrets[k] = v.strip('"').strip("'") + +pg_pass = secrets.get('PG_ONLINE_PASSWORD', '') +pg_host = 'bj-postgres-16pob4sg.sql.tencentcdb.com' +pg_port = '28591' + +# 2a. 
# Course info + character IDs
print(" 查询课程信息...")
course_sql = f"""
SELECT
    a.id AS account_id,
    c.id AS user_id,
    d.course_level,
    CASE WHEN d.expire_time IS NOT NULL THEN '正式课' ELSE '体验课' END AS course_type
FROM bi_vala_app_account a
JOIN bi_vala_app_character c ON c.account_id = a.id AND c.deleted_at IS NULL
LEFT JOIN bi_user_course_detail d ON d.user_id = c.id AND d.deleted_at IS NULL
WHERE a.id IN ({uid_csv})
  AND a.status = 1
  AND a.deleted_at IS NULL
ORDER BY a.id, c.id, d.course_level
"""

# account_id -> [{'user_id': ..., 'level': ..., 'type': ...}, ...]
user_courses = {}
if uid_csv:  # an empty id list would render as the invalid "IN ()"
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         # ON_ERROR_STOP makes psql exit non-zero when the statement fails
         '-v', 'ON_ERROR_STOP=1',
         '-t', '-A', '-F', '\t'],
        input=course_sql,
        capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass}
    )
    # Without this check a connection or SQL failure is indistinguishable
    # from "no account has a character", and the wrong text would be written
    # back to the sheet further down.
    if result.returncode != 0:
        raise RuntimeError(
            f"psql failed (exit {result.returncode}): {result.stderr.strip()}")

    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 4:
            acc_id, user_id, level, ctype = parts[0], parts[1], parts[2], parts[3]
            user_courses.setdefault(acc_id, []).append({
                'user_id': user_id,
                'level': level,
                'type': ctype
            })

print(f" 课程信息: {len(user_courses)} 个账号有角色")

# 2b.
# Most recent lesson records (one query per sharded table)
print(" 查询最近行课记录...")
# Gather the character ids of every account
all_user_ids = {
    char['user_id']
    for char_list in user_courses.values()
    for char in char_list
}

# character id -> {'recent_date': ..., 'chapter_id': ..., 'chapter_unique_id': ...}
play_records = {}
tables = [f'bi_user_chapter_play_record_{i}' for i in range(8)]
for shard_no, table in enumerate(tables):
    # Table suffix n is queried for the ids congruent to n modulo 8
    uid_subset = [u for u in all_user_ids if int(u) % 8 == shard_no]
    if uid_subset:
        uid_csv_sub = ','.join(uid_subset)
        # DISTINCT ON + ORDER BY created_at DESC keeps the newest row per id
        sql = f"""
        SELECT DISTINCT ON (user_id)
            user_id,
            created_at::date AS recent_date,
            chapter_id,
            chapter_unique_id
        FROM {table}
        WHERE user_id IN ({uid_csv_sub})
          AND play_status = 1
        ORDER BY user_id, created_at DESC
        """
        psql_cmd = ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member',
                    '-d', 'vala_bi', '-t', '-A', '-F', '\t']
        result = subprocess.run(
            psql_cmd,
            input=sql,
            capture_output=True, text=True,
            env={**os.environ, 'PGPASSWORD': pg_pass}
        )
        for line in result.stdout.strip().split('\n'):
            parts = line.split('\t') if line else []
            if len(parts) >= 4:
                play_records[parts[0]] = {
                    'recent_date': parts[1],
                    'chapter_id': parts[2],
                    'chapter_unique_id': parts[3]
                }

print(f" 行课记录: {len(play_records)} 个角色有行课")

# 2c.
# Course structure mapping (chapter_id -> course name)
print(" 查询课程结构...")
# Drop empty ids: an empty field would turn the list into "IN (1,,2)" and
# make the whole lookup fail (psql output is not checked here).
# NOTE(review): assumes a NULL chapter_id is printed as an empty field - confirm.
chapter_ids = set(r['chapter_id'] for r in play_records.values() if r['chapter_id'])
chapter_map = {}  # chapter id -> "level-season-unit-lesson"
if chapter_ids:
    ch_csv = ','.join(chapter_ids)
    ch_sql = f"""
    SELECT id, course_level, course_season, course_unit, course_lesson
    FROM bi_level_unit_lesson
    WHERE id IN ({ch_csv})
    """
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         '-t', '-A', '-F', '\t'],
        input=ch_sql,
        capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass}
    )
    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 5:
            ch_id = parts[0]
            chapter_map[ch_id] = f"{parts[1]}-{parts[2]}-{parts[3]}-{parts[4]}"

# 2d. Study time
print(" 查询学习时长...")
study_times = {}  # character id -> total minutes (interval_time sum / 60000)
comp_tables = [f'bi_user_component_play_record_{i}' for i in range(8)]
for shard_no, table in enumerate(comp_tables):
    # Table suffix n is queried for the ids congruent to n modulo 8
    uid_subset = [u for u in all_user_ids if int(u) % 8 == shard_no]
    if not uid_subset:
        continue
    uid_csv_sub = ','.join(uid_subset)
    sql = f"""
    SELECT user_id, COALESCE(SUM(interval_time), 0) / 60000.0 AS total_min
    FROM {table}
    WHERE user_id IN ({uid_csv_sub})
    GROUP BY user_id
    """
    result = subprocess.run(
        ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
         '-t', '-A', '-F', '\t'],
        input=sql,
        capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': pg_pass}
    )
    for line in result.stdout.strip().split('\n'):
        if not line:
            continue
        parts = line.split('\t')
        if len(parts) >= 2:
            study_times[parts[0]] = float(parts[1])

# 2e.
# Payment status
print(" 查询付费状态...")
pay_sql = f"""
SELECT account_id,
       CASE WHEN COUNT(*) > 0 THEN '已付费' ELSE '未付费' END AS pay_status
FROM bi_vala_order
WHERE account_id IN ({uid_csv})
  AND pay_success_date IS NOT NULL
  AND order_status = 3
GROUP BY account_id
"""
result = subprocess.run(
    ['psql', '-h', pg_host, '-p', pg_port, '-U', 'ai_member', '-d', 'vala_bi',
     '-t', '-A', '-F', '\t'],
    input=pay_sql,
    capture_output=True, text=True,
    env={**os.environ, 'PGPASSWORD': pg_pass}
)
paid_users = set()  # account ids with at least one matching order
for line in result.stdout.strip().split('\n'):
    if not line:
        continue
    parts = line.split('\t')
    if len(parts) >= 2 and parts[1] == '已付费':
        paid_users.add(parts[0])

# Paying accounts restricted to the queried ids (not read again in this section)
all_paid = paid_users.intersection(uid_set)
print(f" 付费用户: {len(paid_users)}")

# ========== 3. Assemble learning data ==========
print(f"[3/4] 组装学情数据...")


def get_course_info(acc_id):
    """Summarise the course progress of one account.

    The character with the latest lesson date is used; when no character has
    a lesson record, the first character of the account is used instead.

    Args:
        acc_id: account id as a string (key of ``user_courses``).

    Returns:
        ``(current, recent_date, study_min)`` where ``current`` is the
        progress label, ``recent_date`` the latest lesson date or '无记录',
        and ``study_min`` the study time truncated to whole minutes.
        ``(None, None, None)`` when the account has no character.
    """
    chars = user_courses.get(acc_id, [])
    if not chars:
        return None, None, None

    # Pick the character with the most recent lesson; dates are compared as
    # the strings returned by the query.
    best_char = None
    best_date = None
    for c in chars:
        uid = c['user_id']
        if uid in play_records:
            d = play_records[uid]['recent_date']
            if best_date is None or d > best_date:
                best_date = d
                best_char = c
    if best_char is None:
        best_char = chars[0]  # fall back to the first character

    uid = best_char['user_id']
    level = best_char['level'] or '?'
    ctype = best_char['type'] or '体验课'

    # Current progress: chapter name when the chapter is known, '?' otherwise
    pr = play_records.get(uid)
    if pr and pr['chapter_id'] in chapter_map:
        current = f"{level}{ctype}-{chapter_map[pr['chapter_id']]}"
    else:
        current = f"{level}{ctype}-?"

    recent_date = pr['recent_date'] if pr else '无记录'
    study_min = study_times.get(uid, 0)

    return current, recent_date, int(study_min)


# Collect the per-row facts once. Every value needed for the final text is
# stored with its row, so nothing has to be re-derived (or leaked from a
# previous loop iteration) when the text is assembled below.
now_str = datetime.now().strftime('%Y-%m-%d %H:%M')
updates = []  # [(row_num, acc_id, current, recent_date, study_min, pay_status)]

for row_num, acc_id, phone in target_rows:
    acc_id = str(acc_id)
    current, recent_date, study_min = get_course_info(acc_id)
    pay_status = '已付费' if acc_id in paid_users else '未付费'
    updates.append((row_num, acc_id, current, recent_date, study_min, pay_status))

print(f" 生成 {len(updates)} 条更新")

# ========== Read column A (sales name) for the target rows ==========
print(" 补读销售名列...")
import urllib.request

# Fetch the tenant access token
with open('/root/.openclaw/credentials/xiaoxi/config.json') as config_file:
    config = json.load(config_file)
app = config['apps'][0]
tat_data = json.dumps({"app_id": app['appId'], "app_secret": app['appSecret']}).encode()
tat_req = urllib.request.Request(
    'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
    data=tat_data, headers={'Content-Type': 'application/json; charset=utf-8'})
with urllib.request.urlopen(tat_req) as tat_resp:
    tat = json.loads(tat_resp.read())['tenant_access_token']

TOKEN = 'RFIJsXT8FhGHhctY4RwczcOfnac'
SHEET = '55b0eb'

# Sort by row number
target_rows_sorted = sorted(updates, key=lambda x: x[0])
sales_map = {}  # row_num -> sales_name

# Read column A in batches of target rows; each request covers the span from
# the smallest to the largest row number of the batch.
batch_size = 200
for i in range(0, len(target_rows_sorted), batch_size):
    batch = target_rows_sorted[i:i + batch_size]
    batch_rows = [int(r[0]) for r in batch]
    start_row = min(batch_rows)
    range_str = f"{SHEET}!A{start_row}:A{max(batch_rows)}"
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{TOKEN}/values/{range_str}"
    req = urllib.request.Request(url)
    req.add_header('Authorization', f'Bearer {tat}')

    # Best effort: a failed read leaves the affected rows with sales name '?'
    try:
        with urllib.request.urlopen(req) as http_resp:
            resp = json.loads(http_resp.read())
        values = resp.get('data', {}).get('valueRange', {}).get('values', [])
        for j, v in enumerate(values):
            if v:
                sales_map[start_row + j] = v[0]
    except Exception as e:
        print(f" Error reading A col: {e}")

print(f" 销售名: {len(sales_map)} 条")

# Assemble the final column D text, now including the sales name
final_updates = []
for row_num, acc_id, current, recent_date, study_min, pay_status in target_rows_sorted:
    sales = sales_map.get(row_num, '?')
    if current:
        d_text = f"销售:{sales} | 用户:{acc_id} | 当前:{current} | 最近行课:{recent_date} | 学习{study_min}min | {pay_status}"
    else:
        d_text = f"销售:{sales} | 用户:{acc_id} | 未匹配到课程信息 | 最近行课:无记录 | 学习0min | {pay_status}"
    final_updates.append((row_num, d_text))

updates = final_updates  # [(row_num, d_text)] consumed by the write-back step

# ========== 4.
批量回填 ========== +print(f"[4/4] 批量回填 {len(updates)} 条...") + +# 构建批量更新请求 +# 飞书 API 支持范围更新,但 D 列不连续(有大量跳行) +# 策略:逐行更新,每50行一批 + +def update_cell(tat, token, sheet, row, col, value): + """更新单个单元格""" + range_str = f"{sheet}!{col}{row}:{col}{row}" + url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{token}/values" + body = json.dumps({ + "valueRange": { + "range": range_str, + "values": [[value]] + } + }).encode() + req = urllib.request.Request(url, data=body, method='PUT') + req.add_header('Authorization', f'Bearer {tat}') + req.add_header('Content-Type', 'application/json; charset=utf-8') + resp = json.loads(urllib.request.urlopen(req).read()) + return resp.get('code') == 0 + +# 分批处理:每50个一批,D列和C列一起更新 +batch_size = 50 +success_d = 0 +success_c = 0 + +for i in range(0, len(updates), batch_size): + batch = updates[i:i+batch_size] + + # 收集 D 列和 C 列的更新 + # 由于行不连续,需要逐行更新 + for row_num, d_text in batch: + # 更新 D 列 + if update_cell(tat, TOKEN, SHEET, row_num, 'D', d_text): + success_d += 1 + + # 更新 C 列为"已返回" + if update_cell(tat, TOKEN, SHEET, row_num, 'C', '已返回'): + success_c += 1 + + print(f" 进度: {min(i+batch_size, len(updates))}/{len(updates)} (D:{success_d}, C:{success_c})") + +print(f"\n✅ 完成!D列更新: {success_d}/{len(updates)},C列更新: {success_c}/{len(updates)}") diff --git a/scripts/refresh_may_v2.py b/scripts/refresh_may_v2.py new file mode 100644 index 0000000..9661e2c --- /dev/null +++ b/scripts/refresh_may_v2.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +""" +刷新 5 月行课记录 v2:高效批量查询 + 回填 +""" +import json, subprocess, os, urllib.request +from datetime import datetime + +# ===== 0. 
# Setup =====
# Parse KEY=VALUE lines from the secrets file; blank lines and lines starting
# with '#' are skipped, surrounding whitespace and quotes are removed.
secrets = {}
with open('/root/.openclaw/workspace/secrets.env') as f:
    for line in f:
        line = line.strip()
        if line and not line.startswith('#') and '=' in line:
            k, v = line.split('=', 1)
            secrets[k.strip()] = v.strip().strip('"').strip("'")
PG_PASS = secrets['PG_ONLINE_PASSWORD']
PG_HOST = 'bj-postgres-16pob4sg.sql.tencentcdb.com'
PG_PORT = '28591'


def pg_query(sql):
    """Run one SQL statement through psql and return its output.

    Args:
        sql: SQL text, passed to psql on standard input.

    Returns:
        The stripped standard output: tuples only, unaligned, fields
        separated by tabs (psql flags -t -A -F).

    Raises:
        RuntimeError: psql exited with a non-zero status. Without this a
            failed query would be indistinguishable from an empty result.
    """
    r = subprocess.run(
        ['psql', '-h', PG_HOST, '-p', PG_PORT, '-U', 'ai_member', '-d', 'vala_bi',
         # ON_ERROR_STOP makes psql exit non-zero when the statement fails
         '-v', 'ON_ERROR_STOP=1',
         '-t', '-A', '-F', '\t'],
        input=sql, capture_output=True, text=True,
        env={**os.environ, 'PGPASSWORD': PG_PASS})
    if r.returncode != 0:
        raise RuntimeError(f"psql failed (exit {r.returncode}): {r.stderr.strip()}")
    return r.stdout.strip()


def get_tat():
    """Request a Feishu tenant access token for the first configured app.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: the response carries no token; the message includes
            the ``code`` and ``msg`` fields of the response.
    """
    with open('/root/.openclaw/credentials/xiaoxi/config.json') as config_file:
        config = json.load(config_file)
    app = config['apps'][0]
    data = json.dumps({"app_id": app['appId'], "app_secret": app['appSecret']}).encode()
    req = urllib.request.Request(
        'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal',
        data=data, headers={'Content-Type': 'application/json; charset=utf-8'})
    with urllib.request.urlopen(req) as http_resp:
        payload = json.loads(http_resp.read())
    if 'tenant_access_token' not in payload:
        raise RuntimeError(
            f"tenant_access_token request failed: code={payload.get('code')} msg={payload.get('msg')}")
    return payload['tenant_access_token']


TAT = get_tat()
TOKEN = 'RFIJsXT8FhGHhctY4RwczcOfnac'
SHEET = '55b0eb'

# ===== 1.
# Load target rows + sales names =====
print("[1/4] 读取目标行数据...")
with open('/tmp/target_rows.json') as rows_file:
    target_rows = json.load(rows_file)  # [[row_num, account_id, phone], ...]
if not target_rows:
    # min()/max() below and the write-back step need at least one row
    raise SystemExit("no target rows found in /tmp/target_rows.json")

uid_set = sorted(set(str(r[1]) for r in target_rows if r[1]), key=int)
uid_csv = ','.join(uid_set)
# Report the real number of rows instead of a fixed figure
print(f" {len(target_rows)} 条记录, {len(uid_set)} 去重用户")

# Read column A (sales name) with a single API call covering the whole span
# between the smallest and the largest target row.
min_row = min(r[0] for r in target_rows)
max_row = max(r[0] for r in target_rows)
url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{TOKEN}/values/{SHEET}!A{min_row}:A{max_row}"
req = urllib.request.Request(url)
req.add_header('Authorization', f'Bearer {TAT}')
with urllib.request.urlopen(req) as http_resp:
    resp = json.loads(http_resp.read())
a_values = resp.get('data', {}).get('valueRange', {}).get('values', [])
sales_map = {}  # row_num -> sales_name
for i, v in enumerate(a_values):
    if v:
        sales_map[min_row + i] = v[0]

# Build row_num -> {'acc_id', 'sales'}; rows without a sales name get '?'
row_data = {}
for row_num, acc_id, phone in target_rows:
    row_data[row_num] = {
        'acc_id': str(acc_id),
        'sales': sales_map.get(row_num, '?')
    }

# ===== 2. Bulk database queries =====
print("[2/4] 批量查询学情...")

# 2a. Courses + characters
print(" 课程信息...")
# An empty id list would render as the invalid "IN ()"; skip the query then
course_rows = pg_query(f"""
SELECT a.id AS account_id, c.id AS user_id, d.course_level,
       CASE WHEN d.expire_time IS NOT NULL THEN '正式课' ELSE '体验课' END AS course_type
FROM bi_vala_app_account a
JOIN bi_vala_app_character c ON c.account_id = a.id AND c.deleted_at IS NULL
LEFT JOIN bi_user_course_detail d ON d.user_id = c.id AND d.deleted_at IS NULL
WHERE a.id IN ({uid_csv}) AND a.status = 1 AND a.deleted_at IS NULL
ORDER BY a.id, c.id, d.course_level
""") if uid_csv else ''

user_courses = {}  # acc_id -> [{'user_id':..., 'level':..., 'type':...}]
all_user_ids = set()
for line in course_rows.split('\n'):
    if not line:
        continue
    parts = line.split('\t')
    if len(parts) >= 4:
        # Empty level / type fields fall back to '?' and '体验课'
        acc_id, uid, level, ctype = parts[0], parts[1], parts[2] or '?', parts[3] or '体验课'
        all_user_ids.add(uid)
        user_courses.setdefault(acc_id, []).append({'user_id': uid, 'level': level, 'type': ctype})

# 2b.
# Most recent lesson (sharded tables)
print(" 最近行课...")
# Group the character ids by table suffix (id modulo 8) in a single pass
shard_members = {shard: [] for shard in range(8)}
for char_id in all_user_ids:
    shard_members[int(char_id) % 8].append(char_id)

play_records = {}  # character id -> {'date', 'ch_id', 'ch_uid'}
for shard in range(8):
    members = shard_members[shard]
    if members:
        # DISTINCT ON + ORDER BY created_at DESC keeps the newest row per id
        sql = f"""
        SELECT DISTINCT ON (user_id) user_id, created_at::date AS rd, chapter_id, chapter_unique_id
        FROM bi_user_chapter_play_record_{shard}
        WHERE user_id IN ({','.join(members)}) AND play_status = 1
        ORDER BY user_id, created_at DESC
        """
        for row in pg_query(sql).split('\n'):
            fields = row.split('\t') if row else []
            if len(fields) >= 4:
                play_records[fields[0]] = {'date': fields[1], 'ch_id': fields[2], 'ch_uid': fields[3]}

# 2c. Course structure
print(" 课程结构...")
ch_ids = {record['ch_id'] for record in play_records.values()}
ch_map = {}  # chapter id -> "level-season-unit-lesson"
if ch_ids:
    structure_sql = f"SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson WHERE id IN ({','.join(ch_ids)})"
    for row in pg_query(structure_sql).split('\n'):
        fields = row.split('\t') if row else []
        if len(fields) >= 5:
            ch_map[fields[0]] = f"{fields[1]}-{fields[2]}-{fields[3]}-{fields[4]}"

# 2d. Study time
print(" 学习时长...")
study_map = {}  # character id -> minutes (interval_time sum / 60000)
for shard in range(8):
    members = shard_members[shard]
    if members:
        duration_sql = f"SELECT user_id, COALESCE(SUM(interval_time),0)/60000.0 FROM bi_user_component_play_record_{shard} WHERE user_id IN ({','.join(members)}) GROUP BY user_id"
        for row in pg_query(duration_sql).split('\n'):
            fields = row.split('\t') if row else []
            if len(fields) >= 2:
                study_map[fields[0]] = float(fields[1])

# 2e. Payment status
print(" 付费状态...")
paid_sql = f"SELECT DISTINCT account_id FROM bi_vala_order WHERE account_id IN ({uid_csv}) AND pay_success_date IS NOT NULL AND order_status = 3"
paid_set = {row.strip() for row in pg_query(paid_sql).split('\n') if row.strip()}

print(f" 课程:{len(user_courses)} 行课:{len(play_records)} 付费:{len(paid_set)}")

# ===== 3.
# Assemble the column D text =====
print("[3/4] 组装学情文本...")


def get_best_char(acc_id):
    """Describe an account through its most recently active character.

    Returns ``(current, recent, study)``: the progress label, the latest
    lesson date (or '无记录') and the study time in whole minutes.
    Returns ``(None, None, None)`` when the account has no character.
    """
    candidates = user_courses.get(acc_id, [])
    if not candidates:
        return None, None, None

    # Characters that have a lesson record, in their original order; max()
    # returns the first of several equally recent ones. Without any record
    # the first character of the account is used.
    played = [c for c in candidates if play_records.get(c['user_id'])]
    if played:
        chosen = max(played, key=lambda c: play_records[c['user_id']]['date'])
    else:
        chosen = candidates[0]

    char_id = chosen['user_id']
    record = play_records.get(char_id)
    prefix = f"{chosen['level']}{chosen['type']}"
    if not record:
        current, recent = f"{prefix}-无记录", '无记录'
    elif record['ch_id'] in ch_map:
        current, recent = f"{prefix}-{ch_map[record['ch_id']]}", record['date']
    else:
        # A lesson exists but its chapter is missing from the structure map
        current, recent = f"{prefix}-?", record['date']
    return current, recent, int(study_map.get(char_id, 0))


def _row_text(info):
    """Format the column D text for one row_data entry."""
    account = info['acc_id']
    current, recent, study = get_best_char(account)
    pay = '已付费' if account in paid_set else '未付费'
    if not current:
        return f"销售:{info['sales']} | 用户:{account} | 无课程角色 | 最近行课:无记录 | 学习0min | {pay}"
    return f"销售:{info['sales']} | 用户:{account} | 当前:{current} | 最近行课:{recent} | 学习{study}min | {pay}"


now_str = datetime.now().strftime('%Y-%m-%d %H:%M')
updates = {}  # row_num -> column D text
for row_num, info in row_data.items():
    updates[row_num] = _row_text(info)

# ===== 4.
# Batch write-back =====
print(f"[4/4] 批量回填 {len(updates)} 条...")


def api_put(tat, token, sheet, range_str, values):
    """Write ``values`` into one range of the spreadsheet.

    Args:
        tat: tenant access token used as Bearer credential.
        token: spreadsheet token.
        sheet: sheet id.
        range_str: A1-style range without the sheet prefix, e.g. 'D5:D9'.
        values: list of rows, each a list of cell values.

    Returns:
        True when the response ``code`` is 0, False otherwise. Failures are
        printed and never raised, so one bad block does not stop the run.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{token}/values"
    body = {"valueRange": {"range": f"{sheet}!{range_str}", "values": values}}
    data = json.dumps(body).encode()
    req = urllib.request.Request(url, data=data, method='PUT')
    req.add_header('Authorization', f'Bearer {tat}')
    req.add_header('Content-Type', 'application/json; charset=utf-8')
    try:
        with urllib.request.urlopen(req) as http_resp:
            resp = json.loads(http_resp.read())
    except Exception as e:
        print(f" API error: {e}")
        return False
    if resp.get('code') != 0:
        # Report rejected writes instead of dropping them without a trace
        print(f" API error: code={resp.get('code')} msg={resp.get('msg')}")
        return False
    return True


# Sort by row number, then merge consecutive rows into range blocks
sorted_items = sorted(updates.items(), key=lambda x: x[0])
success_d = 0
success_c = 0

# Each block is a list of (row_num, text) pairs with consecutive row numbers.
# Starting from an empty list keeps this safe when there is nothing to write.
blocks = []
for item in sorted_items:
    if blocks and item[0] == blocks[-1][-1][0] + 1:
        blocks[-1].append(item)
    else:
        blocks.append([item])

print(f" 共 {len(blocks)} 个连续块,批量更新中...")

for bi, block in enumerate(blocks):
    rows = [r for r, _ in block]
    d_vals = [[v] for _, v in block]
    c_vals = [["已返回"] for _ in block]

    start_row, end_row = rows[0], rows[-1]

    # Column D: learning summary text
    if api_put(TAT, TOKEN, SHEET, f'D{start_row}:D{end_row}', d_vals):
        success_d += len(block)

    # Column C: status marker
    if api_put(TAT, TOKEN, SHEET, f'C{start_row}:C{end_row}', c_vals):
        success_c += len(block)

    if (bi + 1) % 10 == 0:
        print(f" 块 {bi+1}/{len(blocks)}: D={success_d} C={success_c}")

print(f"\n✅ 完成!D列: {success_d}/{len(updates)},C列: {success_c}/{len(updates)}")