#!/usr/bin/env python3
"""
Sales-lead auto backfill.

Reads the phone numbers that sales staff enter in a Feishu spreadsheet,
matches them to user accounts and writes the user information back.

Run frequency: hourly cron sweep.
Owning agent: Xiaoxi (xiaoxi).

Sheet column layout (A-V, 22 columns):
  Row 1: header
  Row 2: annotation row (manually filled / automatically filled markers)
  Row 3+: data rows
  A: sales owner                (filled by sales)
  B: WeChat nickname            (filled by sales)
  C: lead date                  (filled by sales)
  D: trial lessons count        (auto backfilled)
  E: phone number               (filled by sales)
  F: user grade                 (filled by sales)
  G: course history / follow-up (filled by sales)
  H: user ID                    (auto backfilled)
  I: registration date          (auto backfilled)
  J: download channel           (auto backfilled)
  K: has ordered                (auto backfilled)
  L: order date                 (auto backfilled)
  M: deal channel               (auto backfilled)
  N: product                    (auto backfilled)
  O: order amount / GMV         (auto backfilled)
  P: refund amount              (auto backfilled)
  Q: actual revenue / GSV       (auto backfilled)
  R: activated course           (auto backfilled)
  S: current course progress    (auto backfilled)
  T: latest lesson time         (auto backfilled)
  U: total study time / min     (auto backfilled)
  V: update time                (auto backfilled)
"""
import json
import os
import re
import sys
import time
import traceback
from collections import defaultdict
from datetime import datetime

import psycopg2
import requests

# phone_encrypt lives next to this script; make it importable before importing it.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone

# ── Configuration ──
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_DB = "vala_bi"
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
SHEET_IDS = {
    "吴迪": "f975f0",
    "小龙": "qJF4I",
    "成都": "qJF4J",
}
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
LOG_FILE = "/var/log/xiaoxi_sales_lead.log"

# goods_id -> product display name
GOODS_NAME_MAP = {
    57: "瓦拉英语level1·单季",
    60: "瓦拉英语level1",
    63: "瓦拉英语level1·单季",
    31: "瓦拉英语年包",
    32: "瓦拉英语单季度包",
    33: "瓦拉英语level2",
    54: "瓦拉英语季度包",
    61: "瓦拉英语level1+2",
}

# 0-based indices of the auto-filled columns: D and H..V.
_AUTO_COLS = [3, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]

# season_package_level (or the merged "A1+A2") -> label written to column R.
_ACTIVATION_LABELS = {"A1": "L1", "A2": "L2", "A1+A2": "L1+L2"}


def log(msg):
    """Print *msg* with a timestamp prefix and append the same line to LOG_FILE."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    # Explicit UTF-8: the messages contain Chinese text and the locale's
    # default encoding is not guaranteed to be able to encode it.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")


def get_pg_password():
    """Return the value of PG_ONLINE_PASSWORD from ../secrets.env.

    Raises:
        RuntimeError: if the file has no PG_ONLINE_PASSWORD= line (previously
            this silently returned None).
    """
    secrets_path = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    with open(secrets_path, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                return line.strip().split("=", 1)[1].strip("'\"")
    raise RuntimeError(f"PG_ONLINE_PASSWORD not found in {secrets_path}")


def get_fs_token():
    """Fetch a Feishu tenant access token using the first app in config.json.

    Raises:
        RuntimeError: if the response carries no tenant_access_token.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if not token:
        # Surface the API's own error payload instead of a bare KeyError.
        raise RuntimeError(f"获取飞书token失败: {data}")
    return token


def read_sheet(token, sheet_id):
    """Return all cell values of *sheet_id* as a list of rows.

    Raises:
        RuntimeError: if the API response code is not 0.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    data = resp.json()
    if data.get("code") != 0:
        raise RuntimeError(f"读取Sheet失败: {data}")
    return data["data"]["valueRange"]["values"]


def put_values(token, sheet_id, range_str, values, retries=3):
    """Write *values* into ``sheet_id!range_str``.

    Retries with exponential backoff while the API answers with code 90217
    (treated as "rate limited"); any other non-zero code is logged and
    returned immediately.

    Returns:
        The decoded JSON response of the last attempt. Failures are logged,
        not raised.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    # Defined up front so retries <= 0 cannot hit an unbound local.
    result = {"code": -1, "msg": "no attempt made"}
    for attempt in range(retries):
        resp = requests.put(url, headers=headers, json=body, timeout=30)
        result = resp.json()
        code = result.get("code", -1)
        if code == 0:
            return result
        if code != 90217:  # anything other than rate limiting is final
            log(f" 写入失败 {sheet_id}!{range_str}: {result}")
            return result
        if attempt < retries - 1:
            # Only wait when another attempt will actually follow.
            wait = 2 ** attempt
            log(f" 限流 {sheet_id}!{range_str}, 等待{wait}s重试...")
            time.sleep(wait)
    # Retries exhausted while still rate limited (or retries <= 0).
    log(f" 写入失败 {sheet_id}!{range_str}: {result}")
    return result


def encrypt_phone_local(phone):
    """Normalise a sheet cell to an 11-digit phone number and encrypt it.

    Accepts values such as ``13800138000`` or ``"13800138000.0"`` (an
    all-zero fractional part is dropped).

    Returns:
        ``encrypt_phone(<11 digits>)`` or None when the value is not an
        11-digit number starting with 1.
    """
    text = str(phone).strip()
    m = re.fullmatch(r"(1\d{10})(?:\.0+)?", text)
    if m:
        return encrypt_phone(m.group(1))
    return None


def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in chunks and return all fetched rows.

    Args:
        cur: DB-API cursor.
        sql_tpl: SQL containing exactly one ``%s`` where the comma-separated
            placeholder list is substituted.
        params: values for the IN list.
        chunk: maximum number of values per query.
    """
    results = []
    for i in range(0, len(params), chunk):
        batch = params[i:i + chunk]
        ph = ",".join(["%s"] * len(batch))
        cur.execute(sql_tpl % ph, batch)
        results.extend(cur.fetchall())
    return results


def _cell(row, idx):
    """Return the stripped string at *idx* of *row*, or "" if absent/falsy."""
    return str(row[idx]).strip() if len(row) > idx and row[idx] else ""


def _needs_refresh(row, now):
    """Tell whether a data row must be (re)filled.

    True when any auto column is empty, or when the update time in column V
    is unparsable or older than one hour relative to *now*.
    """
    if any(not _cell(row, ci) for ci in _AUTO_COLS):
        return True
    # Column V is one of the auto columns, so it is non-empty here.
    try:
        last_update = datetime.strptime(_cell(row, 21), "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return True
    return (now - last_update).total_seconds() > 3600


def _group_consecutive(results):
    """Split row results (sorted by "row") into runs of consecutive rows."""
    groups = []
    current = []
    for r in results:
        if current and r["row"] != current[-1]["row"] + 1:
            groups.append(current)
            current = []
        current.append(r)
    if current:
        groups.append(current)
    return groups


def _summarise_learning(char_ids, char_plays):
    """Return (progress, latest_date, total_minutes) across an account's characters.

    Only characters that have a latest chapter recorded contribute.
    """
    best_time = None
    best_ch = None
    total_ms = 0
    for cid in char_ids:
        play = char_plays.get(cid)
        if not (play and play["latest_chapter"]):
            continue
        if best_time is None or play["latest_time"] > best_time:
            best_time = play["latest_time"]
            best_ch = play["latest_chapter"]
        total_ms += play["total_ms"]

    if not best_ch:
        return "无记录", "", 0

    _, (cl, cs, cu, cl2) = best_ch
    total_min = round(total_ms / 60000, 1)
    if total_min == int(total_min):
        total_min = int(total_min)  # show "12" rather than "12.0"
    return f"{cl}-{cs}-{cu}-{cl2}", best_time.strftime("%Y-%m-%d"), total_min


def process_sheet(token, cur, sheet_name, sheet_id):
    """Process one sales sub-sheet: find pending rows, look up users, write back.

    Args:
        token: Feishu tenant access token.
        cur: psycopg2 cursor used for all lookups.
        sheet_name: display name used in log messages.
        sheet_id: Feishu sheet id.

    Returns:
        ``{"processed": <rows written>, "matched": <rows matched to an account>}``.
    """
    log(f"\n--- [{sheet_name}] {sheet_id} ---")
    rows = read_sheet(token, sheet_id)
    if len(rows) <= 2:
        log(f" [{sheet_name}] 无数据行(仅表头+标注),跳过")
        return {"processed": 0, "matched": 0}
    log(f" [{sheet_name}] 读取到 {len(rows) - 2} 行数据")

    now = datetime.now()

    # Rows to process start at sheet row 3 (header and annotation row skipped).
    # A row qualifies when column E (phone) has a value and it needs a refresh.
    pending = []
    for idx, row in enumerate(rows[2:], start=3):
        if len(row) < 5:
            continue
        phone = _cell(row, 4)
        if phone and _needs_refresh(row, now):
            pending.append({"row_idx": idx, "phone": phone})

    log(f" [{sheet_name}] 待处理: {len(pending)} 行")
    if not pending:
        return {"processed": 0, "matched": 0}

    # Phone -> account_id, matched through the encrypted value (tel_encrypt).
    valid_phones = []
    for raw in set(r["phone"] for r in pending):
        enc = encrypt_phone_local(raw)  # encrypt once per distinct phone
        if enc:
            valid_phones.append((raw, enc))
    enc_list = list({enc for _, enc in valid_phones})

    enc_to_aid = {}
    matched_rows = batch_in(
        cur,
        "SELECT id, tel_encrypt FROM bi_vala_app_account WHERE tel_encrypt IN (%s) AND status=1 AND deleted_at IS NULL",
        enc_list,
    )
    for aid, tel_enc in matched_rows:
        enc_to_aid.setdefault(tel_enc, aid)  # first account seen wins

    phone_to_aid = {
        raw: enc_to_aid[enc] for raw, enc in valid_phones if enc in enc_to_aid
    }
    log(f" [{sheet_name}] 手机号匹配: {len(phone_to_aid)}/{len(valid_phones)}")

    matched_aids = list(set(phone_to_aid.values()))
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")

    if not matched_aids:
        for r in pending:
            row_num = r["row_idx"]
            put_values(token, sheet_id, f"H{row_num}:H{row_num}", [["未注册"]])
            put_values(token, sheet_id, f"V{row_num}:V{row_num}", [[now_str]])
        log(f" [{sheet_name}] 全部未匹配,已标记 {len(pending)} 行")
        return {"processed": len(pending), "matched": 0}

    # Placeholder list shared by every "IN (<matched account ids>)" query below.
    ph = ",".join(["%s"] * len(matched_aids))

    # Account info
    cur.execute(
        f"SELECT id, created_at, download_channel FROM bi_vala_app_account WHERE id IN ({ph}) AND status=1 AND deleted_at IS NULL",
        matched_aids,
    )
    account_info = {}
    for aid, created_at, channel in cur.fetchall():
        account_info[aid] = {
            "created_at": created_at.strftime("%Y-%m-%d") if created_at else "",
            "download_channel": channel or "",
        }

    # Orders (newest first)
    cur.execute(
        f"SELECT account_id, pay_success_date, key_from, goods_id, pay_amount_int, order_status FROM bi_vala_order WHERE account_id IN ({ph}) AND pay_success_date IS NOT NULL AND order_status IN (3,4) AND deleted_at IS NULL ORDER BY pay_success_date DESC",
        matched_aids,
    )
    order_info = defaultdict(list)
    for aid, pay_date, key_from, goods_id, amount, order_status in cur.fetchall():
        order_info[aid].append({
            "pay_date": pay_date.strftime("%Y-%m-%d") if pay_date else "",
            "key_from": key_from or "",
            "goods_id": goods_id,
            "amount": amount or 0,
            "order_status": order_status,
        })

    # Refunds
    cur.execute(
        f"SELECT o.account_id, SUM(r.refund_amount::numeric) FROM bi_refund_order r INNER JOIN bi_vala_order o ON r.trade_no=o.trade_no WHERE o.account_id IN ({ph}) AND r.status=3 AND o.order_status=4 AND r.deleted_at IS NULL GROUP BY o.account_id",
        matched_aids,
    )
    refund_info = {aid: total for aid, total in cur.fetchall()}

    # Activated course levels
    cur.execute(
        f"SELECT account_id, season_package_level FROM bi_vala_seasonal_ticket WHERE account_id IN ({ph}) AND status=1 AND deleted_at IS NULL AND season_package_level IN ('A1','A2')",
        matched_aids,
    )
    activation = {}
    for aid, lvl in cur.fetchall():
        if aid not in activation:
            activation[aid] = lvl
        elif activation[aid] != lvl:
            activation[aid] = "A1+A2"

    # Characters -> used to look up lesson records
    cur.execute(
        f"SELECT account_id, id FROM bi_vala_app_character WHERE account_id IN ({ph}) AND nickname IS NOT NULL AND nickname != '' AND deleted_at IS NULL",
        matched_aids,
    )
    account_chars = defaultdict(list)
    char_to_account = {}
    for aid, cid in cur.fetchall():
        account_chars[aid].append(cid)
        char_to_account[cid] = aid
    char_ids = list(char_to_account.keys())

    # Course structure mapping
    cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson")
    chapter_map = {}
    for ch_id, cl, cs, cu, cl2 in cur.fetchall():
        chapter_map[ch_id] = (cl or "", cs or "", cu or "", cl2 or "")

    char_plays = defaultdict(lambda: {"latest_time": None, "latest_chapter": None, "total_ms": 0})
    if char_ids:  # nothing to look up without characters; skip the 16 shard queries
        # Completed lesson records (sharded over 8 tables)
        for tbl_idx in range(8):
            table = f"bi_user_chapter_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, chapter_id, created_at FROM {table} WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                    (char_ids,),
                )
                for uid, ch_id, created_at in cur.fetchall():
                    ch_data = chapter_map.get(ch_id)
                    if not ch_data or created_at is None:
                        # A NULL timestamp cannot be ordered or formatted later.
                        continue
                    rec = char_plays[uid]
                    if rec["latest_time"] is None or created_at > rec["latest_time"]:
                        rec["latest_time"] = created_at
                        rec["latest_chapter"] = (ch_id, ch_data)
            except Exception as e:
                log(f" 警告 {table}: {e}")
                # A failed statement aborts the open transaction; without a
                # rollback every later query on this connection would fail.
                cur.connection.rollback()

        # Total learning time (sharded over 8 tables)
        for tbl_idx in range(8):
            table = f"bi_user_component_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id",
                    (char_ids,),
                )
                for uid, total_ms in cur.fetchall():
                    if uid in char_plays:
                        char_plays[uid]["total_ms"] += (total_ms or 0)
            except Exception as e:
                log(f" 警告 {table}: {e}")
                cur.connection.rollback()  # same reason as above

    # Completed trial lessons
    cur.execute(
        f"SELECT a.id, COUNT(*) FROM bi_vala_app_account a INNER JOIN bi_vala_app_character c ON a.id=c.account_id AND c.deleted_at IS NULL INNER JOIN bi_user_course_detail ucd ON c.id=ucd.user_id AND ucd.deleted_at IS NULL WHERE a.id IN ({ph}) AND a.status=1 AND a.deleted_at IS NULL AND ucd.expire_time IS NULL GROUP BY a.id",
        matched_aids,
    )
    trial_count = {aid: cnt for aid, cnt in cur.fetchall()}

    # Assemble the values to write back
    results = []
    for r in pending:
        row_num = r["row_idx"]
        aid = phone_to_aid.get(r["phone"])

        if not aid:
            results.append({
                "row": row_num,
                "values": {
                    "H": "未注册", "I": "", "J": "", "K": "", "L": "",
                    "M": "", "N": "", "O": "", "P": "", "Q": "",
                    "R": "", "S": "", "T": "", "U": "", "V": now_str,
                },
            })
            continue

        info = account_info.get(aid, {})
        orders = order_info.get(aid, [])
        # SUM() yields NULL when every refund_amount is NULL; treat that as 0.
        refund_amt = refund_info.get(aid) or 0
        trials = trial_count.get(aid, 0)

        if orders:
            latest_order = orders[0]
            order_date = latest_order["pay_date"]
            channel = latest_order["key_from"]
            goods_id = latest_order["goods_id"]
            product = GOODS_NAME_MAP.get(goods_id, f"goods_{goods_id}")
            gmv = sum(int(o["amount"]) for o in orders) / 100.0
        else:
            order_date = ""
            channel = ""
            product = ""
            gmv = 0

        refund_value = float(refund_amt) / 100.0
        gsv = gmv - refund_value

        progress, recent, total_min = _summarise_learning(
            account_chars.get(aid, []), char_plays
        )

        results.append({
            "row": row_num,
            "values": {
                "D": str(trials) if trials else "0",
                "H": str(aid),
                "I": info.get("created_at", ""),
                "J": info.get("download_channel", ""),
                "K": "是" if orders else "否",
                "L": order_date,
                "M": channel,
                "N": product,
                "O": str(round(gmv, 2)) if gmv else "0",
                "P": str(round(refund_value, 2)) if refund_amt else "0",
                "Q": str(round(gsv, 2)),
                "R": _ACTIVATION_LABELS.get(activation.get(aid, ""), ""),
                "S": progress,
                "T": recent,
                "U": str(total_min),
                "V": now_str,
            },
        })

    # Write back: D separately, then H..V as one block, so that E/F/G
    # (sales-entered data) are never overwritten.
    results.sort(key=lambda x: x["row"])
    log(f" [{sheet_name}] 准备回填 {len(results)} 行")

    hv_columns = "HIJKLMNOPQRSTUV"
    for gi, g in enumerate(_group_consecutive(results)):
        sr, er = g[0]["row"], g[-1]["row"]

        # Column D (trial lessons count)
        d_vals = [[r["values"].get("D", "")] for r in g]
        put_values(token, sheet_id, f"D{sr}:D{er}", d_vals)

        # Columns H..V in one call
        h_vals = [[r["values"].get(col, "") for col in hv_columns] for r in g]
        put_values(token, sheet_id, f"H{sr}:V{er}", h_vals)

        if gi % 5 == 4:
            time.sleep(0.5)  # brief pause every 5 groups to ease API load

    matched_count = sum(1 for r in results if r["values"]["H"] != "未注册")
    log(f" [{sheet_name}] 回填完成: {len(results)} 行, 匹配 {matched_count}, 未注册 {len(results) - matched_count}")
    return {"processed": len(results), "matched": matched_count}


def main():
    """Process every configured sheet; return 0 on success, 1 on any error."""
    log("=" * 50)
    log("销售线索自动回填 启动")

    conn = None
    cur = None
    try:
        token = get_fs_token()
        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT, user=PG_USER,
            password=get_pg_password(), dbname=PG_DB,
            connect_timeout=30,
        )
        cur = conn.cursor()

        total_processed = 0
        total_matched = 0
        for sheet_name, sheet_id in SHEET_IDS.items():
            r = process_sheet(token, cur, sheet_name, sheet_id)
            total_processed += r["processed"]
            total_matched += r["matched"]

        log(f"\n全部完成: 处理 {total_processed} 行, 匹配 {total_matched}, 未注册 {total_processed - total_matched}")
        return 0
    except Exception as e:
        log(f"ERROR: {e}")
        traceback.print_exc()
        return 1
    finally:
        # Release DB resources on the error path too.
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()


if __name__ == "__main__":
    sys.exit(main())