#!/usr/bin/env python3 """ 端内析出leads数据 — 自动回填 + 统计汇总脚本 流程: 1. 读取吴迪 sheet 中 B 列的手机号 2. XXTEA 加密 → 匹配 bi_vala_app_account.tel_encrypt → 获取 account_id 3. 查询注册日期、转化(全部key_from,购课时间≥析出时间)、转化keyfrom、退费、U0体验课完成日期 → 回写 4. 按析出月份汇总统计 → 写入"统计" sheet 统计口径: - 转化率 = 已转化leads / 总leads - 退费率 = 退费leads / 已转化leads - 完成率 = 完成该课的leads / 总leads 用法: python3 fill_leads_sheet.py [--dry-run] """ import sys import os import json import subprocess import re from collections import defaultdict # ── 配置 ────────────────────────────────────────────── SPREADSHEET_TOKEN = "FA3xsw3kph4pdatKlUrcyPgInAc" SHEET_WD = "1K3O6s" # 吴迪 SHEET_STAT = "scyF3H" # 统计 # U0 体验课 chapter_id U0_CHAPTERS = { "L1-U0-L01": 343, "L1-U0-L02": 344, "L1-U0-L03": 345, "L1-U0-L04": 346, "L1-U0-L05": 348, "L2-U0-L01": 55, "L2-U0-L02": 56, "L2-U0-L03": 57, "L2-U0-L04": 58, "L2-U0-L05": 59, } # U0 列顺序(与表头一致) U0_COL_ORDER = [ "L1-U0-L01", "L1-U0-L02", "L1-U0-L03", "L1-U0-L04", "L1-U0-L05", "L2-U0-L01", "L2-U0-L02", "L2-U0-L03", "L2-U0-L04", "L2-U0-L05", ] # 列映射(0-based) # 注意:A列「序号」和C列「微信昵称」由销售手动填写,脚本不读写 COL_SEQ = 0 # A: 序号(手动填,脚本跳过) COL_PHONE = 1 # B: 用户手机号 COL_NICKNAME = 2 # C: 微信昵称(销售手动填写,脚本跳过) COL_USER_ID = 3 # D: 用户ID COL_EXTRACT_DATE = 4 # E: 析出日期(手动填) COL_REG_DATE = 5 # F: 注册日期 COL_CONVERTED = 6 # G: 是否转化 COL_CONVERT_DATE = 7 # H: 转化日期 COL_CONVERT_KEYFROM = 8 # I: 转化keyfrom COL_CONVERT_GSV = 9 # J: 转化金额(GSV)(新增) COL_REFUND = 10 # K: 是否退费 COL_REFUND_DATE = 11 # L: 退费日期 COL_U0_START = 12 # M-W: L1-U0-L01 ~ L2-U0-L05 # ── 数据库 ───────────────────────────────────────────── PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com" PG_PORT = "28591" PG_USER = "ai_member" PG_DB = "vala_bi" PG_PASSWORD = "LdfjdjL83h3h3^$&**YGG*" # ── 加密 ─────────────────────────────────────────────── import xxtea import base64 XXTEA_KEY = "K1pNOZ5O5+ZqTPSHA2kzPdoNOMOGcv6g" def excel_serial_to_date(serial: int) -> str: """Excel 序列日期 → YYYY-MM-DD 字符串""" from datetime import datetime, timedelta if serial >= 61: dt = datetime(1899, 12, 30) + timedelta(days=serial) else: dt = datetime(1899, 12, 31) + timedelta(days=serial) return dt.strftime("%Y-%m-%d") def encrypt_phone(phone: str) -> str: encrypted = xxtea.encrypt(phone.encode(), XXTEA_KEY.encode()) result = base64.b64encode(encrypted).decode() result = result.replace("+", "-").replace("/", "_").replace("=", ".") return result def pg_query(sql: str) -> list[list]: env = os.environ.copy() env["PGPASSWORD"] = PG_PASSWORD cmd = [ "psql", "-h", PG_HOST, "-p", PG_PORT, "-U", PG_USER, "-d", PG_DB, "-t", "-A", "-F", "\t", "-c", sql, ] result = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=60) if result.returncode != 0: print(f"[ERROR] PG query failed: {result.stderr}", file=sys.stderr) return [] lines = result.stdout.strip().split("\n") rows = [] for line in lines: if not line.strip(): continue rows.append(line.split("\t")) return rows def lark_read(sheet_id: str, range_str: str) -> list: result = subprocess.run( ["lark-cli", "sheets", "+read", "--as", "bot", "--spreadsheet-token", SPREADSHEET_TOKEN, "--sheet-id", sheet_id, "--range", range_str], capture_output=True, text=True, timeout=30 ) if result.returncode != 0: print(f"[ERROR] lark read failed: {result.stderr}", file=sys.stderr) return [] data = json.loads(result.stdout) if not data.get("ok"): print(f"[ERROR] lark read error: {data}", file=sys.stderr) return [] return data["data"]["valueRange"]["values"] def lark_write(sheet_id: str, range_str: str, values: list) -> bool: payload = json.dumps(values) result = subprocess.run( ["lark-cli", "sheets", "+write", "--as", "bot", "--spreadsheet-token", SPREADSHEET_TOKEN, "--sheet-id", sheet_id, "--range", range_str, "--values", payload], capture_output=True, text=True, timeout=30 ) if result.returncode != 0: print(f"[ERROR] lark write failed: {result.stderr}", file=sys.stderr) return False data = json.loads(result.stdout) return data.get("ok", False) def match_phones(phones: list[str]) -> dict[str, dict]: if not phones: return {} encrypted_map = {encrypt_phone(p): p for p in phones} enc_list = list(encrypted_map.keys()) BATCH_SIZE = 50 results = {} for i in range(0, len(enc_list), BATCH_SIZE): batch = enc_list[i:i + BATCH_SIZE] quoted = ",".join(f"'{e}'" for e in batch) sql = f""" SELECT id, name, tel_encrypt, created_at::date::text FROM bi_vala_app_account WHERE tel_encrypt IN ({quoted}) AND status = 1 AND deleted_at IS NULL """ for row in pg_query(sql): if len(row) >= 4: acc_id, name, tel_enc, created_at = row[0], row[1], row[2], row[3] plain = encrypted_map.get(tel_enc) if plain: results[plain] = {"id": acc_id, "name": name, "created_at": created_at} return results def query_orders_and_refunds(account_ids: list[str]) -> dict[str, dict]: """ 查询全部渠道订单及退费信息(不限 key_from)。 返回: {account_id: {"orders": [(pay_date, key_from, is_refunded), ...], "total_gmv_cents": int, "total_gsv_cents": int}} orders 按 pay_success_date 升序排列。 """ if not account_ids: return {} BATCH_SIZE = 100 results = {} for i in range(0, len(account_ids), BATCH_SIZE): batch = account_ids[i:i + BATCH_SIZE] ids_str = ",".join(batch) # 查询每笔订单及退费状态 sql = f""" SELECT o.account_id::text, o.pay_success_date::date::text, o.key_from, o.pay_amount_int, COALESCE(SUM(CASE WHEN r.status = 3 AND o2.order_status = 4 THEN r.refund_amount::numeric * 100 ELSE 0 END), 0)::bigint AS total_refund_fen, BOOL_OR(r.id IS NOT NULL AND r.status = 3 AND o2.order_status = 4) AS is_refunded FROM bi_vala_order o LEFT JOIN bi_refund_order r ON o.trade_no = r.trade_no LEFT JOIN bi_vala_order o2 ON o.trade_no = o2.trade_no AND o2.order_status = 4 WHERE o.account_id IN ({ids_str}) AND o.pay_success_date IS NOT NULL AND o.order_status IN (3, 4) GROUP BY o.account_id, o.pay_success_date, o.key_from, o.pay_amount_int ORDER BY o.account_id, o.pay_success_date """ for row in pg_query(sql): if len(row) >= 6: acc_id, pay_date, key_from, pay_amount, total_refund_fen, is_refunded = row[0], row[1], row[2], row[3], row[4], row[5] if acc_id not in results: results[acc_id] = {"orders": [], "total_gmv_cents": 0, "total_gsv_cents": 0} refunded = is_refunded in ("t", "true") results[acc_id]["orders"].append((pay_date, key_from, refunded)) # 累加用户总 GMV(分)和 GSV(分) results[acc_id]["total_gmv_cents"] += int(pay_amount) gsv_fen = int(pay_amount) - int(total_refund_fen) results[acc_id]["total_gsv_cents"] += gsv_fen return results def query_learning(account_ids: list[str]) -> dict[str, dict[str, str]]: if not account_ids: return {} BATCH_SIZE = 100 all_chapter_dates = {} for i in range(0, len(account_ids), BATCH_SIZE): batch = account_ids[i:i + BATCH_SIZE] ids_str = ",".join(batch) char_sql = f""" SELECT c.account_id::text, c.id::text FROM bi_vala_app_character c WHERE c.account_id IN ({ids_str}) AND c.deleted_at IS NULL """ char_rows = pg_query(char_sql) char_map = {} for row in char_rows: acc_id, char_id = row[0], row[1] char_map.setdefault(acc_id, []).append(char_id) if not char_map: continue all_char_ids = [] for cids in char_map.values(): all_char_ids.extend(cids) chapter_ids = list(U0_CHAPTERS.values()) chapter_str = ",".join(str(c) for c in chapter_ids) for table_idx in range(8): table_name = f"bi_user_chapter_play_record_{table_idx}" char_batches = [all_char_ids[j:j + 200] for j in range(0, len(all_char_ids), 200)] for char_batch in char_batches: chars_str = ",".join(char_batch) sql = f""" SELECT user_id::text, chapter_id, MIN(created_at::date::text) FROM {table_name} WHERE user_id IN ({chars_str}) AND chapter_id IN ({chapter_str}) AND play_status = 1 GROUP BY user_id, chapter_id """ try: rows = pg_query(sql) except Exception: continue for row in rows: if len(row) >= 3: char_id, ch_id, comp_date = row[0], int(row[1]), row[2] for acc_id, cids in char_map.items(): if char_id in cids: all_chapter_dates.setdefault(acc_id, {}) for name, cid in U0_CHAPTERS.items(): if cid == ch_id: all_chapter_dates[acc_id][name] = comp_date break break return all_chapter_dates # ── 处理单个销售 sheet ────────────────────────────────── def process_sheet(sheet_id: str, sheet_name: str, dry_run: bool = False) -> list[dict]: """处理单个销售 sheet,回填数据,返回 lead 数据列表供统计使用""" print(f"\n{'='*60}") print(f"处理 Sheet: {sheet_name} ({sheet_id})") print(f"{'='*60}") range_str = f"{sheet_id}!A2:W" try: rows = lark_read(sheet_id, range_str) except Exception as e: print(f"[ERROR] 读取失败: {e}") return [] if not rows: print("没有数据行") return [] print(f"读取到 {len(rows)} 行数据") phone_to_row = {} for idx, row in enumerate(rows): if len(row) > COL_PHONE and row[COL_PHONE]: phone = str(row[COL_PHONE]).strip() if phone and re.match(r'^1\d{10}$', phone): phone_to_row.setdefault(phone, []).append(idx) if not phone_to_row: print("没有有效的手机号") return [] phones = list(phone_to_row.keys()) print(f"有效手机号: {len(phones)} 个") print("→ 匹配 account_id...") acc_info = match_phones(phones) print(f" 匹配到 {len(acc_info)} 个账号") matched_accounts = [info["id"] for info in acc_info.values()] matched_phones = set(acc_info.keys()) print("→ 查询订单及退费信息(全部渠道)...") order_info = query_orders_and_refunds(matched_accounts) print("→ 查询 U0 学习进度...") learn_info = query_learning(matched_accounts) updates = [] lead_data = [] matched_row_indices = set() for phone, row_indices in phone_to_row.items(): info = acc_info.get(phone) if not info: continue acc_id = info["id"] learn = learn_info.get(acc_id, {}) for row_idx in row_indices: matched_row_indices.add(row_idx) extract_date = "" if len(rows[row_idx]) > COL_EXTRACT_DATE and rows[row_idx][COL_EXTRACT_DATE]: extract_date = str(rows[row_idx][COL_EXTRACT_DATE]).strip() # 转化判断:全部渠道,购课时间 ≥ 析出时间 # 退费判断:只要存在一笔 ≥ 析出日期且未退费的订单,就算未退费 # 转化金额:用户全部订单 GSV(全部 pay_amount - 全部 refund_amount) orders = order_info.get(acc_id, {}) conv_date = "" conv_keyfrom = "" conv_gsv = "" is_refunded = "否" # 析出日期可能是 Excel 序列数字,统一转为 YYYY-MM-DD extract_date_ymd = "" if extract_date: if extract_date.isdigit(): extract_date_ymd = excel_serial_to_date(int(extract_date)) else: extract_date_ymd = extract_date if extract_date_ymd: first_match = None has_valid_order = False for pay_date, kf, refunded in orders.get("orders", []): if pay_date >= extract_date_ymd: if first_match is None: first_match = (pay_date, kf) if not refunded: has_valid_order = True if first_match: conv_date = first_match[0] conv_keyfrom = first_match[1] # 转化金额 = 用户总 GSV(元) total_gsv = orders.get("total_gsv_cents", 0) conv_gsv = str(round(total_gsv / 100, 2)) is_refunded = "否" if has_valid_order else "是" f_value = "是" if conv_date else "否" updates.append((row_idx, COL_USER_ID, acc_id)) updates.append((row_idx, COL_REG_DATE, info.get("created_at", ""))) updates.append((row_idx, COL_CONVERTED, f_value)) updates.append((row_idx, COL_CONVERT_DATE, conv_date)) updates.append((row_idx, COL_CONVERT_KEYFROM, conv_keyfrom)) updates.append((row_idx, COL_CONVERT_GSV, conv_gsv)) updates.append((row_idx, COL_REFUND, is_refunded)) updates.append((row_idx, COL_REFUND_DATE, "")) for col_offset, lesson_name in enumerate(U0_COL_ORDER): updates.append((row_idx, COL_U0_START + col_offset, learn.get(lesson_name, ""))) lead_data.append({ "extract_date": extract_date, "converted": f_value, "refunded": is_refunded, "gmv_cents": orders.get("total_gmv_cents", 0), "gsv_cents": orders.get("total_gsv_cents", 0), "lessons": {k: learn.get(k, "") for k in U0_COL_ORDER}, "has_phone": True, }) # 未匹配手机号的行也纳入统计,转化状态取 F 列已有值 for row_idx, row in enumerate(rows): if row_idx in matched_row_indices: continue extract_date = "" if len(row) > COL_EXTRACT_DATE and row[COL_EXTRACT_DATE]: extract_date = str(row[COL_EXTRACT_DATE]).strip() # 读取 F 列已有值作为转化状态 existing_converted = "否" if len(row) > COL_CONVERTED and row[COL_CONVERTED]: existing_converted = str(row[COL_CONVERTED]).strip() lead_data.append({ "extract_date": extract_date, "converted": existing_converted, "refunded": "否", "gmv_cents": 0, "gsv_cents": 0, "lessons": {k: "" for k in U0_COL_ORDER}, "has_phone": False, }) # 回写 row_updates = {} for row_idx, col, val in updates: row_updates.setdefault(row_idx, {})[col] = val print(f"\n→ 准备回写 {len(row_updates)} 行数据...") for row_idx, col_vals in sorted(row_updates.items()): actual_row = row_idx + 2 # 分开写入:D列单独写,F-T列一起写,跳过A列(序号)、C列(微信昵称)和E列(析出日期由销售手动维护) # D: 用户ID if COL_USER_ID in col_vals: lark_write(sheet_id, f"{sheet_id}!D{actual_row}:D{actual_row}", [[str(col_vals[COL_USER_ID])]]) # F-V: 注册日期 ~ L2-U0-L5(含 I 列转化keyfrom) f_to_w = [] for col in range(COL_REG_DATE, COL_U0_START + len(U0_COL_ORDER)): val = col_vals.get(col, "") f_to_w.append(str(val) if val else "") if dry_run: print(f" [DRY-RUN] {sheet_id}!D{actual_row} + F{actual_row}:W{actual_row} ← ...") else: lark_write(sheet_id, f"{sheet_id}!F{actual_row}:W{actual_row}", [f_to_w]) print(f" ✓ 行 {actual_row} 回写成功") unmatched = set(phones) - matched_phones if unmatched: print(f"\n⚠️ 未匹配到账号的手机号 ({len(unmatched)} 个):") for p in sorted(unmatched): print(f" {p}") return lead_data # ── 统计汇总 ──────────────────────────────────────────── def compute_stats(lead_data: list[dict]) -> dict[str, dict]: """ 按析出月份汇总统计 口径: - 转化率 = 已转化leads / 总leads - 退费率 = 退费leads / 已转化leads - 完成率 = 完成该课的leads / 总leads """ month_groups = defaultdict(list) for lead in lead_data: extract = lead.get("extract_date", "") if not extract: continue # 支持三种格式: YYYY-MM-DD / YYYY/MM/DD / M月D日 / Excel序列数字 extract_str = str(extract).strip() # 尝试 Excel 序列数字 if extract_str.isdigit(): month = excel_serial_to_date(int(extract_str))[:7] else: m = re.match(r'(\d{4})[-/](\d{1,2})', extract_str) if m: month = f"{m.group(1)}-{m.group(2).zfill(2)}" else: m = re.match(r'(\d{1,2})月\d{1,2}日', extract_str) if m: from datetime import datetime year = datetime.now().year month = f"{year}-{m.group(1).zfill(2)}" else: continue month_groups[month].append(lead) if not month_groups: return {} result = {} for month, leads in sorted(month_groups.items()): total = len(leads) matched = sum(1 for l in leads if l.get("has_phone", False)) converted_all = sum(1 for l in leads if l["converted"] == "是") refunded = sum(1 for l in leads if l["refunded"] == "是") converted_unrefunded = sum(1 for l in leads if l["converted"] == "是" and l["refunded"] != "是") conv_rate = converted_all / total * 100 if total > 0 else 0 refund_rate = refunded / converted_all * 100 if converted_all > 0 else 0 # GMV / GSV 汇总(元) total_gmv = sum(l.get("gmv_cents", 0) for l in leads) / 100 total_gsv = sum(l.get("gsv_cents", 0) for l in leads) / 100 lesson_rates = {} for lesson_name in U0_COL_ORDER: completed = sum(1 for l in leads if l["lessons"].get(lesson_name, "")) lesson_rates[lesson_name] = completed / total * 100 if total > 0 else 0 result[month] = { "total": total, "matched": matched, "converted_all": converted_all, "converted_unrefunded": converted_unrefunded, "refunded": refunded, "conv_rate": conv_rate, "refund_rate": refund_rate, "gmv": total_gmv, "gsv": total_gsv, "lesson_rates": lesson_rates, } return result def write_all_stats(all_stats: dict[str, dict[str, dict]], dry_run: bool = False): """ 将所有销售的统计数据写入统计 sheet all_stats: {sales_name: {month: {conv_rate, refund_rate, ...}}} 按 销售+月份 逐行写入,从第2行开始 """ # 先写表头 header = ["销售", "月份", "总析出用户数", "匹配用户数", "转化用户数", "转化率", "退费率", "GMV", "GSV"] + \ [f"{name}完成率" for name in U0_COL_ORDER] if not dry_run: lark_write(SHEET_STAT, f"{SHEET_STAT}!A1:S1", [header]) # 构建有序行列表: [(sales_name, month, stats), ...] rows_data = [] for sales_name in ["吴迪"]: stats = all_stats.get(sales_name, {}) for month in sorted(stats.keys()): rows_data.append((sales_name, month, stats[month])) # 先清除统计 sheet 旧数据(A2:S50),避免残留旧行 print(" → 清除统计 sheet 旧数据...") lark_write(SHEET_STAT, f"{SHEET_STAT}!A2:S50", [[""] * 19] * 49) if not rows_data: print(" 无统计数据") return for i, (sales_name, month, s) in enumerate(rows_data): row_num = i + 2 # 从第2行开始 # A: 销售名 lark_write(SHEET_STAT, f"{SHEET_STAT}!A{row_num}:A{row_num}", [[sales_name]]) # B: 月份 lark_write(SHEET_STAT, f"{SHEET_STAT}!B{row_num}:B{row_num}", [[month]]) # C: 总析出用户数 lark_write(SHEET_STAT, f"{SHEET_STAT}!C{row_num}:C{row_num}", [[s["total"]]]) # D: 匹配用户数 lark_write(SHEET_STAT, f"{SHEET_STAT}!D{row_num}:D{row_num}", [[s["matched"]]]) # E: 转化用户数 lark_write(SHEET_STAT, f"{SHEET_STAT}!E{row_num}:E{row_num}", [[s["converted_all"]]]) # F: 转化率(小数,配合百分比格式显示) lark_write(SHEET_STAT, f"{SHEET_STAT}!F{row_num}:F{row_num}", [[round(s["conv_rate"] / 100, 3)]]) # G: 退费率 lark_write(SHEET_STAT, f"{SHEET_STAT}!G{row_num}:G{row_num}", [[round(s["refund_rate"] / 100, 3)]]) # H: GMV(元) lark_write(SHEET_STAT, f"{SHEET_STAT}!H{row_num}:H{row_num}", [[round(s["gmv"], 2)]]) # I: GSV(元) lark_write(SHEET_STAT, f"{SHEET_STAT}!I{row_num}:I{row_num}", [[round(s["gsv"], 2)]]) # J-S: 完成率 lesson_vals = [round(s["lesson_rates"][name] / 100, 3) for name in U0_COL_ORDER] lark_write(SHEET_STAT, f"{SHEET_STAT}!J{row_num}:S{row_num}", [lesson_vals]) print(f" ✓ {sales_name} {month}: 总析出={s['total']} 匹配={s['matched']} 转化={s['converted_all']} " f"转化率={s['conv_rate']:.1f}% 退费率={s['refund_rate']:.1f}% GMV={s['gmv']:.2f} GSV={s['gsv']:.2f}") # ── 主流程 ────────────────────────────────────────────── def main(): dry_run = "--dry-run" in sys.argv if dry_run: print("⚠️ DRY-RUN 模式,不会实际写入\n") # 处理吴迪 sheet wd_data = process_sheet(SHEET_WD, "吴迪", dry_run) # 汇总统计 print(f"\n{'='*60}") print("汇总统计 → 统计 sheet") print(f"{'='*60}") wd_stats = compute_stats(wd_data) all_stats = {"吴迪": wd_stats} if dry_run: for sales_name, stats in all_stats.items(): for month, s in stats.items(): print(f" [DRY-RUN] {sales_name} {month}: 总析出={s['total']} 匹配={s['matched']} 转化={s['converted_all']} 转化率={s['conv_rate']:.1f}% 退费率={s['refund_rate']:.1f}% GMV={s['gmv']:.2f} GSV={s['gsv']:.2f}") else: write_all_stats(all_stats, dry_run) print("\n✅ 处理完成") if __name__ == "__main__": main()