#!/usr/bin/env python3
"""Wu Di sales-conversion sheet: course-progress update.

Two passes over one Feishu spreadsheet tab:

1. Rows that have a phone number in column E but no account ID in column H
   get the ID looked up (by encrypted phone number) and written to column H.
2. Rows that already had an account ID in column H but an empty column D get
   a trial-course / completed-chapter summary (column D), the registration
   date (column I) and the download channel (column J).

Rows that receive their ID in pass 1 are not included in pass 2 of the same
run, because both work lists are built from the sheet as read at start-up.
"""
import json
import os
import sys
from collections import defaultdict
from datetime import datetime

import psycopg2
import requests

SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone  # noqa: E402  (needs SCRIPTS_DIR on sys.path)

SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
SHEET_ID = "f975f0"
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"

# Maximum number of sheet rows sent in a single write request.
WRITE_BATCH_ROWS = 50
# Maximum number of values bound into a single SQL ``IN (...)`` list.
DB_BATCH_SIZE = 500
# Number of bi_user_chapter_play_record_<n> tables that are queried.
PLAY_RECORD_TABLES = 8


def get_fs_token():
    """Request a Feishu tenant access token with the first app in config.json.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: if the response carries no token.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if not token:
        # Surface the API's own error payload instead of a bare KeyError.
        raise RuntimeError(f"获取token失败: {data}")
    return token


def read_sheet(token, sheet_id):
    """Read all values of one sheet tab.

    Args:
        token: Bearer token for the Feishu API.
        sheet_id: ID of the tab inside SPREADSHEET_TOKEN.

    Returns:
        The ``values`` list of the response (one list per sheet row).

    Raises:
        RuntimeError: if the response ``code`` is not 0.
    """
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}"
    )
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    data = resp.json()
    if data.get("code") != 0:
        raise RuntimeError(f"读取失败: {data}")
    return data["data"]["valueRange"]["values"]


def put_values(token, sheet_id, range_str, values):
    """Write ``values`` into ``range_str`` of one sheet tab.

    Args:
        token: Bearer token for the Feishu API.
        sheet_id: ID of the tab inside SPREADSHEET_TOKEN.
        range_str: A1-style range without the sheet prefix, e.g. ``"H3:H7"``.
        values: List of rows, each a list of cell values.

    Returns:
        The decoded JSON response; the caller decides how to treat errors.
    """
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values"
    )
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    resp = requests.put(
        url,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        json=body,
        timeout=30,
    )
    return resp.json()


def get_pg():
    """Open a PostgreSQL connection using the password from ../secrets.env.

    The password is taken from the last ``PG_ONLINE_PASSWORD=`` line of the
    file, with surrounding quotes removed.

    Raises:
        RuntimeError: if the file has no ``PG_ONLINE_PASSWORD=`` line.
    """
    secrets_path = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    pg_pass = None
    with open(secrets_path, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                pg_pass = line.strip().split("=", 1)[1].strip("'\"")
    if pg_pass is None:
        # Fail here with a clear message rather than inside the driver.
        raise RuntimeError(f"PG_ONLINE_PASSWORD not found in {secrets_path}")
    return psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com",
        port=28591,
        user="ai_member",
        password=pg_pass,
        dbname="vala_bi",
    )


def clean_phone(val):
    """Normalise a sheet cell to an 11-digit phone number starting with "1".

    Spaces, hyphens and apostrophes are removed. Scientific notation
    ("1.3800138E10") is expanded, and an all-zero fractional part
    ("13800138000.0") is dropped.

    Returns:
        The 11-digit string, or None when the cell is empty or does not
        reduce to such a number.
    """
    if not val:
        return None
    val = str(val).strip().replace(" ", "").replace("-", "").replace("'", "")
    if "e" in val.lower():
        # NOTE(review): a cell that was *displayed* in scientific notation may
        # already have lost trailing digits; that cannot be detected here.
        try:
            val = str(int(float(val)))
        except (ValueError, OverflowError):
            return None
    else:
        whole, dot, frac = val.partition(".")
        if dot and whole.isdigit() and frac and not frac.strip("0"):
            # Float rendering of an integer: keep only the integer part so the
            # trailing zeros are not counted as phone digits.
            val = whole
    digits = "".join(c for c in val if c.isdigit())
    return digits if len(digits) == 11 and digits.startswith("1") else None


def _cell_text(row, col):
    """Return the stripped text of ``row[col]``, or "" if missing or falsy."""
    return str(row[col]).strip() if len(row) > col and row[col] else ""


def _scan_rows(rows):
    """Build the two work lists from the sheet rows.

    Sheet rows are numbered from 1; the scan starts at sheet row 3.

    Returns:
        (need_phone_to_id, need_course) where the first holds
        ``(row_number, phone)`` for rows with a valid phone in column E and no
        ID in column H, and the second holds ``(row_number, uid)`` for rows
        with an ID in column H and an empty column D.
    """
    need_phone_to_id = []
    need_course = []
    for row_no, row in enumerate(rows[2:], start=3):
        d_val = _cell_text(row, 3)
        e_val = _cell_text(row, 4)
        h_val = _cell_text(row, 7)

        phone = clean_phone(e_val)
        # A literal "None" in the cell is treated as "no ID".
        uid = h_val if h_val and h_val != "None" else None

        if phone and not uid:
            need_phone_to_id.append((row_no, phone))
        if uid and not d_val:
            need_course.append((row_no, uid))
    return need_phone_to_id, need_course


def _fetch_in_batches(cur, sql_template, ids, batch_size=DB_BATCH_SIZE):
    """Run ``sql_template`` once per batch of ``ids`` and yield every row.

    ``sql_template`` must contain a single ``{placeholders}`` field inside its
    ``IN (...)`` list; the values themselves are always passed as bound
    parameters, never interpolated into the SQL text.
    """
    for start in range(0, len(ids), batch_size):
        batch = ids[start:start + batch_size]
        placeholders = ",".join(["%s"] * len(batch))
        cur.execute(sql_template.format(placeholders=placeholders), batch)
        yield from cur.fetchall()


def _contiguous_runs(updates, max_rows=WRITE_BATCH_ROWS):
    """Split ``(row_number, value)`` pairs into runs of consecutive rows.

    Yields ``(first_row, last_row, values)`` where ``values`` has exactly
    ``last_row - first_row + 1`` single-cell rows, so every written range
    matches its data and rows that need no update are never touched.
    """
    run = []
    for row_no, value in updates:
        if run and (row_no != run[-1][0] + 1 or len(run) >= max_rows):
            yield run[0][0], run[-1][0], [[v] for _, v in run]
            run = []
        run.append((row_no, value))
    if run:
        yield run[0][0], run[-1][0], [[v] for _, v in run]


def _write_column(token, col_letter, updates):
    """Write ``(row_number, value)`` pairs into one column of SHEET_ID.

    Failed requests are reported on stderr and do not stop the run.

    Returns:
        The number of write requests whose response code was not 0.
    """
    failed = 0
    for first_row, last_row, values in _contiguous_runs(updates):
        range_str = f"{col_letter}{first_row}:{col_letter}{last_row}"
        result = put_values(token, SHEET_ID, range_str, values)
        if result.get("code") != 0:
            failed += 1
            print(f"写入失败 {range_str}: {result}", file=sys.stderr)
    return failed


def _backfill_account_ids(cur, token, need_phone_to_id):
    """Step 1: look up account IDs by encrypted phone and write column H.

    Rows whose phone matches no active account are written as "".
    """
    # Encrypt each distinct phone once; the lookup below is keyed on the
    # ciphertext, so encrypt_phone is relied on to be deterministic.
    phone_to_enc = {}
    for _, phone in need_phone_to_id:
        if phone not in phone_to_enc:
            phone_to_enc[phone] = encrypt_phone(phone)
    enc_list = list(dict.fromkeys(phone_to_enc.values()))

    account_sql = (
        "SELECT id, tel_encrypt FROM bi_vala_app_account "
        "WHERE tel_encrypt IN ({placeholders}) AND status=1 AND deleted_at IS NULL"
    )
    enc_to_aid = {}
    for aid, tel_enc in _fetch_in_batches(cur, account_sql, enc_list):
        enc_to_aid[tel_enc] = aid

    updates = []
    for row_no, phone in need_phone_to_id:
        aid = enc_to_aid.get(phone_to_enc[phone])
        updates.append((row_no, str(aid) if aid else ""))

    _write_column(token, "H", updates)
    matched = sum(1 for _, value in updates if value)
    print(f"H列回填完成: {len(updates)}行, 匹配{matched}")


def _backfill_course_info(cur, token, need_course):
    """Step 2: write trial/completion summary (D), reg date (I), channel (J)."""
    uids = list(set(uid for _, uid in need_course))

    # Registration date and download channel per account.
    account_sql = """
        SELECT id, created_at, download_channel
        FROM bi_vala_app_account
        WHERE id::text IN ({placeholders}) AND status=1 AND deleted_at IS NULL
    """
    acc_info = {
        str(aid): (created_at, channel)
        for aid, created_at, channel in _fetch_in_batches(cur, account_sql, uids)
    }

    # Characters belonging to each account.
    character_sql = """
        SELECT account_id, id
        FROM bi_vala_app_character
        WHERE account_id::text IN ({placeholders})
    """
    aid_to_chars = defaultdict(list)
    for aid, cid in _fetch_in_batches(cur, character_sql, uids):
        aid_to_chars[str(aid)].append(cid)

    # Trial-course row counts per account and level (expire_time IS NULL).
    trial_sql = """
        SELECT account_id, course_level, COUNT(*) as cnt
        FROM bi_user_course_detail
        WHERE account_id::text IN ({placeholders})
          AND expire_time IS NULL AND deleted_at IS NULL
        GROUP BY account_id, course_level
    """
    trial_info = defaultdict(dict)
    for aid, level, cnt in _fetch_in_batches(cur, trial_sql, uids):
        trial_info[str(aid)][level] = cnt

    # Chapters with play_status = 1 per character, across all record tables.
    char_ids = [cid for cids in aid_to_chars.values() for cid in cids]
    char_completed = defaultdict(set)
    for tbl_idx in range(PLAY_RECORD_TABLES):
        play_sql = (
            f"SELECT user_id, chapter_id FROM bi_user_chapter_play_record_{tbl_idx} "
            "WHERE user_id IN ({placeholders}) AND play_status = 1"
        )
        for char_id, chapter_id in _fetch_in_batches(cur, play_sql, char_ids):
            char_completed[char_id].add(chapter_id)

    updates_d = []
    updates_i = []
    updates_j = []
    for row_no, uid in need_course:
        info = acc_info.get(uid)
        reg_date = info[0].strftime("%Y-%m-%d") if info and info[0] else ""
        dl_channel = info[1] if info and info[1] else ""

        trials = trial_info.get(uid, {})
        trial_parts = [f"{level}体验{trials[level]}节" for level in sorted(trials)]
        trial_str = ", ".join(trial_parts) if trial_parts else "无体验课"

        # Distinct chapters completed by any character of this account.
        completed = set()
        for cid in aid_to_chars.get(uid, []):
            completed |= char_completed.get(cid, set())

        d_val = f"{trial_str} | 已完课{len(completed)}节" if completed else trial_str
        updates_d.append((row_no, d_val))
        updates_i.append((row_no, reg_date))
        updates_j.append((row_no, dl_channel))

    for col_letter, updates in (("D", updates_d), ("I", updates_i), ("J", updates_j)):
        _write_column(token, col_letter, updates)
    print(f"D/I/J列回填完成: {len(updates_d)}行")


def main():
    """Read the sheet, work out what is missing, and backfill it."""
    token = get_fs_token()
    rows = read_sheet(token, SHEET_ID)
    print(f"总行数: {len(rows)}")

    need_phone_to_id, need_course = _scan_rows(rows)
    print(f"手机号→ID待补: {len(need_phone_to_id)}")
    print(f"有UID但D空: {len(need_course)}")

    conn = get_pg()
    try:
        cur = conn.cursor()
        try:
            if need_phone_to_id:
                _backfill_account_ids(cur, token, need_phone_to_id)
            if need_course:
                _backfill_course_info(cur, token, need_course)
        finally:
            cur.close()
    finally:
        # Release the connection even when a query or a write raised.
        conn.close()
    print("完成")


if __name__ == "__main__":
    main()