#!/usr/bin/env python3 """ 行课查询自动回填 V2 — 适配销转客户主表(全量) 数据源: ERCFsFo4MhnF0ytGeCrc0Bb8n5f / 1RFMqc (全量) 输出: 小溪明细 sheet (1zB5Be),全量表通过 VLOOKUP 自动刷新 流程: 1. 读取全量表 → 有手机无UID的 → 匹配 account_id → 回填 X/Y/Z 列 2. 读取全量表 → 所有有UID的 → 查行课数据 → 写入小溪明细 """ import json, requests, os, re, psycopg2, sys, time from datetime import datetime from collections import defaultdict # ── 配置 ── PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com" PG_PORT = 28591 PG_USER = "ai_member" PG_DB = "vala_bi" SPREADSHEET_TOKEN = "ERCFsFo4MhnF0ytGeCrc0Bb8n5f" SHEET_ALL = "1RFMqc" # 全量表 SHEET_DETAIL = "1zB5Be" # 小溪明细 CRED_DIR = "/root/.openclaw/credentials/xiaoxi" LOG_FILE = "/var/log/xiaoxi_xingke_query_v2.log" def log(msg): ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = f"[{ts}] {msg}" print(line) with open(LOG_FILE, "a") as f: f.write(line + "\n") def get_pg_password(): secrets_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "secrets.env") with open(secrets_path) as f: for line in f: if line.startswith("PG_ONLINE_PASSWORD="): return line.strip().split("=", 1)[1].strip("'\"") def get_fs_token(): with open(os.path.join(CRED_DIR, "config.json")) as f: cfg = json.load(f) resp = requests.post( "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", json={"app_id": cfg["apps"][0]["appId"], "app_secret": cfg["apps"][0]["appSecret"]}, timeout=15 ) return resp.json()["tenant_access_token"] def read_sheet(token, sheet_id, range_str=None): url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}" if range_str: url += f"!{range_str}" for attempt in range(3): try: resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=90) resp.encoding = 'utf-8' data = json.loads(resp.text) if data.get("code") != 0: raise RuntimeError(f"读取Sheet失败: {data}") return data["data"]["valueRange"]["values"] except (json.JSONDecodeError, requests.exceptions.ConnectionError) as e: log(f" 读取重试 {attempt+1}/3: {e}") if attempt == 2: log(f" resp.text[:100]: {resp.text[:100] if 'resp' in dir() else 'N/A'}") time.sleep(2 ** attempt) raise RuntimeError("读取Sheet失败: 3次重试均失败") def put_values(token, sheet_id, range_str, values): url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values" body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}} resp = requests.put(url, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json" }, json=body, timeout=30) r = resp.json() if r.get("code") != 0: log(f" 写入失败 {range_str}: {r}") return r def append_rows(token, sheet_id, values): """追加行到 sheet 末尾""" url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}:append" body = {"valueRange": {"range": f"{sheet_id}!A:AA", "values": values}} resp = requests.post(url, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json" }, json=body, timeout=30) r = resp.json() if r.get("code") != 0: log(f" 追加失败: {r}") return r def mask_phone(phone): phone = str(phone).strip() if "." in phone: parts = phone.split(".") if parts[1] in ("0", "00"): phone = parts[0] if re.match(r"^1\d{10}$", phone): return f"{phone[:3]}****{phone[-4:]}" return None def batch_in(cur, sql_tpl, params, chunk=500): results = [] for i in range(0, len(params), chunk): batch = params[i:i+chunk] ph = ",".join(["%s"] * len(batch)) cur.execute(sql_tpl % ph, batch) results.extend(cur.fetchall()) return results # ── Phase 1: 手机号→ID 匹配 ── def phase1_phone_to_id(token, conn): """读取全量表,对有手机无UID的行做匹配,回填 X/Y/Z 列""" log("Phase 1: 手机号→ID 匹配") rows = read_sheet(token, SHEET_ALL) pending = [] for idx, row in enumerate(rows[3:], start=4): if len(row) < 5: continue # D列=手机号(索引3), I列=用户ID(索引8), X列=匹配uid(索引23) phone = str(row[3]).strip() if len(row) > 3 and row[3] else "" uid_i = str(row[8]).strip() if len(row) > 8 and row[8] else "" uid_x = str(row[23]).strip() if len(row) > 23 and row[23] else "" if not phone or uid_i or uid_x: continue if phone.startswith("="): continue pending.append({"row_idx": idx, "phone": phone}) log(f" 待匹配: {len(pending)}") if not pending: log(" 无需匹配, 跳过") return 0, 0 phones_raw = list(set(r["phone"] for r in pending)) valid_phones = [(p, mask_phone(p)) for p in phones_raw if mask_phone(p)] masks = list(set(m[1] for m in valid_phones)) cur = conn.cursor() masked_to_aid = {} for i in range(0, len(masks), 500): batch = masks[i:i+500] ph = ",".join(["%s"] * len(batch)) cur.execute( f"SELECT id, tel FROM bi_vala_app_account WHERE tel IN ({ph}) AND status=1 AND deleted_at IS NULL", batch ) for aid, tel in cur.fetchall(): if tel not in masked_to_aid: masked_to_aid[tel] = aid phone_to_aid = {} for phone, m in valid_phones: if m in masked_to_aid: phone_to_aid[phone] = masked_to_aid[m] matched = len(phone_to_aid) log(f" 匹配成功: {matched}, 未匹配: {len(valid_phones)-matched}") for r in pending: phone = r["phone"] row = r["row_idx"] if phone in phone_to_aid: put_values(token, SHEET_ALL, f"X{row}:X{row}", [[str(phone_to_aid[phone])]]) log(f" 回填完成: {len(pending)} 行") cur.close() return len(pending), matched # ── Phase 2: 行课记录查询 & 写入小溪明细 ── def phase2_course_records(token, conn): """读取全量表所有UID → 查行课数据 → 写入小溪明细""" log("Phase 2: 行课记录查询") rows = read_sheet(token, SHEET_ALL) uid_rows = {} for idx, row in enumerate(rows[3:], start=4): uid = "" if len(row) > 8 and row[8]: try: uid = str(int(float(str(row[8]).strip()))) except (ValueError, TypeError): pass if not uid and len(row) > 23 and row[23]: try: uid = str(int(float(str(row[23]).strip()))) except (ValueError, TypeError): pass if not uid: continue sales = str(row[0]).strip() if len(row) > 0 and row[0] else "" jinxian = str(row[2]).strip() if len(row) > 2 and row[2] else "" phone = str(row[3]).strip() if len(row) > 3 and row[3] else "" aid = int(uid) if aid <= 0: continue if aid not in uid_rows: uid_rows[aid] = (idx, sales, jinxian, phone) uid_set = list(uid_rows.keys()) log(f" 有效UID: {len(uid_set)}") if not uid_set: log(" 无UID, 跳过") return 0 cur = conn.cursor() # 账户信息 log(" 查询账户信息...") aid_info = {} for i in range(0, len(uid_set), 500): batch = uid_set[i:i+500] ph = ",".join(["%s"] * len(batch)) cur.execute( f"SELECT id, tel, created_at FROM bi_vala_app_account WHERE id IN ({ph}) AND status=1 AND deleted_at IS NULL", batch ) for aid, tel, created_at in cur.fetchall(): aid_info[aid] = {"tel": tel or "", "created_at": str(created_at) if created_at else ""} # 角色信息 log(" 查询角色信息...") account_chars = defaultdict(list) char_to_account = {} rc = batch_in(cur, "SELECT account_id, id, nickname FROM bi_vala_app_character WHERE account_id IN (%s) AND nickname IS NOT NULL AND nickname != '' AND deleted_at IS NULL", uid_set ) for aid, cid, nick in rc: account_chars[aid].append(cid) char_to_account[cid] = aid char_ids = list(char_to_account.keys()) log(f" 角色数: {len(char_ids)}") # 课程映射 cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson") chapter_map = {} for ch_id, cl, cs, cu, cl2 in cur.fetchall(): chapter_map[ch_id] = (cl or "", cs or "", cu or "", cl2 or "") # 课时完成记录 log(" 查询课时完成记录...") char_chapter_times = defaultdict(dict) char_latest = {} for tbl_idx in range(8): table = f"bi_user_chapter_play_record_{tbl_idx}" try: cur.execute( f"SELECT user_id, chapter_id, created_at FROM {table} WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)", (char_ids,) ) for uid, ch_id, created_at in cur.fetchall(): ch_data = chapter_map.get(ch_id) if not ch_data: continue if ch_id not in char_chapter_times[uid] or created_at < char_chapter_times[uid][ch_id]: char_chapter_times[uid][ch_id] = created_at prev = char_latest.get(uid) if prev is None or created_at > prev[0]: char_latest[uid] = (created_at, ch_id, ch_data) except Exception as e: log(f" 警告 {table}: {e}") # 学习耗时 log(" 查询学习耗时...") char_total_ms = defaultdict(int) for tbl_idx in range(8): table = f"bi_user_component_play_record_{tbl_idx}" try: cur.execute( f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id", (char_ids,) ) for uid, total_ms in cur.fetchall(): char_total_ms[uid] += (total_ms or 0) except Exception as e: log(f" 警告 {table}: {e}") # 激活状态 log(" 查询激活状态...") activation = {} for i in range(0, len(uid_set), 500): batch = uid_set[i:i+500] ph = ",".join(["%s"] * len(batch)) cur.execute( f"SELECT account_id, season_package_level FROM bi_vala_seasonal_ticket WHERE account_id IN ({ph}) AND status=1 AND deleted_at IS NULL AND season_package_level IN ('A1','A2')", batch ) for aid, lvl in cur.fetchall(): if aid not in activation: activation[aid] = lvl # 付费信息 log(" 查询付费信息...") paid_info = {} for i in range(0, len(uid_set), 500): batch = uid_set[i:i+500] ph = ",".join(["%s"] * len(batch)) cur.execute( f"""SELECT account_id, MIN(pay_success_date) as first_pay, SUM(pay_amount_int)/100.0 as total_gmv, SUM(CASE WHEN order_status=3 THEN pay_amount_int ELSE 0 END)/100.0 as total_gsv, STRING_AGG(DISTINCT key_from, ', ') as channels FROM bi_vala_order WHERE account_id IN ({ph}) AND pay_success_date IS NOT NULL AND order_status IN (3,4) AND deleted_at IS NULL GROUP BY account_id""", batch ) for aid, first_pay, gmv, gsv, channels in cur.fetchall(): paid_info[aid] = (str(first_pay) if first_pay else "", gmv or 0, gsv or 0, channels or "") # 最近登录 log(" 查询最近登录...") last_login = {} for i in range(0, len(uid_set), 500): batch = uid_set[i:i+500] ph = ",".join(["%s"] * len(batch)) cur.execute( f"SELECT account_id, MAX(login_date) FROM bi_vala_app_account WHERE id IN ({ph}) AND status=1 AND deleted_at IS NULL GROUP BY account_id", batch ) for aid, dt in cur.fetchall(): if dt: last_login[aid] = str(dt) cur.close() # 组装小溪明细 log(" 组装小溪明细...") detail_rows = [] for aid in uid_set: row_idx, sales, jinxian, phone_raw = uid_rows[aid] info = aid_info.get(aid, {}) tel = info.get("tel", "") created_at = info.get("created_at", "") masked_tel = mask_phone(tel) or mask_phone(phone_raw) or "" masked_phone = mask_phone(phone_raw) or "" chars = account_chars.get(aid, []) exp_lessons = {} first_lesson_time = None first_lesson_ch = None best_latest_time = None best_latest_ch = None total_min = 0.0 for cid in chars: ctimes = char_chapter_times.get(cid, {}) for ch_id, ct in ctimes.items(): ch_info = chapter_map.get(ch_id) if not ch_info: continue cl, cs, cu, cl2 = ch_info if cs == "S0" and cu == "U00": if cl2 not in exp_lessons or ct < exp_lessons[cl2]: exp_lessons[cl2] = ct if first_lesson_time is None or ct < first_lesson_time: first_lesson_time = ct first_lesson_ch = ch_info lt = char_latest.get(cid) if lt and (best_latest_time is None or lt[0] > best_latest_time): best_latest_time = lt[0] best_latest_ch = lt[1] total_min += char_total_ms.get(cid, 0) / 60000.0 total_min = round(total_min, 1) if total_min == int(total_min): total_min = int(total_min) exp_count = len(exp_lessons) first_time_str = first_lesson_time.strftime("%Y-%m-%d %H:%M:%S") if first_lesson_time else "" first_ch_str = f"{first_lesson_ch[0]}-{first_lesson_ch[1]}-{first_lesson_ch[2]}-{first_lesson_ch[3]}" if first_lesson_ch else "" latest_time_str = best_latest_time.strftime("%Y-%m-%d %H:%M:%S") if best_latest_time else "" latest_ch_str = f"{chapter_map[best_latest_ch][0]}-{chapter_map[best_latest_ch][1]}-{chapter_map[best_latest_ch][2]}-{chapter_map[best_latest_ch][3]}" if best_latest_ch and best_latest_ch in chapter_map else "" act_level = activation.get(aid, "") if act_level: course_level = act_level course_type = "正式课" if aid in paid_info else "体验课" elif first_lesson_ch: course_level = first_lesson_ch[0] course_type = "体验课" else: course_level = "" course_type = "" nicknames = [] for cid in chars: for orig_aid, orig_cid, orig_nick in rc: if orig_cid == cid: nicknames.append(orig_nick) nickname_str = " / ".join(nicknames[:3]) pi = paid_info.get(aid) paid_status = "已付费" if pi else "未付费" first_pay_time = pi[0] if pi else "" total_gmv = pi[1] if pi else 0 total_gsv = pi[2] if pi else 0 channels = pi[3] if pi else "" row_data = [ str(aid), masked_tel, masked_phone, sales, jinxian, created_at, "", nickname_str, course_level, course_type, first_time_str, first_ch_str, exp_lessons.get("L01").strftime("%Y-%m-%d %H:%M:%S") if exp_lessons.get("L01") else "", exp_lessons.get("L02").strftime("%Y-%m-%d %H:%M:%S") if exp_lessons.get("L02") else "", exp_lessons.get("L03").strftime("%Y-%m-%d %H:%M:%S") if exp_lessons.get("L03") else "", exp_lessons.get("L04").strftime("%Y-%m-%d %H:%M:%S") if exp_lessons.get("L04") else "", exp_lessons.get("L05").strftime("%Y-%m-%d %H:%M:%S") if exp_lessons.get("L05") else "", str(exp_count), latest_time_str, latest_ch_str, str(total_min), paid_status, first_pay_time, str(total_gmv), str(total_gsv), channels, last_login.get(aid, ""), ] detail_rows.append((aid, row_data)) # 写入小溪明细 log(" 写入小溪明细...") existing = read_sheet(token, SHEET_DETAIL) existing_map = {} for i, row in enumerate(existing[1:], start=2): if row and row[0]: try: existing_map[int(float(str(row[0]).strip()))] = i except (ValueError, TypeError): pass update_count = 0 new_rows_list = [] for aid, row_data in detail_rows: if aid in existing_map: ri = existing_map[aid] put_values(token, SHEET_DETAIL, f"A{ri}:AA{ri}", [row_data]) update_count += 1 else: new_rows_list.append(row_data) log(f" 更新: {update_count}, 新增: {len(new_rows_list)}") # 追加新行 for i in range(0, len(new_rows_list), 500): batch = new_rows_list[i:i+500] append_rows(token, SHEET_DETAIL, batch) log(f" 行课明细完成: 共 {len(detail_rows)} 条") return len(detail_rows) # ── Main ── def main(): log("=" * 50) log("行课查询 V2 启动 (销转客户主表)") try: token = get_fs_token() conn = psycopg2.connect( host=PG_HOST, port=PG_PORT, user=PG_USER, password=get_pg_password(), dbname=PG_DB, connect_timeout=30 ) p1_total, p1_matched = phase1_phone_to_id(token, conn) p2_total = phase2_course_records(token, conn) conn.close() summary = f"Phase1(ID匹配): {p1_total} 行(匹配{p1_matched}) | Phase2(行课): {p2_total} 条" log(f"完成: {summary}") return 0 except Exception as e: log(f"ERROR: {e}") import traceback traceback.print_exc() return 1 if __name__ == "__main__": sys.exit(main())