#!/usr/bin/env python3
"""
Course-attendance auto back-fill, V2 -- adapted to the sales-conversion
customer master sheet (the "full table").

Data source: spreadsheet ERCFsFo4MhnF0ytGeCrc0Bb8n5f / sheet 1RFMqc (full table)
Output:      detail sheet 1zB5Be; per the original author's note the full
             table picks these values up through VLOOKUP formulas.

Flow:
  1. Read the full table -> rows that have a phone number but no UID ->
     match an account id -> write it back to column X.
  2. Read the full table -> every row that has a UID -> query course data
     -> write it to the detail sheet.
"""
import json
import os
import re
import sys
import time
import traceback
from collections import defaultdict
from datetime import datetime

import psycopg2
import requests

# -- Configuration --
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_DB = "vala_bi"

SPREADSHEET_TOKEN = "ERCFsFo4MhnF0ytGeCrc0Bb8n5f"
SHEET_ALL = "1RFMqc"  # full table
SHEET_DETAIL = "1zB5Be"  # detail sheet

CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
LOG_FILE = "/var/log/xiaoxi_xingke_query_v2.log"

# Number of bi_user_*_play_record_N shard tables queried in phase 2.
_SHARD_COUNT = 8

# A value that is already in the masked form produced by mask_phone().
_MASKED_PHONE_RE = re.compile(r"1\d{2}\*{4}\d{4}")


def log(msg):
    """Print a timestamped line and append it to LOG_FILE.

    A failure to write the log file is reported on stderr instead of being
    raised, so an unwritable log path cannot abort the job.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    try:
        # Explicit UTF-8: messages contain Chinese text and the job may run
        # under a non-UTF-8 locale.
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except OSError as e:
        print(f"[{ts}] WARN: cannot write {LOG_FILE}: {e}", file=sys.stderr)


def get_pg_password():
    """Return the value of PG_ONLINE_PASSWORD from ../secrets.env.

    Raises:
        OSError: the secrets file cannot be read.
        RuntimeError: the file has no PG_ONLINE_PASSWORD= line.
    """
    secrets_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "..", "secrets.env"
    )
    with open(secrets_path, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                return line.strip().split("=", 1)[1].strip("'\"")
    # Previously fell through and returned None, which only surfaced later
    # as an opaque connection failure.
    raise RuntimeError(f"secrets.env 中未找到 PG_ONLINE_PASSWORD: {secrets_path}")


def get_fs_token():
    """Fetch a Feishu tenant access token using the first app in config.json.

    Raises:
        RuntimeError: the API answered with a non-zero code or no token.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if data.get("code") != 0 or not token:
        raise RuntimeError(f"获取飞书token失败: {data}")
    return token


def read_sheet(token, sheet_id, range_str=None):
    """Read cell values from a sheet, retrying transient failures up to 3 times.

    Args:
        token: Feishu tenant access token.
        sheet_id: sheet id inside SPREADSHEET_TOKEN.
        range_str: optional A1-style range appended as "<sheet_id>!<range>".

    Returns:
        The list of rows from data.valueRange.values (empty list if absent).

    Raises:
        RuntimeError: the API returned a non-zero code, or all retries failed.
    """
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}"
    )
    if range_str:
        url += f"!{range_str}"
    # ReadTimeout is not a ConnectionError subclass, so Timeout must be
    # listed explicitly for slow responses to be retried.
    retryable = (
        json.JSONDecodeError,
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
    )
    for attempt in range(3):
        resp = None
        try:
            resp = requests.get(
                url, headers={"Authorization": f"Bearer {token}"}, timeout=90
            )
            resp.encoding = "utf-8"
            data = json.loads(resp.text)
        except retryable as e:
            log(f" 读取重试 {attempt+1}/3: {e}")
            if attempt == 2:
                snippet = resp.text[:100] if resp is not None else "N/A"
                log(f" resp.text[:100]: {snippet}")
            else:
                time.sleep(2 ** attempt)  # back-off only when another try follows
            continue
        if data.get("code") != 0:
            raise RuntimeError(f"读取Sheet失败: {data}")
        return data["data"]["valueRange"].get("values") or []
    raise RuntimeError("读取Sheet失败: 3次重试均失败")


def _write_values(send, url, token, body, fail_label):
    """Send a values write request and return the decoded JSON response.

    Failures (transport error, non-JSON body, non-zero API code) are logged
    with fail_label and reported through the returned dict's "code" field
    rather than raised, matching the log-and-continue behaviour callers
    already rely on for API-level errors.
    """
    try:
        resp = send(
            url,
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            },
            json=body,
            timeout=30,
        )
        r = resp.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        r = {"code": -1, "msg": str(e)}
    if r.get("code") != 0:
        log(f"{fail_label}: {r}")
    return r


def put_values(token, sheet_id, range_str, values):
    """Overwrite range_str of sheet_id with values; return the API response dict."""
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values"
    )
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    return _write_values(requests.put, url, token, body, f" 写入失败 {range_str}")


def append_rows(token, sheet_id, values):
    """Append rows (columns A:AA) after the last row of sheet_id."""
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}:append"
    )
    body = {"valueRange": {"range": f"{sheet_id}!A:AA", "values": values}}
    return _write_values(requests.post, url, token, body, " 追加失败")


def mask_phone(phone):
    """Mask an 11-digit phone number as "138****1234".

    Accepts strings or numbers; a trailing ".0"/".00" (a number read back
    from a spreadsheet as a float) is dropped first. Returns None when the
    value is not an 11-digit number starting with 1.
    """
    phone = str(phone).strip()
    if "." in phone:
        parts = phone.split(".")
        if parts[1] in ("0", "00"):
            phone = parts[0]
    if re.fullmatch(r"1\d{10}", phone):
        return f"{phone[:3]}****{phone[-4:]}"
    return None


def _display_tel(tel):
    """Return tel in masked form, or None if it is not a phone number.

    Phase 1 looks accounts up by masked value, so account `tel` values are
    already masked; passing one through mask_phone() would reject it. Keep
    an already-masked value as is and mask anything else.
    """
    tel = str(tel).strip()
    if _MASKED_PHONE_RE.fullmatch(tel):
        return tel
    return mask_phone(tel)


def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an "IN (...)" query in chunks and return all rows.

    Args:
        cur: open DB cursor.
        sql_tpl: SQL containing exactly one "%s", which is replaced by the
            placeholder list; it must contain no other "%" character.
        params: values for the IN list.
        chunk: maximum number of values per query.
    """
    results = []
    for i in range(0, len(params), chunk):
        batch = params[i:i + chunk]
        ph = ",".join(["%s"] * len(batch))
        cur.execute(sql_tpl % ph, batch)
        results.extend(cur.fetchall())
    return results


def _cell_text(row, index):
    """Return row[index] as a stripped string, or "" if missing or falsy."""
    if len(row) > index and row[index]:
        return str(row[index]).strip()
    return ""


def _parse_int_cell(row, index):
    """Return row[index] parsed as an int (via float), or None if not numeric."""
    if len(row) > index and row[index]:
        try:
            return int(float(str(row[index]).strip()))
        except (ValueError, TypeError, OverflowError):
            return None
    return None


def _fmt_ts(value):
    """Format a datetime as "YYYY-MM-DD HH:MM:SS"; falsy values become ""."""
    return value.strftime("%Y-%m-%d %H:%M:%S") if value else ""


def _fmt_chapter(ch_info):
    """Format a (level, season, unit, lesson) tuple; falsy values become ""."""
    if not ch_info:
        return ""
    return f"{ch_info[0]}-{ch_info[1]}-{ch_info[2]}-{ch_info[3]}"


# -- Phase 1: phone number -> account id --
def phase1_phone_to_id(token, conn):
    """Fill column X of the full table for rows with a phone but no UID.

    A row is pending when column D (phone) is set and both column I (user
    id) and column X (matched uid) are empty; formula cells are skipped.

    Returns:
        (number of pending rows, number of distinct phones matched).
    """
    log("Phase 1: 手机号→ID 匹配")
    rows = read_sheet(token, SHEET_ALL)
    pending = []
    # Data starts on sheet row 4 (three header rows).
    for idx, row in enumerate(rows[3:], start=4):
        if len(row) < 5:
            continue
        # Column D = phone (index 3), I = user id (index 8), X = matched uid (index 23)
        phone = _cell_text(row, 3)
        if not phone or _cell_text(row, 8) or _cell_text(row, 23):
            continue
        if phone.startswith("="):
            continue
        pending.append({"row_idx": idx, "phone": phone})

    log(f" 待匹配: {len(pending)}")
    if not pending:
        log(" 无需匹配, 跳过")
        return 0, 0

    phone_masks = {}
    for phone in {r["phone"] for r in pending}:
        masked = mask_phone(phone)
        if masked:
            phone_masks[phone] = masked
    masks = list(set(phone_masks.values()))

    # NOTE(review): matching is done on the masked value, so different
    # numbers sharing the first 3 and last 4 digits collide; the first
    # account returned for a mask wins.
    masked_to_aid = {}
    with conn.cursor() as cur:
        matches = batch_in(
            cur,
            "SELECT id, tel FROM bi_vala_app_account WHERE tel IN (%s) "
            "AND status=1 AND deleted_at IS NULL",
            masks,
        )
    for aid, tel in matches:
        masked_to_aid.setdefault(tel, aid)

    phone_to_aid = {
        phone: masked_to_aid[masked]
        for phone, masked in phone_masks.items()
        if masked in masked_to_aid
    }
    matched = len(phone_to_aid)
    log(f" 匹配成功: {matched}, 未匹配: {len(phone_masks)-matched}")

    written = 0
    for r in pending:
        if r["phone"] not in phone_to_aid:
            continue
        row = r["row_idx"]
        result = put_values(
            token, SHEET_ALL, f"X{row}:X{row}", [[str(phone_to_aid[r["phone"]])]]
        )
        if result.get("code") == 0:
            written += 1

    # Report rows actually written, not the number of pending rows.
    log(f" 回填完成: {written} 行")
    return len(pending), matched


# -- Phase 2: course records -> detail sheet --
def _collect_uid_rows(rows):
    """Map account id -> (sheet row, sales, lead source, phone) from the full table.

    The id is taken from column I, falling back to column X when column I
    is empty or not numeric. Non-positive ids are skipped and the first row
    seen for an id wins.
    """
    uid_rows = {}
    for idx, row in enumerate(rows[3:], start=4):
        aid = _parse_int_cell(row, 8)
        if aid is None:
            aid = _parse_int_cell(row, 23)
        if aid is None or aid <= 0:
            continue
        if aid not in uid_rows:
            uid_rows[aid] = (
                idx,
                _cell_text(row, 0),
                _cell_text(row, 2),
                _cell_text(row, 3),
            )
    return uid_rows


def _load_chapter_progress(conn, cur, char_ids, chapter_map):
    """Collect lesson completions per character from the sharded record tables.

    Returns:
        (char_chapter_times, char_latest): per character, the earliest
        completion time of each known chapter, and the most recent
        completion as (time, chapter_id, chapter_info).
    """
    char_chapter_times = defaultdict(dict)
    char_latest = {}
    if not char_ids:
        return char_chapter_times, char_latest
    for tbl_idx in range(_SHARD_COUNT):
        table = f"bi_user_chapter_play_record_{tbl_idx}"
        try:
            cur.execute(
                f"SELECT user_id, chapter_id, created_at FROM {table} "
                "WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                (char_ids,),
            )
            for uid, ch_id, created_at in cur.fetchall():
                ch_data = chapter_map.get(ch_id)
                if not ch_data:
                    continue
                seen = char_chapter_times[uid]
                if ch_id not in seen or created_at < seen[ch_id]:
                    seen[ch_id] = created_at
                prev = char_latest.get(uid)
                if prev is None or created_at > prev[0]:
                    char_latest[uid] = (created_at, ch_id, ch_data)
        except Exception as e:
            log(f" 警告 {table}: {e}")
            # A failed statement leaves the transaction aborted; without a
            # rollback every later query on this connection would fail too.
            conn.rollback()
    return char_chapter_times, char_latest


def _load_study_ms(conn, cur, char_ids):
    """Sum interval_time per character across the sharded component tables."""
    char_total_ms = defaultdict(int)
    if not char_ids:
        return char_total_ms
    for tbl_idx in range(_SHARD_COUNT):
        table = f"bi_user_component_play_record_{tbl_idx}"
        try:
            cur.execute(
                f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} "
                "WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id",
                (char_ids,),
            )
            for uid, total_ms in cur.fetchall():
                # SUM() may come back as Decimal, which cannot be divided by
                # a float later on; normalise to float here.
                char_total_ms[uid] += float(total_ms or 0)
        except Exception as e:
            log(f" 警告 {table}: {e}")
            conn.rollback()  # see _load_chapter_progress
    return char_total_ms


def phase2_course_records(token, conn):
    """Query course data for every UID in the full table and write the detail sheet.

    Existing detail rows (matched on column A = account id) are overwritten
    in place; unknown accounts are appended.

    Returns:
        Number of detail rows produced.
    """
    log("Phase 2: 行课记录查询")
    rows = read_sheet(token, SHEET_ALL)

    uid_rows = _collect_uid_rows(rows)
    uid_set = list(uid_rows)
    log(f" 有效UID: {len(uid_set)}")
    if not uid_set:
        log(" 无UID, 跳过")
        return 0

    with conn.cursor() as cur:
        # Account info
        log(" 查询账户信息...")
        aid_info = {}
        for aid, tel, created_at in batch_in(
            cur,
            "SELECT id, tel, created_at FROM bi_vala_app_account "
            "WHERE id IN (%s) AND status=1 AND deleted_at IS NULL",
            uid_set,
        ):
            aid_info[aid] = {
                "tel": tel or "",
                "created_at": str(created_at) if created_at else "",
            }

        # Character info
        log(" 查询角色信息...")
        account_chars = defaultdict(list)
        account_nicks = defaultdict(list)
        rc = batch_in(
            cur,
            "SELECT account_id, id, nickname FROM bi_vala_app_character "
            "WHERE account_id IN (%s) AND nickname IS NOT NULL "
            "AND nickname != '' AND deleted_at IS NULL",
            uid_set,
        )
        for aid, cid, nick in rc:
            account_chars[aid].append(cid)
            # Kept per account so nicknames need no rescan of rc later.
            account_nicks[aid].append(nick)
        char_ids = list(dict.fromkeys(cid for _, cid, _ in rc))
        log(f" 角色数: {len(char_ids)}")

        # Chapter id -> (level, season, unit, lesson)
        cur.execute(
            "SELECT id, course_level, course_season, course_unit, course_lesson "
            "FROM bi_level_unit_lesson"
        )
        chapter_map = {
            ch_id: (level or "", season or "", unit or "", lesson or "")
            for ch_id, level, season, unit, lesson in cur.fetchall()
        }

        # Lesson completion records
        log(" 查询课时完成记录...")
        char_chapter_times, char_latest = _load_chapter_progress(
            conn, cur, char_ids, chapter_map
        )

        # Study time
        log(" 查询学习耗时...")
        char_total_ms = _load_study_ms(conn, cur, char_ids)

        # Activation status
        log(" 查询激活状态...")
        activation = {}
        for aid, lvl in batch_in(
            cur,
            "SELECT account_id, season_package_level FROM bi_vala_seasonal_ticket "
            "WHERE account_id IN (%s) AND status=1 AND deleted_at IS NULL "
            "AND season_package_level IN ('A1','A2')",
            uid_set,
        ):
            activation.setdefault(aid, lvl)

        # Payment info
        log(" 查询付费信息...")
        paid_info = {}
        for aid, first_pay, gmv, gsv, channels in batch_in(
            cur,
            """SELECT account_id,
                      MIN(pay_success_date) as first_pay,
                      SUM(pay_amount_int)/100.0 as total_gmv,
                      SUM(CASE WHEN order_status=3 THEN pay_amount_int ELSE 0 END)/100.0 as total_gsv,
                      STRING_AGG(DISTINCT key_from, ', ') as channels
               FROM bi_vala_order
               WHERE account_id IN (%s)
                 AND pay_success_date IS NOT NULL
                 AND order_status IN (3,4)
                 AND deleted_at IS NULL
               GROUP BY account_id""",
            uid_set,
        ):
            paid_info[aid] = (
                str(first_pay) if first_pay else "",
                gmv or 0,
                gsv or 0,
                channels or "",
            )

        # Last login
        # NOTE(review): this selects and groups by account_id on
        # bi_vala_app_account, while every other query on that table uses
        # id. Confirm the column exists; the SQL is kept as written.
        log(" 查询最近登录...")
        last_login = {}
        for aid, dt in batch_in(
            cur,
            "SELECT account_id, MAX(login_date) FROM bi_vala_app_account "
            "WHERE id IN (%s) AND status=1 AND deleted_at IS NULL "
            "GROUP BY account_id",
            uid_set,
        ):
            if dt:
                last_login[aid] = str(dt)

    # Assemble detail rows
    log(" 组装小溪明细...")
    detail_rows = []
    for aid in uid_set:
        _row_idx, sales, jinxian, phone_raw = uid_rows[aid]
        info = aid_info.get(aid, {})
        created_at = info.get("created_at", "")
        masked_phone = mask_phone(phone_raw) or ""
        masked_tel = _display_tel(info.get("tel", "")) or masked_phone

        chars = account_chars.get(aid, [])

        exp_lessons = {}  # trial lesson code -> earliest completion time
        first_lesson_time = None
        first_lesson_ch = None
        best_latest_time = None
        best_latest_ch = None
        total_min = 0.0

        for cid in chars:
            for ch_id, ct in char_chapter_times.get(cid, {}).items():
                ch_info = chapter_map.get(ch_id)
                if not ch_info:
                    continue
                _level, season, unit, lesson = ch_info
                # Season S0 / unit U00 lessons fill the L01..L05 columns.
                if season == "S0" and unit == "U00":
                    if lesson not in exp_lessons or ct < exp_lessons[lesson]:
                        exp_lessons[lesson] = ct
                if first_lesson_time is None or ct < first_lesson_time:
                    first_lesson_time = ct
                    first_lesson_ch = ch_info

            lt = char_latest.get(cid)
            if lt and (best_latest_time is None or lt[0] > best_latest_time):
                best_latest_time = lt[0]
                best_latest_ch = lt[2]

            total_min += char_total_ms.get(cid, 0) / 60000.0

        total_min = round(total_min, 1)
        if total_min == int(total_min):
            total_min = int(total_min)  # write "12" rather than "12.0"

        act_level = activation.get(aid, "")
        if act_level:
            course_level = act_level
            course_type = "正式课" if aid in paid_info else "体验课"
        elif first_lesson_ch:
            course_level = first_lesson_ch[0]
            course_type = "体验课"
        else:
            course_level = ""
            course_type = ""

        nickname_str = " / ".join(account_nicks.get(aid, [])[:3])

        pi = paid_info.get(aid)
        paid_status = "已付费" if pi else "未付费"
        first_pay_time, total_gmv, total_gsv, channels = pi if pi else ("", 0, 0, "")

        # 27 values, matching columns A..AA of the detail sheet.
        row_data = [
            str(aid), masked_tel, masked_phone, sales, jinxian, created_at, "",
            nickname_str, course_level, course_type,
            _fmt_ts(first_lesson_time), _fmt_chapter(first_lesson_ch),
            *(_fmt_ts(exp_lessons.get(code))
              for code in ("L01", "L02", "L03", "L04", "L05")),
            str(len(exp_lessons)),
            _fmt_ts(best_latest_time), _fmt_chapter(best_latest_ch), str(total_min),
            paid_status, first_pay_time,
            str(total_gmv), str(total_gsv), channels,
            last_login.get(aid, ""),
        ]
        detail_rows.append((aid, row_data))

    # Write the detail sheet
    log(" 写入小溪明细...")
    existing = read_sheet(token, SHEET_DETAIL)
    existing_map = {}  # account id -> sheet row number (row 1 is the header)
    for i, row in enumerate(existing[1:], start=2):
        if not row:
            continue
        existing_aid = _parse_int_cell(row, 0)
        if existing_aid is not None:
            existing_map[existing_aid] = i

    update_count = 0
    new_rows_list = []
    for aid, row_data in detail_rows:
        ri = existing_map.get(aid)
        if ri is None:
            new_rows_list.append(row_data)
        else:
            put_values(token, SHEET_DETAIL, f"A{ri}:AA{ri}", [row_data])
            update_count += 1

    log(f" 更新: {update_count}, 新增: {len(new_rows_list)}")

    # Append new rows in batches
    for i in range(0, len(new_rows_list), 500):
        append_rows(token, SHEET_DETAIL, new_rows_list[i:i + 500])

    log(f" 行课明细完成: 共 {len(detail_rows)} 条")
    return len(detail_rows)


# -- Main --
def main():
    """Run both phases; return 0 on success and 1 on any error."""
    log("=" * 50)
    log("行课查询 V2 启动 (销转客户主表)")

    conn = None
    try:
        token = get_fs_token()
        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT, user=PG_USER,
            password=get_pg_password(), dbname=PG_DB, connect_timeout=30
        )

        p1_total, p1_matched = phase1_phone_to_id(token, conn)
        p2_total = phase2_course_records(token, conn)

        summary = f"Phase1(ID匹配): {p1_total} 行(匹配{p1_matched}) | Phase2(行课): {p2_total} 条"
        log(f"完成: {summary}")
        return 0
    except Exception as e:
        # Top-level boundary: record the failure and signal it via exit code.
        log(f"ERROR: {e}")
        traceback.print_exc()
        return 1
    finally:
        # Close the connection on the error path as well.
        if conn is not None:
            conn.close()


if __name__ == "__main__":
    sys.exit(main())