ai_member_xiaoxi/scripts/fill_leads_sheet.py
2026-06-16 08:00:01 +08:00

618 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
In-app extracted leads data -- automatic back-fill + summary statistics script.

Flow:
1. Read the phone numbers in column B of the 吴迪 sheet.
2. XXTEA-encrypt each number and match it against
   bi_vala_app_account.tel_encrypt to obtain the account_id.
3. Query the registration date, conversion (orders from any key_from whose
   purchase date is on or after the extract date), the conversion key_from,
   refund status and the U0 trial-lesson completion dates, then write them
   back to the sheet.
4. Aggregate per extract month and write the result to the "统计" (stats) sheet.

Metric definitions:
- conversion rate = converted leads / total leads
- refund rate     = refunded leads / converted leads
- completion rate = leads that completed the lesson / total leads

Usage:
    python3 fill_leads_sheet.py [--dry-run]
"""
import sys
import os
import json
import subprocess
import re
from collections import defaultdict
# ── Configuration ─────────────────────────────────────
SPREADSHEET_TOKEN = "FA3xsw3kph4pdatKlUrcyPgInAc"
SHEET_WD = "1K3O6s"  # sheet of sales rep 吴迪
SHEET_STAT = "scyF3H"  # "统计" (statistics) sheet

# U0 trial-lesson name -> chapter_id
U0_CHAPTERS = {
    "L1-U0-L01": 343,
    "L1-U0-L02": 344,
    "L1-U0-L03": 345,
    "L1-U0-L04": 346,
    "L1-U0-L05": 348,
    "L2-U0-L01": 55,
    "L2-U0-L02": 56,
    "L2-U0-L03": 57,
    "L2-U0-L04": 58,
    "L2-U0-L05": 59,
}

# Order of the U0 columns in the lead sheet (must match the sheet header).
U0_COL_ORDER = [
    "L1-U0-L01", "L1-U0-L02", "L1-U0-L03", "L1-U0-L04", "L1-U0-L05",
    "L2-U0-L01", "L2-U0-L02", "L2-U0-L03", "L2-U0-L04", "L2-U0-L05",
]

# Lead-sheet column indices (0-based).
# Columns A (序号, row number) and C (微信昵称, WeChat nickname) are filled in
# manually by sales; the script neither reads nor writes them.
COL_SEQ = 0  # A: row number (manual, skipped by the script)
COL_PHONE = 1  # B: user phone number
COL_NICKNAME = 2  # C: WeChat nickname (manual, skipped by the script)
COL_USER_ID = 3  # D: user ID
COL_EXTRACT_DATE = 4  # E: extract date (manual)
COL_REG_DATE = 5  # F: registration date
COL_CONVERTED = 6  # G: converted? (yes/no)
COL_CONVERT_DATE = 7  # H: conversion date
COL_CONVERT_KEYFROM = 8  # I: conversion key_from
COL_CONVERT_GSV = 9  # J: conversion amount (GSV)
COL_REFUND = 10  # K: refunded? (yes/no)
COL_REFUND_DATE = 11  # L: refund date
COL_U0_START = 12  # M-V: L1-U0-L01 ~ L2-U0-L05 (one column per entry of U0_COL_ORDER)

# ── Database ──────────────────────────────────────────
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = "28591"
PG_USER = "ai_member"
PG_DB = "vala_bi"
# NOTE(security): a database password committed to source control should be
# rotated and supplied from the environment instead. When the
# VALA_BI_PG_PASSWORD environment variable is set it takes precedence over the
# built-in fallback below.
PG_PASSWORD = os.environ.get("VALA_BI_PG_PASSWORD", "LdfjdjL83h3h3^$&**YGG*")
# ── 加密 ───────────────────────────────────────────────
import xxtea
import base64
# Key used by encrypt_phone() so that plain phone numbers can be matched
# against bi_vala_app_account.tel_encrypt.
# NOTE(security): this secret is committed to source; consider loading it
# from the environment.
XXTEA_KEY = "K1pNOZ5O5+ZqTPSHA2kzPdoNOMOGcv6g"
def excel_serial_to_date(serial: int) -> str:
    """Convert an Excel serial day number into a ``YYYY-MM-DD`` string.

    Excel numbers the fictitious day 1900-02-29 as serial 60, so serials
    below 61 are counted from an epoch one day later than the rest.
    """
    from datetime import datetime, timedelta

    epoch = datetime(1899, 12, 30) if serial >= 61 else datetime(1899, 12, 31)
    return (epoch + timedelta(days=serial)).strftime("%Y-%m-%d")
def encrypt_phone(phone: str) -> str:
    """Return the ``tel_encrypt`` representation of a plain phone number.

    The number is XXTEA-encrypted with ``XXTEA_KEY``, base64-encoded, and the
    base64 text is made URL-safe by mapping ``+`` -> ``-``, ``/`` -> ``_`` and
    ``=`` -> ``.``.
    """
    url_safe = str.maketrans({"+": "-", "/": "_", "=": "."})
    cipher_bytes = xxtea.encrypt(phone.encode(), XXTEA_KEY.encode())
    return base64.b64encode(cipher_bytes).decode().translate(url_safe)
def pg_query(sql: str) -> list[list]:
    """Run *sql* through the ``psql`` CLI and return its rows.

    psql is invoked tuples-only and unaligned (``-t -A``) with a tab field
    separator, so every non-blank output line is one row whose fields are the
    tab-separated strings (with psql's default settings NULL prints as an
    empty field).

    Returns:
        A list of rows, each a list of field strings. An empty list is
        returned, with the reason printed to stderr, when psql exits with a
        non-zero status or does not finish within 60 seconds.
    """
    env = os.environ.copy()
    env["PGPASSWORD"] = PG_PASSWORD  # keeps the password off the command line
    cmd = [
        "psql", "-h", PG_HOST, "-p", PG_PORT, "-U", PG_USER, "-d", PG_DB,
        "-t", "-A", "-F", "\t", "-c", sql,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=60)
    except subprocess.TimeoutExpired:
        print("[ERROR] PG query timed out after 60s", file=sys.stderr)
        return []
    if result.returncode != 0:
        print(f"[ERROR] PG query failed: {result.stderr}", file=sys.stderr)
        return []
    # Do not strip() the whole output first: that would also remove an empty
    # trailing field (NULL in the last column) of the final row, making that
    # row too short for the callers' length checks. Blank lines are skipped.
    return [line.split("\t") for line in result.stdout.split("\n") if line.strip()]
def lark_read(sheet_id: str, range_str: str) -> list:
    """Read the cell values of *range_str* in *sheet_id* via ``lark-cli`` as the bot.

    Returns the ``data.valueRange.values`` list from lark-cli's JSON output,
    or an empty list (after printing the reason to stderr) when lark-cli
    exits non-zero or reports ``ok`` as false.
    """
    proc = subprocess.run(
        [
            "lark-cli", "sheets", "+read", "--as", "bot",
            "--spreadsheet-token", SPREADSHEET_TOKEN,
            "--sheet-id", sheet_id,
            "--range", range_str,
        ],
        capture_output=True,
        text=True,
        timeout=30,
    )
    if proc.returncode:
        print(f"[ERROR] lark read failed: {proc.stderr}", file=sys.stderr)
        return []
    payload = json.loads(proc.stdout)
    if payload.get("ok"):
        return payload["data"]["valueRange"]["values"]
    print(f"[ERROR] lark read error: {payload}", file=sys.stderr)
    return []
def lark_write(sheet_id: str, range_str: str, values: list) -> bool:
    """Write a 2-D list of cell *values* into *range_str* via ``lark-cli`` as the bot.

    Returns:
        True only when lark-cli exits 0 and reports ``"ok": true``. Every
        failure (non-zero exit, timeout, non-JSON output, ``ok`` false) is
        printed to stderr and reported as False instead of raising, so one
        bad write does not abort the remaining writes of a run.
    """
    payload = json.dumps(values)
    try:
        result = subprocess.run(
            ["lark-cli", "sheets", "+write", "--as", "bot",
             "--spreadsheet-token", SPREADSHEET_TOKEN,
             "--sheet-id", sheet_id, "--range", range_str,
             "--values", payload],
            capture_output=True, text=True, timeout=30,
        )
    except subprocess.TimeoutExpired:
        print(f"[ERROR] lark write timed out: {range_str}", file=sys.stderr)
        return False
    if result.returncode != 0:
        print(f"[ERROR] lark write failed: {result.stderr}", file=sys.stderr)
        return False
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        print(f"[ERROR] lark write returned non-JSON output: {result.stdout!r}", file=sys.stderr)
        return False
    if not isinstance(data, dict) or not data.get("ok", False):
        # Mirror lark_read: a rejected write is reported, not silently ignored.
        print(f"[ERROR] lark write error: {data}", file=sys.stderr)
        return False
    return True
def match_phones(phones: list[str]) -> dict[str, dict]:
    """Look up the bi_vala_app_account row of each plain phone number.

    Phones are encrypted with encrypt_phone() and matched against
    ``tel_encrypt`` in batches of 50, restricted to rows with ``status = 1``
    and ``deleted_at IS NULL``.

    Returns:
        ``{plain_phone: {"id": ..., "name": ..., "created_at": "YYYY-MM-DD"}}``
        for every phone that matched; unmatched phones are absent. When
        several rows match the same phone, the last row returned wins.
    """
    if not phones:
        return {}

    plain_by_cipher = {encrypt_phone(p): p for p in phones}
    ciphers = list(plain_by_cipher)
    batch_size = 50
    matched = {}

    for start in range(0, len(ciphers), batch_size):
        # The ciphertext alphabet (base64 with -_.) contains no quotes,
        # so quoting the values inline is safe here.
        in_list = ",".join(f"'{c}'" for c in ciphers[start:start + batch_size])
        sql = f"""
            SELECT id, name, tel_encrypt, created_at::date::text
            FROM bi_vala_app_account
            WHERE tel_encrypt IN ({in_list})
              AND status = 1 AND deleted_at IS NULL
        """
        for row in pg_query(sql):
            if len(row) < 4:
                continue
            acc_id, name, tel_enc, created_at = row[:4]
            plain = plain_by_cipher.get(tel_enc)
            if plain:
                matched[plain] = {"id": acc_id, "name": name, "created_at": created_at}

    return matched
def query_orders_and_refunds(account_ids: list[str]) -> dict[str, dict]:
    """Fetch every paid order (any key_from) of the given accounts with refund info.

    Only orders with ``order_status`` 3 or 4 and a non-null
    ``pay_success_date`` are considered. An order counts as refunded when it
    has a refund record with ``status = 3`` and an order row with the same
    trade_no has ``order_status = 4``.

    Args:
        account_ids: Account ids as decimal strings (inlined into the SQL).

    Returns:
        ``{account_id: {"orders": [(pay_date, key_from, is_refunded), ...],
        "total_gmv_cents": int, "total_gsv_cents": int}}`` where ``orders``
        is sorted by payment time ascending, GMV is the sum of
        ``pay_amount_int`` and GSV is GMV minus refunded amounts (refund
        amounts are multiplied by 100 to match ``pay_amount_int``).
    """
    if not account_ids:
        return {}

    BATCH_SIZE = 100
    results = {}
    for i in range(0, len(account_ids), BATCH_SIZE):
        ids_str = ",".join(account_ids[i:i + BATCH_SIZE])
        # One result row per order: trade_no is part of the GROUP BY so that
        # two distinct orders sharing account/date/key_from/amount are not
        # merged (which would drop one pay_amount while summing both refunds).
        sql = f"""
            SELECT o.account_id::text,
                   o.pay_success_date::date::text,
                   o.key_from,
                   o.pay_amount_int,
                   COALESCE(SUM(CASE WHEN r.status = 3 AND o2.order_status = 4
                                     THEN r.refund_amount::numeric * 100 ELSE 0 END), 0)::bigint AS total_refund_fen,
                   BOOL_OR(r.id IS NOT NULL AND r.status = 3 AND o2.order_status = 4) AS is_refunded
            FROM bi_vala_order o
            LEFT JOIN bi_refund_order r ON o.trade_no = r.trade_no
            LEFT JOIN bi_vala_order o2 ON o.trade_no = o2.trade_no AND o2.order_status = 4
            WHERE o.account_id IN ({ids_str})
              AND o.pay_success_date IS NOT NULL
              AND o.order_status IN (3, 4)
            GROUP BY o.account_id, o.trade_no, o.pay_success_date, o.key_from, o.pay_amount_int
            ORDER BY o.account_id, o.pay_success_date, o.trade_no
        """
        for row in pg_query(sql):
            if len(row) < 6:
                continue
            acc_id, pay_date, key_from, pay_amount, refund_fen, is_refunded = row[:6]
            # A NULL amount prints as "" -- treat it as 0 instead of crashing int().
            pay_fen = int(pay_amount or 0)
            entry = results.setdefault(
                acc_id, {"orders": [], "total_gmv_cents": 0, "total_gsv_cents": 0}
            )
            entry["orders"].append((pay_date, key_from, is_refunded in ("t", "true")))
            entry["total_gmv_cents"] += pay_fen
            entry["total_gsv_cents"] += pay_fen - int(refund_fen or 0)
    return results
def query_learning(account_ids: list[str]) -> dict[str, dict[str, str]]:
    """Return the earliest completion date of each U0 lesson per account.

    Characters of each account are read from bi_vala_app_character; their
    play records (``play_status = 1``) for the chapters in ``U0_CHAPTERS`` are
    read from the 8 tables bi_user_chapter_play_record_0..7.

    Returns:
        ``{account_id: {lesson_name: "YYYY-MM-DD"}}``. When an account has
        several characters, or records in several tables, the earliest date
        is kept.
    """
    if not account_ids:
        return {}

    BATCH_SIZE = 100
    CHAR_BATCH_SIZE = 200
    SHARD_COUNT = 8
    lesson_by_chapter = {cid: name for name, cid in U0_CHAPTERS.items()}
    chapter_str = ",".join(str(cid) for cid in lesson_by_chapter)
    all_chapter_dates: dict[str, dict[str, str]] = {}

    for i in range(0, len(account_ids), BATCH_SIZE):
        ids_str = ",".join(account_ids[i:i + BATCH_SIZE])
        char_sql = f"""
            SELECT c.account_id::text, c.id::text
            FROM bi_vala_app_character c
            WHERE c.account_id IN ({ids_str}) AND c.deleted_at IS NULL
        """
        # character id -> account id, so each play record maps back in O(1).
        account_by_char = {row[1]: row[0] for row in pg_query(char_sql) if len(row) >= 2}
        if not account_by_char:
            continue

        char_ids = list(account_by_char)
        char_batches = [
            char_ids[j:j + CHAR_BATCH_SIZE] for j in range(0, len(char_ids), CHAR_BATCH_SIZE)
        ]
        for table_idx in range(SHARD_COUNT):
            table_name = f"bi_user_chapter_play_record_{table_idx}"
            for char_batch in char_batches:
                sql = f"""
                    SELECT user_id::text, chapter_id, MIN(created_at::date::text)
                    FROM {table_name}
                    WHERE user_id IN ({",".join(char_batch)})
                      AND chapter_id IN ({chapter_str})
                      AND play_status = 1
                    GROUP BY user_id, chapter_id
                """
                try:
                    rows = pg_query(sql)
                except (subprocess.SubprocessError, OSError) as exc:
                    # Best effort: one failing shard must not abort the run,
                    # but the gap is reported instead of silently ignored.
                    print(f"[WARN] {table_name} query failed: {exc}", file=sys.stderr)
                    continue
                for row in rows:
                    if len(row) < 3:
                        continue
                    char_id, chapter_id, comp_date = row[0], int(row[1]), row[2]
                    acc_id = account_by_char.get(char_id)
                    lesson = lesson_by_chapter.get(chapter_id)
                    if acc_id is None or lesson is None:
                        continue
                    dates = all_chapter_dates.setdefault(acc_id, {})
                    previous = dates.get(lesson)
                    # ISO dates compare chronologically as strings.
                    if previous is None or comp_date < previous:
                        dates[lesson] = comp_date

    return all_chapter_dates
# ── 处理单个销售 sheet ──────────────────────────────────
def _col_letter(index: int) -> str:
    """Return the spreadsheet column letters for a 0-based column index (0 -> "A", 26 -> "AA")."""
    letters = ""
    index += 1
    while index:
        index, rem = divmod(index - 1, 26)
        letters = chr(ord("A") + rem) + letters
    return letters


def _normalize_extract_date(raw: str) -> str:
    """Normalize a manually entered extract date to ``YYYY-MM-DD``.

    Accepts an Excel serial number, ``YYYY-MM-DD`` / ``YYYY/MM/DD`` /
    ``YYYY.MM.DD`` (unpadded month/day and a trailing time are allowed) and
    ``M月D日`` (the current year is assumed, as compute_stats does). Any other
    non-empty text is returned unchanged; empty input gives "".
    """
    from datetime import datetime

    text = raw.strip()
    if not text:
        return ""
    if text.isdigit():
        return excel_serial_to_date(int(text))
    m = re.match(r"(\d{4})[-/.](\d{1,2})[-/.](\d{1,2})", text)
    if m:
        return f"{m.group(1)}-{m.group(2).zfill(2)}-{m.group(3).zfill(2)}"
    m = re.match(r"(\d{1,2})月(\d{1,2})日", text)
    if m:
        return f"{datetime.now().year}-{m.group(1).zfill(2)}-{m.group(2).zfill(2)}"
    return text


def process_sheet(sheet_id: str, sheet_name: str, dry_run: bool = False) -> list[dict]:
    """Back-fill one sales sheet and return its lead records for the stats step.

    For every row whose column B holds an 11-digit number starting with 1,
    the matching account is looked up and column D plus columns F..V are
    rewritten with: user ID, registration date, converted flag, conversion
    date / key_from / GSV, refund flag, an empty refund date and the U0 lesson
    completion dates. Columns A, C and E (extract date) are never written.

    A lead is converted when any order (any key_from) was paid on or after its
    extract date; the conversion date/key_from come from the earliest such
    order. It is flagged refunded only when every such order was refunded.

    Args:
        sheet_id: Lark sheet id of the sales sheet.
        sheet_name: Human-readable name, used only in log output.
        dry_run: When True nothing is written to Lark; planned writes are printed.

    Returns:
        One dict per data row with keys ``extract_date`` (raw cell text),
        ``converted``, ``refunded``, ``gmv_cents``, ``gsv_cents``, ``lessons``
        and ``has_phone`` (True only for rows matched to an account). Empty if
        the sheet cannot be read or holds no valid phone number.
    """
    print(f"\n{'='*60}")
    print(f"处理 Sheet: {sheet_name} ({sheet_id})")
    print(f"{'='*60}")

    # Output columns are derived from the layout constants so the write range
    # always has exactly as many cells as values (F..V with 10 U0 lessons).
    user_id_col = _col_letter(COL_USER_ID)
    out_cols = range(COL_REG_DATE, COL_U0_START + len(U0_COL_ORDER))
    first_out_col = _col_letter(out_cols.start)
    last_out_col = _col_letter(out_cols.stop - 1)

    range_str = f"{sheet_id}!A2:{last_out_col}"
    try:
        rows = lark_read(sheet_id, range_str)
    except Exception as e:
        print(f"[ERROR] 读取失败: {e}")
        return []
    if not rows:
        print("没有数据行")
        return []
    print(f"读取到 {len(rows)} 行数据")

    phone_to_row = {}
    for idx, row in enumerate(rows):
        if len(row) > COL_PHONE and row[COL_PHONE]:
            phone = str(row[COL_PHONE]).strip()
            if phone and re.match(r'^1\d{10}$', phone):
                phone_to_row.setdefault(phone, []).append(idx)
    if not phone_to_row:
        print("没有有效的手机号")
        return []

    phones = list(phone_to_row.keys())
    print(f"有效手机号: {len(phones)}")
    print("→ 匹配 account_id...")
    acc_info = match_phones(phones)
    print(f" 匹配到 {len(acc_info)} 个账号")
    matched_accounts = [info["id"] for info in acc_info.values()]
    print("→ 查询订单及退费信息(全部渠道)...")
    order_info = query_orders_and_refunds(matched_accounts)
    print("→ 查询 U0 学习进度...")
    learn_info = query_learning(matched_accounts)

    row_updates: dict[int, dict[int, str]] = {}
    lead_data = []
    for phone, row_indices in phone_to_row.items():
        info = acc_info.get(phone)
        if not info:
            continue
        acc_id = info["id"]
        learn = learn_info.get(acc_id, {})
        orders = order_info.get(acc_id, {})
        for row_idx in row_indices:
            row = rows[row_idx]
            extract_date = ""
            if len(row) > COL_EXTRACT_DATE and row[COL_EXTRACT_DATE]:
                extract_date = str(row[COL_EXTRACT_DATE]).strip()
            # Pay dates are "YYYY-MM-DD" strings, so the extract date must use
            # the same form for the string comparison below to be chronological.
            extract_date_ymd = _normalize_extract_date(extract_date)

            conv_date = conv_keyfrom = conv_gsv = is_refunded = ""
            if extract_date_ymd:
                first_match = None
                has_valid_order = False
                # Orders arrive sorted by payment time, so the first qualifying
                # one is the conversion order.
                for pay_date, kf, refunded in orders.get("orders", []):
                    if pay_date >= extract_date_ymd:
                        if first_match is None:
                            first_match = (pay_date, kf)
                        if not refunded:
                            has_valid_order = True
                if first_match:
                    conv_date, conv_keyfrom = first_match
                    # Conversion amount = the user's total GSV over all orders.
                    conv_gsv = str(round(orders.get("total_gsv_cents", 0) / 100, 2))
                    is_refunded = "否" if has_valid_order else "是"
            converted = "是" if conv_date else "否"

            values = {
                COL_USER_ID: acc_id,
                COL_REG_DATE: info.get("created_at", ""),
                COL_CONVERTED: converted,
                COL_CONVERT_DATE: conv_date,
                COL_CONVERT_KEYFROM: conv_keyfrom,
                COL_CONVERT_GSV: conv_gsv,
                COL_REFUND: is_refunded,
                COL_REFUND_DATE: "",  # not computed; always cleared
            }
            for offset, lesson_name in enumerate(U0_COL_ORDER):
                values[COL_U0_START + offset] = learn.get(lesson_name, "")
            row_updates[row_idx] = values

            lead_data.append({
                "extract_date": extract_date,
                "converted": converted,
                "refunded": is_refunded,
                "gmv_cents": orders.get("total_gmv_cents", 0),
                "gsv_cents": orders.get("total_gsv_cents", 0),
                "lessons": {k: learn.get(k, "") for k in U0_COL_ORDER},
                "has_phone": True,
            })

    # Rows not matched to an account are still counted; their conversion flag
    # is whatever column G (COL_CONVERTED) already holds.
    for row_idx, row in enumerate(rows):
        if row_idx in row_updates:
            continue
        extract_date = ""
        if len(row) > COL_EXTRACT_DATE and row[COL_EXTRACT_DATE]:
            extract_date = str(row[COL_EXTRACT_DATE]).strip()
        existing_converted = ""
        if len(row) > COL_CONVERTED and row[COL_CONVERTED]:
            existing_converted = str(row[COL_CONVERTED]).strip()
        lead_data.append({
            "extract_date": extract_date,
            "converted": existing_converted,
            "refunded": "",
            "gmv_cents": 0,
            "gsv_cents": 0,
            "lessons": {k: "" for k in U0_COL_ORDER},
            "has_phone": False,
        })

    print(f"\n→ 准备回写 {len(row_updates)} 行数据...")
    for row_idx, values in sorted(row_updates.items()):
        actual_row = row_idx + 2  # data starts on sheet row 2
        out_values = [str(values[col]) if values.get(col) else "" for col in out_cols]
        if dry_run:
            print(f" [DRY-RUN] {sheet_id}!{user_id_col}{actual_row} + "
                  f"{first_out_col}{actual_row}:{last_out_col}{actual_row} ← ...")
            continue
        # D and F..V are written separately so that column E (extract date,
        # maintained manually by sales) is never overwritten.
        ok = lark_write(sheet_id, f"{sheet_id}!{user_id_col}{actual_row}:{user_id_col}{actual_row}",
                        [[str(values[COL_USER_ID])]])
        ok = lark_write(sheet_id, f"{sheet_id}!{first_out_col}{actual_row}:{last_out_col}{actual_row}",
                        [out_values]) and ok
        if ok:
            print(f" ✓ 行 {actual_row} 回写成功")
        else:
            print(f" ✗ 行 {actual_row} 回写失败", file=sys.stderr)

    unmatched = set(phones) - set(acc_info)
    if unmatched:
        print(f"\n⚠️ 未匹配到账号的手机号 ({len(unmatched)} 个):")
        for p in sorted(unmatched):
            print(f" {p}")
    return lead_data
# ── 统计汇总 ────────────────────────────────────────────
def compute_stats(lead_data: list[dict], lesson_names: "list[str] | None" = None) -> dict[str, dict]:
    """Aggregate lead records per extract month.

    The month comes from each lead's ``extract_date``, which may be an Excel
    serial number, ``YYYY-MM-DD`` / ``YYYY/MM/DD`` / ``YYYY.MM.DD`` (unpadded
    month allowed) or ``M月D日`` (current year assumed). Leads whose date is
    missing or unparseable are ignored.

    Metrics per month:
    - conversion rate = converted leads / total leads
    - refund rate     = refunded leads / converted leads
    - completion rate = leads that completed the lesson / total leads
    Rates are percentages (0-100); GMV/GSV are the cent sums divided by 100.
    A lead is converted / refunded when its flag equals "是".

    Args:
        lead_data: Lead dicts with ``extract_date``, ``converted``,
            ``refunded`` and ``lessons``; ``has_phone``, ``gmv_cents`` and
            ``gsv_cents`` are optional.
        lesson_names: Lessons to report completion rates for; defaults to
            ``U0_COL_ORDER``.

    Returns:
        ``{"YYYY-MM": stats}`` in ascending month order; stats holds total,
        matched, converted_all, converted_unrefunded, refunded, conv_rate,
        refund_rate, gmv, gsv and lesson_rates.
    """
    from datetime import datetime

    current_year = datetime.now().year
    month_groups = defaultdict(list)
    for lead in lead_data:
        extract = lead.get("extract_date", "")
        if not extract:
            continue
        extract_str = str(extract).strip()
        if extract_str.isdigit():
            month = excel_serial_to_date(int(extract_str))[:7]
        else:
            m = re.match(r"(\d{4})[-/.](\d{1,2})", extract_str)
            if m:
                month = f"{m.group(1)}-{m.group(2).zfill(2)}"
            else:
                m = re.match(r"(\d{1,2})月\d{1,2}日", extract_str)
                if not m:
                    continue
                month = f"{current_year}-{m.group(1).zfill(2)}"
        month_groups[month].append(lead)

    if not month_groups:
        return {}
    if lesson_names is None:
        lesson_names = U0_COL_ORDER

    result = {}
    for month, leads in sorted(month_groups.items()):
        total = len(leads)  # every group holds at least one lead
        matched = sum(1 for lead in leads if lead.get("has_phone", False))
        converted_all = sum(1 for lead in leads if lead["converted"] == "是")
        refunded = sum(1 for lead in leads if lead["refunded"] == "是")
        converted_unrefunded = sum(
            1 for lead in leads if lead["converted"] == "是" and lead["refunded"] != "是"
        )
        conv_rate = converted_all / total * 100
        refund_rate = refunded / converted_all * 100 if converted_all > 0 else 0

        total_gmv = sum(lead.get("gmv_cents", 0) for lead in leads) / 100
        total_gsv = sum(lead.get("gsv_cents", 0) for lead in leads) / 100

        lesson_rates = {
            name: sum(1 for lead in leads if lead["lessons"].get(name, "")) / total * 100
            for name in lesson_names
        }

        result[month] = {
            "total": total,
            "matched": matched,
            "converted_all": converted_all,
            "converted_unrefunded": converted_unrefunded,
            "refunded": refunded,
            "conv_rate": conv_rate,
            "refund_rate": refund_rate,
            "gmv": total_gmv,
            "gsv": total_gsv,
            "lesson_rates": lesson_rates,
        }
    return result
def write_all_stats(all_stats: dict[str, dict[str, dict]], dry_run: bool = False):
    """Write every salesperson's monthly statistics to the stats sheet.

    Layout: row 1 is the header; from row 2 there is one row per
    (salesperson, month). Salespeople are listed in ``all_stats`` order and
    months in ascending order. Rates are written as fractions (e.g. 0.125) so
    a percentage cell format can display them. Rows 2-50 are cleared first so
    that rows left over from an earlier, longer run do not linger.

    Args:
        all_stats: ``{sales_name: {month: stats}}`` where each ``stats`` is one
            entry of compute_stats()'s result.
        dry_run: When True nothing is written to Lark; the per-row summary
            lines are still printed.
    """
    header = ["销售", "月份", "总析出用户数", "匹配用户数", "转化用户数", "转化率", "退费率", "GMV", "GSV"] + \
        [f"{name}完成率" for name in U0_COL_ORDER]
    # Last column letter for the header width (19 columns -> "S"); valid up to 26 columns.
    last_col = chr(ord("A") + len(header) - 1)

    table = []
    summaries = []
    for sales_name, stats in all_stats.items():
        for month in sorted(stats):
            s = stats[month]
            table.append(
                [
                    sales_name,
                    month,
                    s["total"],
                    s["matched"],
                    s["converted_all"],
                    round(s["conv_rate"] / 100, 3),
                    round(s["refund_rate"] / 100, 3),
                    round(s["gmv"], 2),
                    round(s["gsv"], 2),
                ]
                + [round(s["lesson_rates"][name] / 100, 3) for name in U0_COL_ORDER]
            )
            summaries.append(
                f"{sales_name} {month}: 总析出={s['total']} 匹配={s['matched']} 转化={s['converted_all']} "
                f"转化率={s['conv_rate']:.1f}% 退费率={s['refund_rate']:.1f}% GMV={s['gmv']:.2f} GSV={s['gsv']:.2f}"
            )

    if dry_run:
        print(" [DRY-RUN] 跳过统计 sheet 写入")
    else:
        lark_write(SHEET_STAT, f"{SHEET_STAT}!A1:{last_col}1", [header])
        print(" → 清除统计 sheet 旧数据...")
        lark_write(SHEET_STAT, f"{SHEET_STAT}!A2:{last_col}50",
                   [[""] * len(header) for _ in range(49)])

    if not table:
        print(" 无统计数据")
        return

    if not dry_run:
        # One write for the whole table instead of one API call per cell group.
        if not lark_write(SHEET_STAT, f"{SHEET_STAT}!A2:{last_col}{len(table) + 1}", table):
            print("[ERROR] 统计 sheet 写入失败", file=sys.stderr)
    for line in summaries:
        print(line)
# ── 主流程 ──────────────────────────────────────────────
def main():
    """Script entry point.

    Back-fills the 吴迪 sheet, aggregates its leads per extract month and
    publishes the result. With ``--dry-run`` the flag is passed to
    process_sheet and the monthly statistics are printed instead of being
    written to the stats sheet.
    """
    dry_run = "--dry-run" in sys.argv
    if dry_run:
        print("⚠️ DRY-RUN 模式,不会实际写入\n")

    separator = "=" * 60
    lead_data = process_sheet(SHEET_WD, "吴迪", dry_run)

    print(f"\n{separator}")
    print("汇总统计 → 统计 sheet")
    print(separator)
    all_stats = {"吴迪": compute_stats(lead_data)}

    if not dry_run:
        write_all_stats(all_stats, dry_run)
    else:
        for sales_name, stats in all_stats.items():
            for month, s in stats.items():
                print(
                    f" [DRY-RUN] {sales_name} {month}: 总析出={s['total']} 匹配={s['matched']} "
                    f"转化={s['converted_all']} 转化率={s['conv_rate']:.1f}% 退费率={s['refund_rate']:.1f}% "
                    f"GMV={s['gmv']:.2f} GSV={s['gsv']:.2f}"
                )
    print("\n✅ 处理完成")
if __name__ == "__main__":
    # Run only when executed as a script, not when imported.
    main()