#!/usr/bin/env python3
"""
Back-fill and summarise in-app extracted leads data.

Flow:
  1. Read the phone numbers in column A of the two sales sheets
     (曲慧萌 / 吴迪).
  2. XXTEA-encrypt each number and match it against
     bi_vala_app_account.tel_encrypt to obtain the account id.
  3. Query registration date, conversion, refund and U0 trial-lesson
     completion dates, and write them back to the sheet.
  4. Aggregate by extraction month and write the result to the
     statistics sheet.

Metric definitions:
  - conversion rate = converted-and-not-refunded leads / total leads
  - refund rate     = refunded leads / converted leads
  - completion rate = leads that completed the lesson / total leads

Usage:
  python3 fill_leads_sheet.py [--dry-run]
"""

import base64
import json
import os
import re
import subprocess
import sys
from collections import defaultdict

# ── Configuration ─────────────────────────────────────
SPREADSHEET_TOKEN = "FA3xsw3kph4pdatKlUrcyPgInAc"
SHEET_QHM = "7f0e35"  # sales sheet: 曲慧萌
SHEET_WD = "1K3O6s"  # sales sheet: 吴迪
SHEET_STAT = "scyF3H"  # statistics sheet

# In-app channels (values matched against bi_vala_order.key_from)
INNER_CHANNELS = [
    "app-active-h5-0-0",
    "app-sales-bj-qhm-0",
    "app-sales-bj-wd-0",
]

# U0 trial lessons: lesson name -> chapter_id
U0_CHAPTERS = {
    "L1-U0-L01": 343,
    "L1-U0-L02": 344,
    "L1-U0-L03": 345,
    "L1-U0-L04": 346,
    "L1-U0-L05": 348,
    "L2-U0-L01": 55,
    "L2-U0-L02": 56,
    "L2-U0-L03": 57,
    "L2-U0-L04": 58,
    "L2-U0-L05": 59,
}

# U0 column order (must match the sheet header)
U0_COL_ORDER = [
    "L1-U0-L01", "L1-U0-L02", "L1-U0-L03", "L1-U0-L04", "L1-U0-L05",
    "L2-U0-L01", "L2-U0-L02", "L2-U0-L03", "L2-U0-L04", "L2-U0-L05",
]

# Column mapping (0-based)
COL_PHONE = 0  # A: user phone number
COL_USER_ID = 1  # B: user id
COL_EXTRACT_DATE = 2  # C: extraction date (filled in manually)
COL_REG_DATE = 3  # D: registration date
COL_CONVERTED = 4  # E: converted?
COL_CONVERT_DATE = 5  # F: conversion date
COL_REFUND = 6  # G: refunded?
COL_REFUND_DATE = 7  # H: refund date
COL_U0_START = 8  # I-R: L1-U0-L01 ~ L2-U0-L05

# ── Database ──────────────────────────────────────────
# NOTE(security): the database password and the XXTEA key below are committed
# in source. Prefer supplying them from the environment or a secret store.
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = "28591"
PG_USER = "ai_member"
PG_DB = "vala_bi"
PG_PASSWORD = "LdfjdjL83h3h3^$&**YGG*"

# ── Encryption ────────────────────────────────────────
XXTEA_KEY = "K1pNOZ5O5+ZqTPSHA2kzPdoNOMOGcv6g"

# Maps standard base64 characters to the variant used by tel_encrypt.
_B64_TRANSLATION = str.maketrans("+/=", "-_.")


def encrypt_phone(phone: str) -> str:
    """Return the XXTEA-encrypted, base64-encoded form of *phone*.

    The base64 output has "+", "/" and "=" replaced by "-", "_" and "."
    respectively.
    """
    # Imported here so the helpers that need no encryption (statistics,
    # psql output parsing) stay importable without the third-party package.
    import xxtea

    encrypted = xxtea.encrypt(phone.encode(), XXTEA_KEY.encode())
    return base64.b64encode(encrypted).decode().translate(_B64_TRANSLATION)


def parse_psql_rows(stdout: str) -> list[list]:
    """Split tab-separated, unaligned psql output into rows of string fields.

    Lines that are empty or whitespace-only are skipped. Tabs are never
    stripped from a kept line, so empty leading or trailing columns survive
    as "" and every row keeps its full column count.
    """
    rows = []
    for raw_line in stdout.split("\n"):
        line = raw_line.rstrip("\r")
        if not line.strip():
            continue
        rows.append(line.split("\t"))
    return rows


def pg_query(sql: str) -> list[list]:
    """Run *sql* through the ``psql`` CLI and return rows as lists of strings.

    Returns an empty list (after logging stderr) when psql exits non-zero.
    ``subprocess.TimeoutExpired`` propagates if psql runs longer than 60 s.
    """
    env = os.environ.copy()
    env["PGPASSWORD"] = PG_PASSWORD
    cmd = [
        "psql",
        "-h", PG_HOST,
        "-p", PG_PORT,
        "-U", PG_USER,
        "-d", PG_DB,
        "-t", "-A", "-F", "\t",
        "-c", sql,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=60)
    if result.returncode != 0:
        print(f"[ERROR] PG query failed: {result.stderr}", file=sys.stderr)
        return []
    return parse_psql_rows(result.stdout)


def lark_read(sheet_id: str, range_str: str) -> list:
    """Read *range_str* of *sheet_id* via ``lark-cli`` and return its values.

    Returns an empty list when the CLI exits non-zero or the response is not
    flagged ``ok``. A malformed response (invalid JSON, missing keys) or a
    timeout raises; ``process_sheet`` catches and reports that.
    """
    result = subprocess.run(
        [
            "lark-cli", "sheets", "+read", "--as", "bot",
            "--spreadsheet-token", SPREADSHEET_TOKEN,
            "--sheet-id", sheet_id,
            "--range", range_str,
        ],
        capture_output=True, text=True, timeout=30,
    )
    if result.returncode != 0:
        print(f"[ERROR] lark read failed: {result.stderr}", file=sys.stderr)
        return []
    data = json.loads(result.stdout)
    if not data.get("ok"):
        print(f"[ERROR] lark read error: {data}", file=sys.stderr)
        return []
    # A null "values" field is normalised to an empty list.
    return data["data"]["valueRange"]["values"] or []


def lark_write(sheet_id: str, range_str: str, values: list) -> bool:
    """Write *values* (a list of rows) to *range_str* via ``lark-cli``.

    Returns True only when the CLI exits with 0 and reports ``ok``. A timeout
    or an unparsable response is logged and reported as False, so one bad
    cell range does not abort a whole back-fill run.
    """
    payload = json.dumps(values)
    try:
        result = subprocess.run(
            [
                "lark-cli", "sheets", "+write", "--as", "bot",
                "--spreadsheet-token", SPREADSHEET_TOKEN,
                "--sheet-id", sheet_id,
                "--range", range_str,
                "--values", payload,
            ],
            capture_output=True, text=True, timeout=30,
        )
    except subprocess.TimeoutExpired as exc:
        print(f"[ERROR] lark write failed: {exc}", file=sys.stderr)
        return False
    if result.returncode != 0:
        print(f"[ERROR] lark write failed: {result.stderr}", file=sys.stderr)
        return False
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        print(f"[ERROR] lark write failed: {exc}", file=sys.stderr)
        return False
    return bool(data.get("ok", False))


def match_phones(phones: list[str]) -> dict[str, dict]:
    """Map each plain phone number to its account record.

    Returns ``{phone: {"id", "name", "created_at"}}`` for every phone whose
    encrypted form matches an active (status = 1), non-deleted row of
    bi_vala_app_account. Unmatched phones are simply absent.
    """
    if not phones:
        return {}

    encrypted_map = {encrypt_phone(p): p for p in phones}
    enc_list = list(encrypted_map)

    BATCH_SIZE = 50
    results = {}
    for start in range(0, len(enc_list), BATCH_SIZE):
        batch = enc_list[start:start + BATCH_SIZE]
        quoted = ",".join(f"'{e}'" for e in batch)
        sql = f"""
            SELECT id, name, tel_encrypt, created_at::date::text
            FROM bi_vala_app_account
            WHERE tel_encrypt IN ({quoted})
              AND status = 1
              AND deleted_at IS NULL
        """
        for row in pg_query(sql):
            if len(row) < 4:
                continue
            acc_id, name, tel_enc, created_at = row[0], row[1], row[2], row[3]
            plain = encrypted_map.get(tel_enc)
            if plain:
                results[plain] = {"id": acc_id, "name": name, "created_at": created_at}
    return results


def query_conversion(account_ids: list[str]) -> dict[str, dict]:
    """Return conversion and refund info for accounts with in-app orders.

    Result shape: ``{account_id: {"converted", "convert_date", "refunded",
    "refund_date"}}`` where the flags are "是"/"否" and dates are text (or
    ""). Accounts without a matching paid order are absent from the result.
    """
    if not account_ids:
        return {}

    BATCH_SIZE = 100
    results = {}
    channels_str = ",".join(f"'{c}'" for c in INNER_CHANNELS)
    for start in range(0, len(account_ids), BATCH_SIZE):
        batch = account_ids[start:start + BATCH_SIZE]
        ids_str = ",".join(batch)
        sql = f"""
            SELECT o.account_id::text,
                   MIN(o.pay_success_date::date::text) AS first_pay_date,
                   BOOL_OR(r.id IS NOT NULL AND r.status = 3 AND o2.order_status = 4) AS has_refund,
                   MIN(CASE WHEN r.id IS NOT NULL AND r.status = 3 AND o2.order_status = 4
                            THEN r.created_at::date::text END) AS first_refund_date
            FROM bi_vala_order o
            LEFT JOIN bi_refund_order r ON o.trade_no = r.trade_no AND r.status = 3
            LEFT JOIN bi_vala_order o2 ON o.trade_no = o2.trade_no AND o2.order_status = 4
            WHERE o.account_id IN ({ids_str})
              AND o.key_from IN ({channels_str})
              AND o.pay_success_date IS NOT NULL
              AND o.order_status IN (3, 4)
            GROUP BY o.account_id
        """
        for row in pg_query(sql):
            if len(row) < 4:
                continue
            acc_id, first_pay, has_refund, first_refund = row[0], row[1], row[2], row[3]
            results[acc_id] = {
                "converted": "是" if first_pay else "否",
                "convert_date": first_pay or "",
                "refunded": "是" if has_refund in ("t", "true") else "否",
                "refund_date": first_refund or "",
            }
    return results


def query_learning(account_ids: list[str]) -> dict[str, dict[str, str]]:
    """Return the U0 lesson completion dates of each account.

    Result shape: ``{account_id: {lesson_name: date_text}}``. When several
    characters of one account (or several record shards) report the same
    lesson, the earliest date is kept.
    """
    if not account_ids:
        return {}

    ACCOUNT_BATCH = 100
    CHAR_BATCH = 200
    # Reverse lookup chapter_id -> lesson name (chapter ids are unique above).
    chapter_to_lesson = {cid: name for name, cid in U0_CHAPTERS.items()}
    chapter_str = ",".join(str(cid) for cid in U0_CHAPTERS.values())

    all_chapter_dates = {}
    for start in range(0, len(account_ids), ACCOUNT_BATCH):
        batch = account_ids[start:start + ACCOUNT_BATCH]
        ids_str = ",".join(batch)
        char_sql = f"""
            SELECT c.account_id::text, c.id::text
            FROM bi_vala_app_character c
            WHERE c.account_id IN ({ids_str})
              AND c.deleted_at IS NULL
        """
        # character id -> owning account id
        char_to_acc = {}
        for row in pg_query(char_sql):
            if len(row) >= 2:
                char_to_acc.setdefault(row[1], row[0])
        if not char_to_acc:
            continue

        all_char_ids = list(char_to_acc)
        char_batches = [
            all_char_ids[j:j + CHAR_BATCH]
            for j in range(0, len(all_char_ids), CHAR_BATCH)
        ]

        # Play records are spread over eight numbered tables.
        for table_idx in range(8):
            table_name = f"bi_user_chapter_play_record_{table_idx}"
            for char_batch in char_batches:
                chars_str = ",".join(char_batch)
                sql = f"""
                    SELECT user_id::text, chapter_id, MIN(created_at::date::text)
                    FROM {table_name}
                    WHERE user_id IN ({chars_str})
                      AND chapter_id IN ({chapter_str})
                      AND play_status = 1
                    GROUP BY user_id, chapter_id
                """
                try:
                    rows = pg_query(sql)
                except Exception as exc:
                    # Best effort: a failing shard is skipped, but reported.
                    print(f"[ERROR] PG query failed: {table_name}: {exc}", file=sys.stderr)
                    continue
                for row in rows:
                    if len(row) < 3:
                        continue
                    char_id, ch_id, comp_date = row[0], int(row[1]), row[2]
                    acc_id = char_to_acc.get(char_id)
                    lesson = chapter_to_lesson.get(ch_id)
                    if acc_id is None or lesson is None or not comp_date:
                        continue
                    lessons = all_chapter_dates.setdefault(acc_id, {})
                    previous = lessons.get(lesson)
                    # Date text compares chronologically as a plain string.
                    if previous is None or comp_date < previous:
                        lessons[lesson] = comp_date
    return all_chapter_dates


# ── Processing a single sales sheet ───────────────────
def process_sheet(sheet_id: str, sheet_name: str, dry_run: bool = False) -> list[dict]:
    """Back-fill one sales sheet and return its lead records for statistics.

    Reads A2:R, resolves each valid phone number to an account, and writes
    column B and columns D-R back row by row; column C is never written.
    With ``dry_run`` nothing is written to the sheet.

    Returns one dict per matched data row with the keys ``extract_date``,
    ``converted``, ``refunded`` and ``lessons``.
    """
    print(f"\n{'='*60}")
    print(f"处理 Sheet: {sheet_name} ({sheet_id})")
    print(f"{'='*60}")

    range_str = f"{sheet_id}!A2:R"
    try:
        rows = lark_read(sheet_id, range_str)
    except Exception as e:
        print(f"[ERROR] 读取失败: {e}")
        return []
    if not rows:
        print("没有数据行")
        return []
    print(f"读取到 {len(rows)} 行数据")

    # phone -> indices of every row carrying it (a phone may be listed twice)
    phone_to_row = {}
    for idx, row in enumerate(rows):
        if len(row) > COL_PHONE and row[COL_PHONE]:
            phone = str(row[COL_PHONE]).strip()
            if phone and re.match(r'^1\d{10}$', phone):
                phone_to_row.setdefault(phone, []).append(idx)
    if not phone_to_row:
        print("没有有效的手机号")
        return []

    phones = list(phone_to_row)
    print(f"有效手机号: {len(phones)} 个")

    print("→ 匹配 account_id...")
    acc_info = match_phones(phones)
    print(f" 匹配到 {len(acc_info)} 个账号")
    matched_accounts = [info["id"] for info in acc_info.values()]
    matched_phones = set(acc_info)

    print("→ 查询转化信息...")
    conv_info = query_conversion(matched_accounts)
    print("→ 查询 U0 学习进度...")
    learn_info = query_learning(matched_accounts)

    row_updates = {}
    lead_data = []
    for phone, row_indices in phone_to_row.items():
        info = acc_info.get(phone)
        if not info:
            continue
        acc_id = info["id"]
        conv = conv_info.get(acc_id, {})
        learn = learn_info.get(acc_id, {})
        for row_idx in row_indices:
            sheet_row = rows[row_idx]
            extract_date = ""
            if len(sheet_row) > COL_EXTRACT_DATE and sheet_row[COL_EXTRACT_DATE]:
                extract_date = str(sheet_row[COL_EXTRACT_DATE]).strip()

            col_vals = {
                COL_USER_ID: acc_id,
                COL_REG_DATE: info.get("created_at", ""),
                COL_CONVERTED: conv.get("converted", "否"),
                COL_CONVERT_DATE: conv.get("convert_date", ""),
                COL_REFUND: conv.get("refunded", "否"),
                COL_REFUND_DATE: conv.get("refund_date", ""),
            }
            for col_offset, lesson_name in enumerate(U0_COL_ORDER):
                col_vals[COL_U0_START + col_offset] = learn.get(lesson_name, "")
            row_updates[row_idx] = col_vals

            lead_data.append({
                "extract_date": extract_date,
                "converted": conv.get("converted", "否"),
                "refunded": conv.get("refunded", "否"),
                "lessons": {k: learn.get(k, "") for k in U0_COL_ORDER},
            })

    # Write back
    print(f"\n→ 准备回写 {len(row_updates)} 行数据...")
    last_col = COL_U0_START + len(U0_COL_ORDER)
    for row_idx, col_vals in sorted(row_updates.items()):
        actual_row = row_idx + 2  # data starts on sheet row 2
        if dry_run:
            print(f" [DRY-RUN] {sheet_id}!B{actual_row} + D{actual_row}:R{actual_row} ← ...")
            continue

        # Two separate writes: B on its own and D-R together, so column C
        # (extraction date, maintained manually by sales) is left untouched.
        id_ok = lark_write(
            sheet_id,
            f"{sheet_id}!B{actual_row}:B{actual_row}",
            [[str(col_vals[COL_USER_ID])]],
        )
        d_to_r = []
        for col in range(COL_REG_DATE, last_col):
            val = col_vals.get(col, "")
            d_to_r.append(str(val) if val else "")
        rest_ok = lark_write(
            sheet_id,
            f"{sheet_id}!D{actual_row}:R{actual_row}",
            [d_to_r],
        )
        if id_ok and rest_ok:
            print(f" ✓ 行 {actual_row} 回写成功")
        else:
            print(f"[ERROR] 行 {actual_row} 回写失败", file=sys.stderr)

    unmatched = set(phones) - matched_phones
    if unmatched:
        print(f"\n⚠️ 未匹配到账号的手机号 ({len(unmatched)} 个):")
        for p in sorted(unmatched):
            print(f" {p}")

    return lead_data


# ── Statistics ────────────────────────────────────────
def compute_stats(lead_data: list[dict]) -> dict[str, dict]:
    """Aggregate leads by extraction month.

    Leads whose ``extract_date`` is empty or does not start with
    ``YYYY-M`` / ``YYYY/M`` are ignored. All rates are percentages:
      - conversion rate = converted-and-not-refunded leads / total leads
      - refund rate     = refunded leads / converted leads
      - completion rate = leads that completed the lesson / total leads

    Returns ``{"YYYY-MM": {...}}`` with the counts and rates of each month.
    """
    month_groups = defaultdict(list)
    for lead in lead_data:
        extract = lead.get("extract_date", "")
        if not extract:
            continue
        m = re.match(r'(\d{4})[-/](\d{1,2})', extract)
        if not m:
            continue
        month = f"{m.group(1)}-{m.group(2).zfill(2)}"
        month_groups[month].append(lead)

    if not month_groups:
        return {}

    result = {}
    for month, leads in sorted(month_groups.items()):
        total = len(leads)
        converted_all = sum(1 for lead in leads if lead["converted"] == "是")
        refunded = sum(1 for lead in leads if lead["refunded"] == "是")
        converted_unrefunded = sum(
            1 for lead in leads
            if lead["converted"] == "是" and lead["refunded"] != "是"
        )

        conv_rate = converted_unrefunded / total * 100 if total > 0 else 0
        refund_rate = refunded / converted_all * 100 if converted_all > 0 else 0

        lesson_rates = {}
        for lesson_name in U0_COL_ORDER:
            completed = sum(1 for lead in leads if lead["lessons"].get(lesson_name, ""))
            lesson_rates[lesson_name] = completed / total * 100 if total > 0 else 0

        result[month] = {
            "total": total,
            "converted_all": converted_all,
            "converted_unrefunded": converted_unrefunded,
            "refunded": refunded,
            "conv_rate": conv_rate,
            "refund_rate": refund_rate,
            "lesson_rates": lesson_rates,
        }
    return result


def write_all_stats(all_stats: dict[str, dict[str, dict]], dry_run: bool = False) -> None:
    """Write the statistics of every salesperson to the statistics sheet.

    ``all_stats`` is ``{sales_name: {month: stats}}`` as produced by
    ``compute_stats``. One sheet row is written per (sales, month) pair,
    starting at row 2: A = sales name, B = month, C = conversion rate,
    D = refund rate, E-N = lesson completion rates. Rates are written as
    fractions (to be shown with a percentage cell format).

    With ``dry_run`` the target ranges are printed and nothing is written.
    """
    # Ordered list of rows: [(sales_name, month, stats), ...]
    rows_data = []
    for sales_name in ["曲慧萌", "吴迪"]:
        stats = all_stats.get(sales_name, {})
        for month in sorted(stats):
            rows_data.append((sales_name, month, stats[month]))

    if not rows_data:
        print(" 无统计数据")
        return

    for row_num, (sales_name, month, s) in enumerate(rows_data, start=2):
        lesson_vals = [round(s["lesson_rates"][name] / 100, 3) for name in U0_COL_ORDER]
        row_values = [
            sales_name,
            month,
            round(s["conv_rate"] / 100, 3),
            round(s["refund_rate"] / 100, 3),
            *lesson_vals,
        ]
        target = f"{SHEET_STAT}!A{row_num}:N{row_num}"
        if dry_run:
            print(f" [DRY-RUN] {target} ← {sales_name} {month}")
            continue

        # A-N are contiguous, so the whole row goes out in a single call.
        if lark_write(SHEET_STAT, target, [row_values]):
            print(f" ✓ {sales_name} {month}: 转化率={s['conv_rate']:.1f}% "
                  f"退费率={s['refund_rate']:.1f}% 总leads={s['total']}")
        else:
            print(f"[ERROR] {sales_name} {month} 统计写入失败", file=sys.stderr)


# ── Main flow ─────────────────────────────────────────
def main() -> None:
    """Back-fill both sales sheets, then compute and publish the statistics."""
    dry_run = "--dry-run" in sys.argv
    if dry_run:
        print("⚠️ DRY-RUN 模式,不会实际写入\n")

    # Process the two sales sheets
    qhm_data = process_sheet(SHEET_QHM, "曲慧萌", dry_run)
    wd_data = process_sheet(SHEET_WD, "吴迪", dry_run)

    # Aggregate
    print(f"\n{'='*60}")
    print("汇总统计 → 统计 sheet")
    print(f"{'='*60}")

    qhm_stats = compute_stats(qhm_data)
    wd_stats = compute_stats(wd_data)
    all_stats = {"曲慧萌": qhm_stats, "吴迪": wd_stats}

    if dry_run:
        for sales_name, stats in all_stats.items():
            for month, s in stats.items():
                print(f" [DRY-RUN] {sales_name} {month}: 转化率={s['conv_rate']:.1f}% 退费率={s['refund_rate']:.1f}%")
    else:
        write_all_stats(all_stats, dry_run)

    print("\n✅ 处理完成")


if __name__ == "__main__":
    main()