ai_member_xiaoxi/scripts/beijing_pilot_leads_refresh.py
2026-06-19 08:00:01 +08:00

504 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
北京试点线索刷新 — 从北京工作簿 LP 表 + DB 重建 4koH9C
数据源:
- LP 慧萌 kkzS48 / LP 虹茗 62366f (北京工作簿 FPYMsatUPhCpy5trDKdcfNH2nvM)
- 成单 qX7oJ6 (北京工作簿,销售「萌」= 慧萌)
- DB (vala_bi): 注册信息 + 订单 + 退款 + 行课
写入目标: 4koH9C (A-Z, 同销售三表结构)
"""
import json, requests, os, sys, psycopg2, time
from datetime import datetime, timedelta
from collections import defaultdict
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone
# ── 配置 ──
# PostgreSQL connection settings (the password is loaded separately at runtime).
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_DB = "vala_bi"

# Spreadsheet and sheet that receive the rebuilt lead table.
TARGET_SPREADSHEET = "NoZqsFi47hIOHEt9j8WcfRtbnug"
TARGET_SHEET = "4koH9C"

# Source workbook: one LP sheet per sales rep, as (sheet_id, sales_name) pairs.
BEIJING_SPREADSHEET = "FPYMsatUPhCpy5trDKdcfNH2nvM"
LP_SHEETS = [("kkzS48", "慧萌"), ("62366f", "虹茗")]
CHENGDAN_SHEET = "qX7oJ6"

# Local paths: credential directory and the append-only run log.
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
LOG_FILE = "/var/log/xiaoxi_beijing_leads_refresh.log"

# goods_id -> product display name; unknown ids are rendered by the caller.
GOODS_NAMES = {
    57: "瓦拉英语level1·单季",
    60: "瓦拉英语level1",
    63: "瓦拉英语level1·单季",
    31: "瓦拉英语年包",
    32: "瓦拉英语单季度包",
    33: "瓦拉英语level2",
    54: "瓦拉英语季度包",
    61: "瓦拉英语level1+2",
}
def log(msg):
    """Print a timestamped message and append the same line to LOG_FILE.

    Args:
        msg: Text to record. It is prefixed with the local wall-clock time
            formatted as ``[YYYY-MM-DD HH:MM:SS]``.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    # Messages contain Chinese text, so pin the file encoding instead of
    # relying on the locale default of whatever environment runs the script.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
def get_pg_password():
    """Return the PG_ONLINE_PASSWORD value from ``../secrets.env``.

    The file is looked up relative to SCRIPTS_DIR. The first line that starts
    with ``PG_ONLINE_PASSWORD=`` wins; surrounding whitespace and any leading or
    trailing single/double quotes are removed from the value.

    Returns:
        The password string, or None when no such line exists.
    """
    marker = "PG_ONLINE_PASSWORD="
    env_path = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    with open(env_path) as env_file:
        for raw in env_file:
            if not raw.startswith(marker):
                continue
            _, value = raw.strip().split("=", 1)
            return value.strip("'\"")
    return None
def get_fs_token():
    """Request a Feishu tenant access token for the first configured app.

    Reads ``config.json`` from CRED_DIR, takes ``apps[0]["appId"]`` and
    ``apps[0]["appSecret"]``, and posts them to the internal
    ``tenant_access_token`` endpoint.

    Returns:
        The ``tenant_access_token`` string from the response body.

    Raises:
        RuntimeError: If the response contains no token (for example when the
            credentials are rejected). The response body is included so the
            API's own code/msg is visible instead of a bare KeyError.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        app = json.load(f)["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if not token:
        # Same failure style as read_sheet: surface the API response.
        raise RuntimeError(f"获取 tenant_access_token 失败: {data}")
    return token
def read_sheet(token, spreadsheet, sheet_id, range_str=None):
    """Fetch cell values from one sheet through the Feishu sheets v2 API.

    Args:
        token: Bearer token sent in the Authorization header.
        spreadsheet: Spreadsheet token placed in the URL path.
        sheet_id: Sheet id to read.
        range_str: Optional range appended as ``<sheet_id>!<range_str>``; when
            omitted the bare sheet id is requested.

    Returns:
        The ``data.valueRange.values`` member of the response.

    Raises:
        RuntimeError: If the response ``code`` is not 0.
    """
    target = f"{sheet_id}!{range_str}" if range_str else f"{sheet_id}"
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{spreadsheet}/values/{target}"
    auth = {"Authorization": f"Bearer {token}"}
    payload = requests.get(url, headers=auth, timeout=30).json()
    if payload.get("code") == 0:
        return payload["data"]["valueRange"]["values"]
    raise RuntimeError(f"读取失败 {sheet_id}: {payload}")
def put_values(token, sheet_id, range_str, values):
    """Write a block of values into TARGET_SPREADSHEET via the sheets v2 API.

    Args:
        token: Bearer token sent in the Authorization header.
        sheet_id: Sheet id that prefixes the range.
        range_str: Cell range such as ``A1:Z1``.
        values: Nested list of cell values for that range.

    Returns:
        True when the response ``code`` is 0; otherwise the code and message
        are logged and False is returned.
    """
    endpoint = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{TARGET_SPREADSHEET}/values"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    result = requests.put(endpoint, headers=request_headers, json=payload, timeout=30).json()
    succeeded = result.get("code") == 0
    if not succeeded:
        log(f"{range_str}: {result.get('code')} {result.get('msg')}")
    return succeeded
def _excel_serial_to_datetime(serial):
    """Return the datetime for an Excel day serial, or None if it cannot be converted."""
    if not serial:
        return None
    try:
        days = int(float(serial))  # drop any fractional time-of-day part
        return datetime(1899, 12, 30) + timedelta(days=days)
    except (TypeError, ValueError, OverflowError):
        # Non-numeric cells, NaN/inf, or serials outside datetime's range.
        return None


def excel_serial_to_date(serial):
    """Convert an Excel date serial number to a ``YYYY-MM-DD`` string.

    Args:
        serial: Number or numeric string counting days from 1899-12-30.

    Returns:
        The formatted date, or None when ``serial`` is falsy (including 0),
        not numeric, or out of the representable range.
    """
    dt = _excel_serial_to_datetime(serial)
    return dt.strftime("%Y-%m-%d") if dt else None


def excel_serial_to_md(serial):
    """Convert an Excel date serial number to the ``M月D日`` display form.

    Args:
        serial: Number or numeric string counting days from 1899-12-30.

    Returns:
        A string such as ``3月15日``, or "" when the serial cannot be converted.
    """
    dt = _excel_serial_to_datetime(serial)
    if not dt:
        return ""
    # Keep the month/day markers: bare concatenation makes "111" ambiguous
    # (1/11 vs 11/1) and does not match the documented format.
    return f"{dt.month}月{dt.day}日"
def classify_sales_channel(key_from):
    """Map an order's ``key_from`` source tag to a channel label.

    The tag is stripped of surrounding whitespace and matched in this order:
    three exact in-app tags -> "端内"; prefix ``sales-adp-`` -> "销转";
    prefix ``newmedia-daren-`` or the exact tag ``newmedia-dianpu-wwxx-0-0``
    -> "达人". Anything else, including an empty or missing tag, -> "直购".
    """
    fallback = "直购"
    if not key_from:
        return fallback

    source = key_from.strip()
    in_app_sources = ('app-active-h5-0-0', 'app-sales-bj-qhm-0', 'app-sales-bj-wd-0')
    if source in in_app_sources:
        return "端内"
    if source.startswith('sales-adp-'):
        return "销转"

    is_influencer = (
        source == 'newmedia-dianpu-wwxx-0-0'
        or source.startswith('newmedia-daren-')
    )
    return "达人" if is_influencer else fallback
def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in slices and return all fetched rows.

    Args:
        cur: DB-API cursor providing ``execute`` and ``fetchall``.
        sql_tpl: SQL text containing exactly one ``%s`` slot, which is filled
            with a comma-separated list of ``%s`` placeholders for each slice.
        params: Sequence of values to bind, sliced ``chunk`` at a time.
        chunk: Maximum number of values bound per statement.

    Returns:
        A list of the rows of every slice, in slice order. Empty ``params``
        executes nothing and returns an empty list.
    """
    collected = []
    for offset in range(0, len(params), chunk):
        window = params[offset:offset + chunk]
        placeholders = ",".join("%s" for _ in window)
        cur.execute(sql_tpl % placeholders, window)
        collected += cur.fetchall()
    return collected
# ── Step 1: 解析 LP 表 ──
def _lp_text(row, idx):
    """Return cell ``idx`` of ``row`` as stripped text, or "" if missing or empty."""
    if idx < len(row) and row[idx]:
        return str(row[idx]).strip()
    return ""


def _lp_int_text(value):
    """Render a numeric cell as a plain integer string; return None if it is not numeric."""
    try:
        return str(int(float(value)))
    except (TypeError, ValueError, OverflowError):
        return None


def _parse_lp_row(row, sales_name):
    """Turn one LP sheet row into a lead dict, or None when the row must be skipped."""
    # Rows shorter than 6 cells cannot contain the phone column (index 5).
    if not row or len(row) < 6:
        return None

    # Column F (index 5): phone. Numeric cells may arrive as floats, so
    # normalise through int; anything else is kept as trimmed text.
    phone = ""
    if row[5]:
        phone = _lp_int_text(row[5])
        if phone is None:
            phone = str(row[5]).strip()
    if not phone or len(phone) != 11:
        return None  # skip rows without an 11-character phone

    # Column C (index 2): lead date, expected to be an Excel serial.
    date_serial = row[2]

    # Column N (index 13): user id recorded in the LP sheet; dropped when non-numeric.
    lp_uid = ""
    if len(row) > 13 and row[13]:
        lp_uid = _lp_int_text(row[13]) or ""

    return {
        "sales": sales_name,
        "nickname": _lp_text(row, 4),       # column E: WeChat nickname
        "date_str": excel_serial_to_md(date_serial) if date_serial else "",
        "date_serial": date_serial,
        # NOTE(review): the original comment labels this "T: U0 lesson progress"
        # but index 18 is column S - confirm which column is intended.
        "exp_lessons": _lp_text(row, 18),
        "phone": phone,
        "grade": _lp_text(row, 6),          # column G: child age / grade
        "followup": _lp_text(row, 7),       # column H: English level and current courses
        "lp_uid": lp_uid,
    }


def parse_lp_sheets(token):
    """Read every sheet in LP_SHEETS and return the rows that carry a usable phone.

    The first two rows of each sheet are treated as headers and skipped. A row
    is kept only when it has at least 6 cells and its phone cell yields an
    11-character string.

    Args:
        token: Feishu bearer token passed through to read_sheet.

    Returns:
        A list of dicts with keys ``sales``, ``nickname``, ``date_str``,
        ``date_serial``, ``exp_lessons``, ``phone``, ``grade``, ``followup``
        and ``lp_uid``.
    """
    all_leads = []
    for sid, sales_name in LP_SHEETS:
        # Guard against a sheet whose value range comes back empty.
        rows = read_sheet(token, BEIJING_SPREADSHEET, sid) or []
        log(f" {sales_name} ({sid}): {len(rows)} rows total")
        before = len(all_leads)
        for row in rows[2:]:
            lead = _parse_lp_row(row, sales_name)
            if lead is not None:
                all_leads.append(lead)
        log(f" {sales_name}: {len(all_leads) - before} valid leads")
    log(f"{len(all_leads)} 条有效线索")
    return all_leads
# ── Step 2: DB 批量查询 ──
def _attach_accounts(cur, leads):
    """Encrypt each lead's phone, look up its account, and return the matched account ids."""
    for lead in leads:
        lead["tel_encrypt"] = encrypt_phone(lead["phone"])
    # De-duplicate while keeping first-seen order.
    enc_list = list(dict.fromkeys(lead["tel_encrypt"] for lead in leads))

    log(" 查询注册信息...")
    reg_info = batch_in(cur,
        "SELECT id, tel_encrypt, created_at, download_channel FROM bi_vala_app_account WHERE tel_encrypt IN (%s) AND status=1 AND deleted_at IS NULL",
        enc_list
    )
    # If several accounts share one encrypted phone, the last row returned wins.
    tel_to_account = {}
    for aid, tel_enc, created_at, dc in reg_info:
        tel_to_account[tel_enc] = {
            "account_id": aid,
            "reg_date": created_at.strftime("%Y-%m-%d") if created_at else "",
            "download_channel": dc or "",
        }

    account_ids = set()
    for lead in leads:
        acc = tel_to_account.get(lead["tel_encrypt"], {})
        lead["account_id"] = acc.get("account_id")
        lead["reg_date"] = acc.get("reg_date", "")
        lead["download_channel"] = acc.get("download_channel", "")
        if lead["account_id"]:
            account_ids.add(lead["account_id"])
    aid_list = list(account_ids)
    log(f" 匹配到 account_id: {len(aid_list)}")
    return aid_list


def _pick_valid_order(olist, refund_map, lead_date):
    """Return a summary of the first order with positive net amount paid on/after lead_date, else None."""
    for _aid, trade_no, pay_date, key_from, goods_id, gmv, _status in olist:
        gmv = gmv or 0
        total_refund = refund_map.get(trade_no, 0)
        gsv = gmv - total_refund
        if gsv <= 0:
            continue  # fully refunded (or over-refunded) orders do not count
        pay_date_str = pay_date.strftime("%Y-%m-%d") if pay_date else ""
        # ISO date strings compare correctly as plain strings.
        if lead_date and pay_date_str < lead_date:
            continue  # paid before the lead came in
        return {
            "trade_no": trade_no,
            "pay_date": pay_date_str,
            # Same M月D日 display form as the lead date column.
            "pay_date_md": f"{pay_date.month}月{pay_date.day}日" if pay_date else "",
            "key_from": key_from or "",
            "goods_id": goods_id,
            # Integer amounts are divided by 100 (presumably fen -> yuan; confirm).
            "gmv": gmv / 100.0,
            "refund": total_refund / 100.0,
            "gsv": gsv / 100.0,
            "product": GOODS_NAMES.get(goods_id, f"商品{goods_id}"),
            "channel_class": classify_sales_channel(key_from),
        }
    return None


def _attach_orders(cur, leads, aid_list):
    """Load paid orders and refunds, then set ``all_orders`` / ``valid_order`` on matched leads."""
    log(" 查询订单信息...")
    if not aid_list:
        return
    orders = batch_in(cur,
        "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL AND order_status IN (3,4) ORDER BY pay_success_date DESC",
        aid_list
    )
    # Each account id sits in exactly one chunk, so per-account rows stay newest-first.
    aid_orders = defaultdict(list)
    for o in orders:
        aid_orders[o[0]].append(o)

    trade_nos = [o[1] for o in orders if o[1]]
    refund_map = defaultdict(int)
    if trade_nos:
        refunds = batch_in(cur,
            "SELECT trade_no, refund_amount_int FROM bi_refund_order WHERE trade_no IN (%s) AND status=3",
            trade_nos
        )
        for tn, amt in refunds:
            refund_map[tn] += amt or 0  # sum multiple refunds per order

    for lead in leads:
        aid = lead.get("account_id")
        if not aid:
            continue
        olist = aid_orders.get(aid, [])
        lead["all_orders"] = olist
        lead_date = excel_serial_to_date(lead.get("date_serial"))
        lead["valid_order"] = _pick_valid_order(olist, refund_map, lead_date)


def _attach_activation(conn, cur, leads, aid_list):
    """Best-effort lookup of the A1/A2 season package level for each matched lead."""
    log(" 查询激活课程...")
    if not aid_list:
        return
    try:
        activations = batch_in(cur,
            "SELECT account_id, season_package_level FROM bi_vala_seasonal_ticket WHERE account_id IN (%s) AND status=1 AND deleted_at IS NULL AND season_package_level IN ('A1','A2')",
            aid_list
        )
        # When an account has several rows, the last one returned wins.
        aid_activation = {aid: lvl for aid, lvl in activations}
        for lead in leads:
            aid = lead.get("account_id")
            lead["activation"] = aid_activation.get(aid, "") if aid else ""
    except Exception as e:
        log(f" 激活查询异常: {e}")
        for lead in leads:
            lead["activation"] = ""
        # A failed statement leaves the transaction aborted; without a
        # rollback every later query on this connection would fail too.
        conn.rollback()


def _attach_lessons(conn, cur, leads, aid_list):
    """Set the latest completed lesson, its date, and total study minutes on matched leads."""
    log(" 查询角色信息...")
    if not aid_list:
        return
    char_info = batch_in(cur,
        "SELECT account_id, id FROM bi_vala_app_character WHERE account_id IN (%s) AND deleted_at IS NULL",
        aid_list
    )
    account_chars = defaultdict(list)
    for aid, cid in char_info:
        account_chars[aid].append(cid)
    char_ids = list(dict.fromkeys(cid for _aid, cid in char_info))
    log(f" 角色数: {len(char_ids)}")

    # chapter id -> (level, season, unit, lesson)
    cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson")
    chapter_map = {
        ch_id: (cl or "", cs or "", cu or "", cl2 or "")
        for ch_id, cl, cs, cu, cl2 in cur.fetchall()
    }

    # Latest completed chapter per character, scanned across the 8 numbered tables.
    log(" 查询课时完成记录...")
    char_plays = defaultdict(lambda: {"latest_time": None, "latest_chapter": None})
    for tbl_idx in range(8):
        table = f"bi_user_chapter_play_record_{tbl_idx}"
        try:
            cur.execute(
                f"SELECT user_id, chapter_id, created_at FROM {table} WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                (char_ids,)
            )
            for uid, ch_id, created_at in cur.fetchall():
                ch_data = chapter_map.get(ch_id)
                if not ch_data:
                    continue
                rec = char_plays[uid]
                if rec["latest_time"] is None or created_at > rec["latest_time"]:
                    rec["latest_time"] = created_at
                    rec["latest_chapter"] = ch_data
        except Exception as e:
            log(f" 警告 {table}: {e}")
            conn.rollback()  # clear the aborted transaction so the next table can be read

    # Total study time per character.
    log(" 查询学习耗时...")
    for tbl_idx in range(8):
        table = f"bi_user_component_play_record_{tbl_idx}"
        try:
            cur.execute(
                f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id",
                (char_ids,)
            )
            for uid, total_ms in cur.fetchall():
                # NOTE(review): time is only counted for characters that already
                # have a completed-chapter record - confirm this is intended.
                if uid in char_plays:
                    char_plays[uid]["total_ms"] = char_plays[uid].get("total_ms", 0) + (total_ms or 0)
        except Exception as e:
            log(f" 警告 {table}: {e}")
            conn.rollback()

    # Roll character-level results up to the account behind each lead.
    for lead in leads:
        aid = lead.get("account_id")
        if not aid:
            continue
        best_time = None
        best_ch = None
        total_ms = 0
        for cid in account_chars.get(aid, []):
            play = char_plays.get(cid)
            if not play:
                continue
            if play.get("latest_chapter"):
                if best_time is None or play["latest_time"] > best_time:
                    best_time = play["latest_time"]
                    best_ch = play["latest_chapter"]
            total_ms += play.get("total_ms", 0)
        if best_ch:
            cl, cs, cu, cl2 = best_ch
            lead["lesson_progress"] = f"{cl}-{cs}-{cu}-{cl2}"
        if best_time:
            lead["lesson_time"] = best_time.strftime("%Y-%m-%d")
        # float() keeps the value JSON-serialisable if SUM() came back as Decimal.
        lead["lesson_minutes"] = round(float(total_ms) / 60000, 1) if total_ms > 0 else 0


def query_db(conn, leads):
    """Enrich each lead in place with account, order, activation and lesson data.

    Steps, in order: match accounts by encrypted phone (bi_vala_app_account),
    pick one valid order per lead from bi_vala_order net of bi_refund_order,
    read the season package level (bi_vala_seasonal_ticket), and derive the
    latest completed lesson plus total study minutes from the character and
    play-record tables.

    Args:
        conn: Open psycopg2 connection. It is rolled back after any query
            error that is swallowed, so later queries can still run.
        leads: Lead dicts carrying at least ``phone`` and ``date_serial``.

    Returns:
        The same ``leads`` list. Every lead carries ``tel_encrypt``,
        ``account_id``, ``reg_date``, ``download_channel``, ``orders``,
        ``valid_order``, ``activation``, ``lesson_progress``, ``lesson_time``
        and ``lesson_minutes``; leads with a matched account also get
        ``all_orders``.
    """
    # Seed the result keys on every lead. Assigning them through a leftover
    # loop variable would only touch the last lead and would raise NameError
    # when ``leads`` is empty.
    for lead in leads:
        lead["orders"] = []
        lead["valid_order"] = None
        lead["activation"] = ""
        lead["lesson_progress"] = ""
        lead["lesson_time"] = ""
        lead["lesson_minutes"] = 0

    cur = conn.cursor()
    try:
        aid_list = _attach_accounts(cur, leads)
        _attach_orders(cur, leads, aid_list)
        _attach_activation(conn, cur, leads, aid_list)
        _attach_lessons(conn, cur, leads, aid_list)
    finally:
        cur.close()  # release the cursor even when a query raises
    log(" DB 查询完成")
    return leads
# ── Step 3: 写入目标表 ──
def write_target_sheet(token, leads):
    """Rewrite TARGET_SHEET: header in row 1, blank row 2, one lead per row from row 3.

    Rows left over from a previous run are blanked up to row 500. Failed
    writes do not abort the run; their ranges are collected and reported in a
    single log line before the completion message.

    Args:
        token: Feishu bearer token passed through to put_values.
        leads: Lead dicts as produced by parse_lp_sheets and query_db.
    """
    first_data_row = 3
    last_managed_row = 500
    n_cols = 26            # columns A..Z
    write_batch_rows = 20
    # Keep each clearing request under the 4400-cell figure that the data
    # batching is sized against (160 x 26 = 4160).
    # NOTE(review): confirm the actual per-request limit of the API.
    clear_batch_rows = 160

    failed_ranges = []

    def put(range_str, values):
        if not put_values(token, TARGET_SHEET, range_str, values):
            failed_ranges.append(range_str)

    log(" 写入 4koH9C...")
    # Blank A1 first (it may hold a migration notice), then restore the header.
    put("A1:A1", [[""]])
    header = [["销售归属", "微信昵称", "进线日期", "体验节数", "手机号", "用户年级",
               "课史/跟进", "用户ID", "注册日期", "下载渠道", "下单日期", "成交渠道",
               "产品", "下单金额(GMV)", "退款金额", "实际收入(GSV)", "激活课程",
               "当前行课进度", "最近行课时间", "累计学习时长(min)", "更新时间",
               "微伴补充", "进线早于下单", "订单号", "有效订单", "渠道归属"]]
    put("A1:Z1", header)
    put("A2:Z2", [[""] * n_cols])

    update_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    rows = []
    for lead in leads:
        vo = lead.get("valid_order")
        rows.append([
            lead["sales"],                          # A: sales owner
            lead["nickname"],                       # B: WeChat nickname
            lead["date_str"],                       # C: lead date
            lead["exp_lessons"],                    # D: trial lessons
            lead["phone"],                          # E: phone
            lead["grade"],                          # F: grade
            lead["followup"],                       # G: course history / follow-up
            lead.get("account_id", "") or "",       # H: user id
            lead.get("reg_date", ""),               # I: registration date
            lead.get("download_channel", ""),       # J: download channel
            vo["pay_date_md"] if vo else "",        # K: order date
            vo["key_from"] if vo else "",           # L: order source tag
            vo["product"] if vo else "",            # M: product
            vo["gmv"] if vo else "",                # N: GMV
            vo["refund"] if vo else "",             # O: refund amount
            vo["gsv"] if vo else "",                # P: GSV
            lead.get("activation", ""),             # Q: activated course
            lead.get("lesson_progress", ""),        # R: current lesson progress
            lead.get("lesson_time", ""),            # S: latest lesson date
            lead.get("lesson_minutes", 0) or "",    # T: total study minutes
            update_time,                            # U: update time
            "",                                     # V: left blank here
            "",                                     # W: left blank here
            vo["trade_no"] if vo else "",           # X: order number
            1 if vo else 0,                         # Y: has valid order
            vo["channel_class"] if vo else "",      # Z: channel class
        ])

    total = len(rows)
    valid_count = sum(1 for r in rows if r[24] == 1)
    log(f" 共 {total} 行, Y=1: {valid_count}")

    # Data rows go out in small batches with a short pause between requests.
    for batch_start in range(0, total, write_batch_rows):
        batch = rows[batch_start:batch_start + write_batch_rows]
        sr = first_data_row + batch_start
        er = sr + len(batch) - 1
        put(f"A{sr}:Z{er}", batch)
        time.sleep(0.3)

    # Blank whatever an earlier, longer run left behind.
    clear_start = first_data_row + total
    if clear_start <= last_managed_row:
        for start in range(clear_start, last_managed_row + 1, clear_batch_rows):
            end = min(start + clear_batch_rows - 1, last_managed_row)
            put(f"A{start}:Z{end}", [[""] * n_cols for _ in range(end - start + 1)])
            time.sleep(0.3)
        log(f" 清除多余行 A{clear_start}:Z{last_managed_row}")

    if failed_ranges:
        log(f" 写入失败 {len(failed_ranges)} 个区间: {', '.join(failed_ranges)}")
    log(f" 写入完成")
# ── Main ──
def main():
log("=" * 50)
log("北京试点线索刷新 启动")
try:
token = get_fs_token()
conn = psycopg2.connect(
host=PG_HOST, port=PG_PORT, user=PG_USER,
password=get_pg_password(), dbname=PG_DB, connect_timeout=30
)
# Step 1: 解析 LP 表
log("Step 1: 解析 LP 表")
leads = parse_lp_sheets(token)
# Step 2: DB 查询
log("Step 2: DB 查询")