#!/usr/bin/env python3
"""
Full refresh of the sales-lead sheets (XXTEA exact-match version).

What it does:
  1. Reads the phone numbers in column E of the three sheets
     「小龙」「吴迪」「成都」.
  2. XXTEA-encrypts each phone and matches it exactly against
     bi_vala_app_account.tel_encrypt to obtain the account_id.
  3. Queries PostgreSQL for each user's order / learning data.
  4. Fills the automatic columns D/H/I/J/K~U/X/Y/Z; column U is the
     timestamp of this refresh.
  5. Collects the users whose Y=1 (valid order) from the three sheets
     into the 「订单汇总」 sheet.

Rules (carried over from the S2 rules):
  (1) E->H: XXTEA exact match; leave blank when nothing is found.
  (2) H->D/I/J: only fill blanks, never overwrite an existing value.
  (3) Y=1 only when K (order date) >= C (lead date).
  (4) Fully refunded: when every order is refunded, N/O/P are all cleared.
  (5) N/O/P are left blank when 0; O is written in whole yuan.
  (6) Column G is never touched.

Usage:
  python3 scripts/sales_leads_full_refresh.py
"""

import json
import os
import re
import sys
import time
import traceback
from collections import defaultdict
from datetime import datetime

import psycopg2
import requests

SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
WORKSPACE = os.path.dirname(SCRIPTS_DIR)
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"

# The helper modules live next to this script; make them importable even
# when the module is loaded from another working directory.
sys.path.insert(0, SCRIPTS_DIR)

from feishu_sheet_utils import FeishuSheetWriter  # noqa: E402
from phone_encrypt import encrypt_phone  # noqa: E402

SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"

# (sheet_id, sheet name, data range). Every range starts at row 3, which
# parse_sales_sheets relies on when it computes sheet row numbers.
SALES_SHEETS = [
    ("qJF4I", "小龙", "A3:Z2607"),
    ("f975f0", "吴迪", "A3:Z8149"),
    ("qJF4J", "成都", "A3:Z2500"),
]
SUMMARY_SHEET_ID = "2smjwA"

# Substring found in column A -> sales owner written to the summary sheet.
CS_MAP = {"吴迪": "吴迪", "小龙": "小龙", "Tom": "Tom", "Bob": "Bob"}

GOODS_NAMES = {
    57: "瓦拉英语level1·单季",
    60: "瓦拉英语level1",
    63: "瓦拉英语level1·单季",
    31: "瓦拉英语年包",
    32: "瓦拉英语单季度包",
    33: "瓦拉英语level2",
    54: "瓦拉英语季度包",
    61: "瓦拉英语level1+2",
}

CHANNEL_MAP = {
    "Apple App Store": "苹果",
    "科大讯飞学习机": "讯飞",
    "学而思学习机": "学而思",
    "华为应用市场": "华为",
    "小米应用市场": "小米",
    "应用宝应用市场": "应用宝",
    "希沃学习机": "希沃",
    "荣耀应用市场": "荣耀",
    "小度学习机": "小度",
    "oppo应用市场": "OPPO",
    "vivo应用市场": "VIVO",
    "京东方学习机": "京东方",
    "步步高学习机": "步步高",
    "作业帮学习机": "作业帮",
    "魅族应用市场": "魅族",
    "官网": "官网",
}

LOG_FILE = "/var/log/xiaoxi_full_refresh.log"

# A usable phone number is exactly 11 digits.
_PHONE_RE = re.compile(r'^\d{11}$')

# Columns written back to each sales sheet, in write order.
_SALES_WRITE_COLS = (
    "D", "H", "I", "J", "K", "L", "M", "N", "O",
    "P", "Q", "R", "S", "T", "U", "X", "Y", "Z",
)

# Zero-based index of each pre-existing column captured while parsing.
_EXISTING_COL_INDEX = {
    "D": 3, "H": 7, "I": 8, "J": 9, "K": 10, "L": 11, "M": 12, "N": 13,
    "O": 14, "P": 15, "Q": 16, "R": 17, "S": 18, "T": 19, "U": 20,
    "X": 23, "Y": 24, "Z": 25,
}

# Upper bound on rows sent in one range write. NOTE(review): 5000 is the
# documented Feishu single-range write limit as far as I recall — confirm
# against the current API documentation.
_MAX_ROWS_PER_WRITE = 5000


# Column Z channel-attribution rules [confirmed by 王虹茗, 2026-06-15]
def classify_channel(key_from):
    """Classify ``key_from`` as one of: 端内 / 销转 / 达人 / 直购.

    An empty ``key_from`` and anything not matched by a rule below falls
    back to "直购".
    """
    if not key_from:
        return "直购"
    kf = key_from.strip()
    if kf in ("app-active-h5-0-0", "app-sales-bj-qhm-0", "app-sales-bj-wd-0"):
        return "端内"
    if kf.startswith("sales-adp-"):
        return "销转"
    if kf.startswith("newmedia-daren-") or kf == "newmedia-dianpu-wwxx-0-0":
        return "达人"
    # Everything else: dianpu (except wwxx) + partner/stream/miniprogram/
    # jingxuan/empty/shuadan etc. -> 直购
    return "直购"


def log(msg):
    """Print ``msg`` with a timestamp and append the same line to LOG_FILE."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    # Messages contain Chinese text and emoji; do not depend on the locale.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")


def get_secret(key):
    """Return the value of ``key`` from ``WORKSPACE/secrets.env``.

    Lines have the form ``KEY=value``; surrounding single or double quotes
    are stripped from the value. Returns ``None`` (after logging a warning)
    when the key is not present.
    """
    prefix = f"{key}="
    with open(os.path.join(WORKSPACE, "secrets.env"), encoding="utf-8") as f:
        for line in f:
            if line.startswith(prefix):
                return line.strip().split("=", 1)[1].strip("'\"")
    log(f" ⚠️ secrets.env 缺少 {key}")
    return None


def get_fs_token():
    """Fetch a Feishu tenant access token using the first configured app.

    Raises:
        RuntimeError: if the response carries no ``tenant_access_token``.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    if "tenant_access_token" not in data:
        # Surface the API's own error payload instead of a bare KeyError.
        raise RuntimeError(f"获取 tenant_access_token 失败: {data}")
    return data["tenant_access_token"]


def read_sheet(token, sheet_id, range_str=None):
    """Read cell values from one sheet of the spreadsheet.

    Args:
        token: Feishu tenant access token.
        sheet_id: id of the sheet to read.
        range_str: optional A1-style range such as ``"A3:Z100"``.

    Returns:
        A list of rows, each a list of cell values.

    Raises:
        RuntimeError: if the API response code is not 0.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}"
    if range_str:
        url += f"!{range_str}"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    data = resp.json()
    if data.get("code") != 0:
        raise RuntimeError(f"读取失败 {sheet_id}: {data}")
    return data["data"]["valueRange"]["values"] or []


def put_values(token, sheet_id, range_str, values):
    """Write ``values`` into ``range_str`` of a sheet.

    Returns:
        True on success; False (after logging the API code and message)
        when the API response code is not 0.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    resp = requests.put(
        url,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        json=body,
        timeout=30,
    )
    r = resp.json()
    if r.get("code") != 0:
        log(f" ❌ {range_str}: code={r.get('code')} msg={r.get('msg')}")
        return False
    return True


def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in chunks and return all fetched rows.

    Args:
        cur: an open database cursor.
        sql_tpl: SQL containing exactly one ``%s`` where the list of
            placeholders for the IN clause is substituted.
        params: the values to match, split into slices of ``chunk``.
        chunk: maximum number of values per statement.
    """
    results = []
    for i in range(0, len(params), chunk):
        batch = params[i:i + chunk]
        ph = ",".join(["%s"] * len(batch))
        cur.execute(sql_tpl % ph, batch)
        results.extend(cur.fetchall())
    return results


def safe_cell(row, idx):
    """Return cell ``idx`` of ``row`` as a stripped string.

    Missing or ``None`` cells yield ``""``. Numbers with an integral value
    are rendered without a decimal part (``3.0`` -> ``"3"``).
    """
    if len(row) > idx and row[idx] is not None:
        try:
            if isinstance(row[idx], (int, float)):
                if row[idx] == int(row[idx]):
                    return str(int(row[idx]))
            return str(row[idx]).strip()
        except (ValueError, TypeError):
            return str(row[idx]).strip()
    return ""


def parse_date_str(s):
    """Normalise a sheet date string.

    ``'6月7日'`` / ``'6月7日 10:23:48'`` become ``'YYYY-06-07'`` /
    ``'YYYY-06-07 10:23:48'`` using the current year. Strings that already
    start with ``YYYY-MM-DD`` are returned unchanged, as is anything that
    matches neither form. Empty input yields ``""``.
    """
    if not s:
        return ""
    s = s.strip()
    if re.match(r'^\d{4}-\d{2}-\d{2}', s):
        return s
    # Date plus optional time: '6月7日 10:23:48' or '6月7日'
    m = re.match(r'^(\d{1,2})月(\d{1,2})日(?:\s+(\d{1,2}:\d{2}:\d{2}))?', s)
    if m:
        year = datetime.now().year
        date_part = f"{year}-{int(m.group(1)):02d}-{int(m.group(2)):02d}"
        if m.group(3):
            return f"{date_part} {m.group(3)}"
        return date_part
    return s


def _connect_pg():
    """Open a connection to the BI PostgreSQL database."""
    return psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com",
        port=28591,
        user="ai_member",
        password=get_secret("PG_ONLINE_PASSWORD"),
        dbname="vala_bi",
        connect_timeout=30,
    )


def _resolve_account_id(entry, phone_map):
    """Return the account id for a parsed row, or 0 when none is known.

    A phone matched through ``phone_map`` wins; otherwise a positive
    numeric value already present in column H is used.
    """
    phone = entry["phone"]
    if _PHONE_RE.match(phone) and phone in phone_map:
        return int(phone_map[phone])
    h_val = entry["existing"]["H"]
    if h_val and h_val.isdigit() and int(h_val) > 0:
        return int(h_val)
    return 0


def _whole_yuan(amount):
    """Render a money amount as whole yuan, or ``""`` when not positive."""
    return int(amount) if amount > 0 else ""


def _format_order_date(pay_dt):
    """Render a payment datetime as ``'M月D日 HH:MM:SS'`` (``""`` if None)."""
    if not pay_dt:
        return ""
    return f"{pay_dt.month}月{pay_dt.day}日 {pay_dt.strftime('%H:%M:%S')}"


def _activation_label(act):
    """Render the activated-course cell: A1/A2 get the '体验课' suffix."""
    if act in ("A1", "A2"):
        return f"{act}体验课"
    return act or ""


# ═══ Step 1: parse the three sales sheets ═══
def parse_sales_sheets(token):
    """Read the sales sheets and return their usable rows.

    Rows that are empty, or whose column A contains none of the CS_MAP
    keys, are skipped.

    Returns:
        ``{sheet_id: [entry, ...]}`` where each entry holds the sheet row
        number, sales owner, nickname, raw and parsed lead date, phone,
        grade, history and the pre-existing values of the automatic
        columns (under ``"existing"``).
    """
    all_data = {}
    for sid, sname, rng in SALES_SHEETS:
        rows = read_sheet(token, sid, rng)
        entries = []
        for idx, row in enumerate(rows):
            row_num = idx + 3  # ranges start at sheet row 3
            if not row or all(not cell for cell in row):
                continue

            a_val = safe_cell(row, 0)
            sales = None
            for k, v in CS_MAP.items():
                if k in a_val:
                    sales = v
                    break
            if not sales:
                continue

            phone = ""
            if len(row) > 4 and row[4]:
                try:
                    # Numeric cells arrive as floats; drop the decimal part.
                    phone = str(int(float(str(row[4]))))
                except (ValueError, TypeError, OverflowError):
                    phone = str(row[4]).strip()

            clue_date = safe_cell(row, 2)
            entries.append({
                "row": row_num,
                "sales": sales,
                "nickname": safe_cell(row, 1),
                "clue_date": clue_date,
                "clue_date_parsed": parse_date_str(clue_date),
                "phone": phone,
                "grade": safe_cell(row, 5),
                "history": safe_cell(row, 6),
                "existing": {
                    col: safe_cell(row, col_idx)
                    for col, col_idx in _EXISTING_COL_INDEX.items()
                },
            })
        all_data[sid] = entries

        phone_cnt = sum(1 for e in entries if _PHONE_RE.match(e["phone"]))
        uid_cnt = sum(
            1 for e in entries
            if e["existing"]["H"] and e["existing"]["H"].isdigit()
        )
        log(f" [{sname}] {len(entries)}行, 手机号{phone_cnt}, 已有UID{uid_cnt}")
    return all_data


# ═══ Step 2: XXTEA encrypt -> exact match on PG tel_encrypt ═══
def phone_to_uid_xxtea(all_entries):
    """Map every valid phone number to its account id.

    Each 11-digit phone is encrypted with ``encrypt_phone`` and looked up
    in ``bi_vala_app_account.tel_encrypt`` (active, non-deleted accounts
    only).

    Returns:
        ``{phone: account_id_as_str}`` for the phones that matched.
    """
    phone_set = {
        e["phone"]
        for entries in all_entries.values()
        for e in entries
        if _PHONE_RE.match(e["phone"])
    }
    if not phone_set:
        log(" 无有效手机号")
        return {}
    log(f" XXTEA 加密匹配: {len(phone_set)} 个唯一手机号")

    phone_enc_map = {}
    for phone in phone_set:
        try:
            phone_enc_map[encrypt_phone(phone)] = phone
        except Exception as ex:
            # Best effort: a phone that cannot be encrypted is just skipped.
            log(f" 加密失败 {phone}: {ex}")
    log(f" 加密完成, 唯一密文: {len(phone_enc_map)}")

    enc_list = list(phone_enc_map.keys())
    phone_to_uid = {}
    conn = _connect_pg()
    try:
        cur = conn.cursor()
        for i in range(0, len(enc_list), 500):
            chunk = enc_list[i:i + 500]
            ph = ",".join(["%s"] * len(chunk))
            cur.execute(
                f"SELECT id, tel_encrypt FROM bi_vala_app_account "
                f"WHERE tel_encrypt IN ({ph}) AND status=1 AND deleted_at IS NULL",
                chunk
            )
            for uid, tel_enc in cur.fetchall():
                plain = phone_enc_map.get(tel_enc)
                if plain:
                    phone_to_uid[plain] = str(uid)
            time.sleep(0.05)
        cur.close()
    finally:
        # Release the connection even when a query fails.
        conn.close()

    log(f" 精确匹配到 {len(phone_to_uid)} 个 UID (via XXTEA)")
    return phone_to_uid


# ═══ Step 3: PostgreSQL batch queries ═══
def query_all_pg(all_entries, phone_map):
    """Load registration, trial, order, activation and lesson data.

    The account ids queried are the union of the ids matched through
    ``phone_map`` and the positive numeric ids already in column H.

    Returns:
        ``{account_id: info}`` where ``info`` has the keys ``reg_date``,
        ``download_channel``, ``trial_count``, ``has_order``, ``orders``
        (newest first), ``activation``, ``lesson_progress``,
        ``lesson_time`` and ``lesson_minutes``. Empty dict when there is
        no account id to query.
    """
    uid_set = set()
    for entries in all_entries.values():
        for e in entries:
            phone = e["phone"]
            if _PHONE_RE.match(phone) and phone in phone_map:
                uid_set.add(int(phone_map[phone]))
            h_val = e["existing"]["H"]
            if h_val and h_val.isdigit() and int(h_val) > 0:
                uid_set.add(int(h_val))

    uid_list = list(uid_set)
    log(f" 有效 user_id: {len(uid_list)}")
    if not uid_list:
        return {}

    info = {uid: {
        "reg_date": "",
        "download_channel": "",
        "trial_count": 0,
        "has_order": False,
        "orders": [],
        "activation": "",
        "lesson_progress": "",
        "lesson_time": "",
        "lesson_minutes": 0,
    } for uid in uid_set}
    account_chars = defaultdict(list)
    char_plays = defaultdict(
        lambda: {"latest_time": None, "latest_chapter": None, "total_ms": 0}
    )

    conn = _connect_pg()
    try:
        cur = conn.cursor()

        # 3a. Registration info
        log(" 查询注册信息...")
        reg_info = batch_in(cur,
            "SELECT id, created_at, download_channel FROM bi_vala_app_account "
            "WHERE id IN (%s) AND status=1 AND deleted_at IS NULL",
            uid_list
        )
        for aid, created_at, dc in reg_info:
            if aid in info:
                info[aid]["reg_date"] = created_at.strftime("%Y-%m-%d") if created_at else ""
                raw_ch = dc or ""
                info[aid]["download_channel"] = CHANNEL_MAP.get(raw_ch, raw_ch)

        # 3b. Number of trial lessons
        log(" 查询体验节数...")
        trial_info = batch_in(cur,
            "SELECT account_id, COUNT(*) FROM bi_user_course_detail "
            "WHERE account_id IN (%s) AND expire_time IS NULL AND deleted_at IS NULL "
            "GROUP BY account_id",
            uid_list
        )
        for aid, cnt in trial_info:
            if aid in info:
                info[aid]["trial_count"] = cnt

        # 3c. Orders
        log(" 查询订单信息...")
        orders = batch_in(cur,
            "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status "
            "FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL "
            "AND order_status IN (3,4) ORDER BY pay_success_date DESC",
            uid_list
        )
        user_orders = defaultdict(list)
        for o in orders:
            user_orders[o[0]].append(o)

        trade_nos = [o[1] for o in orders if o[1]]
        refund_map = {}
        if trade_nos:
            refunds = batch_in(cur,
                "SELECT trade_no, refund_amount_int FROM bi_refund_order "
                "WHERE trade_no IN (%s) AND status=3",
                trade_nos
            )
            # NOTE(review): several refund rows for one trade_no overwrite
            # each other instead of being summed — confirm that the table
            # holds at most one status=3 row per order.
            for tn, amt in refunds:
                refund_map[tn] = amt

        for aid, olist in user_orders.items():
            if aid not in info:
                continue
            info[aid]["has_order"] = True
            # Keep per-order data (pay_success_date DESC) so the valid
            # order can be chosen later.
            orders_data = []
            for o in olist:
                trade_no = o[1] or ""
                pay_dt = o[2]
                goods_id = o[4]
                # Amounts are divided by 100 before use; NULL counts as 0.
                pay_amount = (o[5] or 0) / 100.0
                refund_amount = (refund_map.get(trade_no) or 0) / 100.0
                orders_data.append({
                    "trade_no": trade_no,
                    "pay_dt": pay_dt,
                    "pay_dt_raw": pay_dt.strftime("%Y-%m-%d %H:%M:%S") if pay_dt else "",
                    "key_from": o[3] or "",
                    "goods_id": goods_id,
                    "product": GOODS_NAMES.get(goods_id, f"商品{goods_id}"),
                    "pay_amount": pay_amount,
                    "refund_amount": refund_amount,
                    "gsv": pay_amount - refund_amount,
                })
            info[aid]["orders"] = orders_data

        # 3d. Activated course (best effort)
        log(" 查询激活课程...")
        try:
            activations = batch_in(cur,
                "SELECT account_id, season_package_level FROM bi_vala_seasonal_ticket "
                "WHERE account_id IN (%s) AND status=1 AND deleted_at IS NULL "
                "AND season_package_level IN ('A1','A2')",
                uid_list
            )
            for aid, lvl in activations:
                if aid in info:
                    info[aid]["activation"] = lvl
        except Exception as ex:
            log(f" 激活查询异常: {ex}")
            # A failed statement aborts the transaction; without a rollback
            # every later query on this connection would fail as well.
            conn.rollback()

        # 3e. Characters
        log(" 查询角色信息...")
        char_info = batch_in(cur,
            "SELECT account_id, id FROM bi_vala_app_character "
            "WHERE account_id IN (%s) AND deleted_at IS NULL",
            uid_list
        )
        char_to_account = {}
        for aid, cid in char_info:
            account_chars[aid].append(cid)
            char_to_account[cid] = aid
        char_ids = list(char_to_account.keys())
        log(f" 角色数: {len(char_ids)}")

        # 3f. Chapter id -> (level, season, unit, lesson)
        cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson")
        chapter_map = {}
        for ch_id, cl, cs, cu, cl2 in cur.fetchall():
            chapter_map[ch_id] = (cl or "", cs or "", cu or "", cl2 or "")

        # 3g. Completed-chapter records (8 numbered tables, best effort)
        log(" 查询课时完成记录...")
        for tbl_idx in range(8):
            if not char_ids:
                break  # nothing can match; skip the queries
            table = f"bi_user_chapter_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, chapter_id, created_at FROM {table} "
                    f"WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                    (char_ids,)
                )
                for uid, ch_id, created_at in cur.fetchall():
                    ch_data = chapter_map.get(ch_id)
                    if not ch_data:
                        continue
                    rec = char_plays[uid]
                    if rec["latest_time"] is None or created_at > rec["latest_time"]:
                        rec["latest_time"] = created_at
                        rec["latest_chapter"] = (ch_id, ch_data)
            except Exception as ex:
                log(f" 警告 {table}: {ex}")
                conn.rollback()  # keep the connection usable for the next table

        # 3h. Total study time
        log(" 查询学习耗时...")
        for tbl_idx in range(8):
            if not char_ids:
                break
            table = f"bi_user_component_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} "
                    f"WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id",
                    (char_ids,)
                )
                for uid, total_ms in cur.fetchall():
                    # NOTE(review): time is only counted for characters that
                    # already have a completed-chapter record — confirm this
                    # is intended.
                    if uid in char_plays:
                        # SUM() may come back as Decimal; keep plain ints so
                        # the value stays JSON-serialisable.
                        char_plays[uid]["total_ms"] += int(total_ms or 0)
            except Exception as ex:
                log(f" 警告 {table}: {ex}")
                conn.rollback()

        cur.close()
    finally:
        conn.close()

    # Roll character-level data up to the account level.
    for aid in uid_set:
        best_time = None
        best_ch = None
        total_ms = 0
        for cid in account_chars.get(aid, []):
            play = char_plays.get(cid)
            if not play:
                continue
            if play["latest_chapter"]:
                if best_time is None or play["latest_time"] > best_time:
                    best_time = play["latest_time"]
                    best_ch = play["latest_chapter"]
            total_ms += play["total_ms"]

        # total_ms / 60000 -> minutes, one decimal; integral values as int.
        minutes = round(total_ms / 60000, 1)
        if minutes == int(minutes):
            minutes = int(minutes)
        info[aid]["lesson_minutes"] = minutes

        if best_ch:
            _ch_id, (cl, cs, cu, cl2) = best_ch
            info[aid]["lesson_progress"] = f"{cl}-{cs}-{cu}-{cl2}"
            info[aid]["lesson_time"] = best_time.strftime("%Y-%m-%d") if best_time else ""

    log(f" 数据库查询完成")
    return info


# ═══ Step 4: write columns D/H/I/J/K~U/X/Y/Z of the sales sheets ═══
def _filter_valid_orders(orders, clue_date):
    """Return the valid orders, newest first.

    An order is valid when its GSV is positive, it is not fully refunded,
    and (when both values are present) its payment time is not earlier
    than ``clue_date``. The date check is a string comparison of
    ``'YYYY-MM-DD HH:MM:SS'`` against the parsed lead date.
    """
    valid = []
    for o in orders:
        pay_amount = o["pay_amount"]
        is_full_refund = pay_amount > 0 and pay_amount == o["refund_amount"]
        if o["gsv"] <= 0 or is_full_refund:
            continue
        pay_dt = o["pay_dt"]
        if pay_dt and clue_date:
            if pay_dt.strftime("%Y-%m-%d %H:%M:%S") < clue_date:
                continue
        valid.append(o)
    valid.sort(key=lambda o: o["pay_dt"] or datetime.min, reverse=True)
    return valid


def pick_valid_order(orders, clue_date):
    """Pick the main valid order from a list of orders.

    Rules: GSV > 0, not fully refunded, K >= C.

    Returns:
        ``(order_dict, True)`` for the most recent valid order, or
        ``(None, False)`` when there is none.
    """
    if not orders:
        return None, False
    valid = _filter_valid_orders(orders, clue_date)
    if not valid:
        return None, False
    return valid[0], True


def aggregate_valid_orders(orders, clue_date):
    """Aggregate all valid orders of one user.

    Rules: GSV > 0, not fully refunded, K >= C. GSV / GMV / refund are
    summed over the valid orders.

    Returns:
        ``(summary, True)`` where ``summary`` has the keys
        ``aggregated_gsv``, ``aggregated_gmv``, ``aggregated_refund``,
        ``best_trade_no``, ``latest_order``, ``products`` and
        ``valid_count``; or ``(None, False)`` when no order is valid.
    """
    if not orders:
        return None, False
    valid = _filter_valid_orders(orders, clue_date)
    if not valid:
        return None, False

    # Trade number: prefer the newest order without any refund; when every
    # order has a refund, take the one with the smallest refund.
    no_refund = [o for o in valid if o["refund_amount"] == 0]
    if no_refund:
        best_trade_no = no_refund[0]["trade_no"]
    else:
        best_trade_no = min(valid, key=lambda o: o["refund_amount"])["trade_no"]

    # The newest order supplies the K/L/M columns.
    latest = valid[0]

    # Product names: de-duplicated, in newest-first order.
    products = []
    seen_products = set()
    for o in valid:
        p = o.get("product", "")
        if p and p not in seen_products:
            products.append(p)
            seen_products.add(p)

    return {
        "aggregated_gsv": sum(o["gsv"] for o in valid),
        "aggregated_gmv": sum(o["pay_amount"] for o in valid),
        "aggregated_refund": sum(o["refund_amount"] for o in valid),
        "best_trade_no": best_trade_no,
        "latest_order": latest,
        "products": "+".join(products) if products else latest.get("product", ""),
        "valid_count": len(valid),
    }, True


def _group_contiguous_rows(entries, max_rows=_MAX_ROWS_PER_WRITE):
    """Split entries into runs of consecutive sheet rows.

    Each run can be written as one range per column. A run is also cut
    once it reaches ``max_rows`` so that no single write is oversized.
    """
    groups = []
    current = []
    for e in entries:
        contiguous = bool(current) and e["row"] == current[-1]["row"] + 1
        if current and (not contiguous or len(current) >= max_rows):
            groups.append(current)
            current = []
        current.append(e)
    if current:
        groups.append(current)
    return groups


def _sales_row_values(entry, phone_map, db_info, now_str):
    """Compute the value of every written column for one sales-sheet row.

    Returns:
        ``{column_letter: value}`` covering all of ``_SALES_WRITE_COLS``.
    """
    phone = entry["phone"]
    existing = entry["existing"]
    vals = dict.fromkeys(_SALES_WRITE_COLS, "")

    # U: refresh timestamp
    vals["U"] = now_str

    # H: UID — written when XXTEA matched, blank for an unmatched valid
    # phone; rows without a valid phone keep a numeric existing H.
    # NOTE(review): for an unmatched phone H is blanked, yet the old H
    # still drives the other columns through _resolve_account_id.
    if _PHONE_RE.match(phone):
        vals["H"] = phone_map.get(phone, "")
    elif existing["H"] and existing["H"].isdigit():
        vals["H"] = existing["H"]

    # D/I/J: only fill blanks (rule 2). Starting from the existing values
    # also keeps them intact when no account can be resolved; previously
    # such rows were overwritten with blanks.
    for col in ("D", "I", "J"):
        vals[col] = existing[col]

    aid = _resolve_account_id(entry, phone_map)
    di = db_info.get(aid) if aid > 0 else None
    if di is None:
        return vals

    if not existing["D"]:
        tc = di["trial_count"]
        vals["D"] = tc if tc > 0 else ""
    if not existing["I"]:
        vals["I"] = di["reg_date"]
    if not existing["J"]:
        vals["J"] = di["download_channel"]

    # K/L/M/N/O/P/X/Y/Z: real values of the main valid order when there is
    # one (Y=1); otherwise they all stay blank.
    order, is_valid = pick_valid_order(di.get("orders", []), entry["clue_date_parsed"])
    if is_valid and order:
        vals["K"] = _format_order_date(order["pay_dt"])
        vals["L"] = order["key_from"]
        vals["M"] = order["product"]
        vals["N"] = _whole_yuan(order["pay_amount"])
        vals["O"] = _whole_yuan(order["refund_amount"])
        vals["P"] = _whole_yuan(order["gsv"])
        vals["X"] = order["trade_no"]
        vals["Y"] = 1
        vals["Z"] = classify_channel(order["key_from"])

    # Q: activated course
    vals["Q"] = _activation_label(di["activation"])
    # R: lesson progress, S: latest lesson date, T: study minutes
    vals["R"] = di["lesson_progress"] if di["lesson_progress"] else ""
    vals["S"] = di["lesson_time"]
    lm = di["lesson_minutes"]
    vals["T"] = lm if lm > 0 else ""
    return vals


def write_sales_sheets(token, all_entries, phone_map, db_info):
    """Write the automatic columns back to the three sales sheets.

    Rows are grouped into runs of consecutive row numbers and every
    column of a run is written with one request. Failed writes are logged
    by ``put_values`` and counted here; they do not abort the run.
    """
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    for sid, sname, _ in SALES_SHEETS:
        entries = all_entries[sid]
        log(f" 写入 {sname} ({sid})...")

        failed = 0
        for group in _group_contiguous_rows(entries):
            first_row, last_row = group[0]["row"], group[-1]["row"]

            columns = {col: [] for col in _SALES_WRITE_COLS}
            for entry in group:
                row_vals = _sales_row_values(entry, phone_map, db_info, now_str)
                for col in _SALES_WRITE_COLS:
                    columns[col].append([row_vals[col]])

            for col in _SALES_WRITE_COLS:
                ok = put_values(token, sid, f"{col}{first_row}:{col}{last_row}", columns[col])
                if not ok:
                    failed += 1
                time.sleep(0.1)

        if failed:
            log(f" {sname}: {failed} 个区间写入失败")
        log(f" {sname}: {len(entries)} 行写入完成")


# ═══ Step 5: build the 「订单汇总」 summary sheet ═══
def clear_summary_sheet(token):
    """Clear old data (columns A~W from row 3) of the summary sheet.

    The last used row is detected from column A only. Any failure is
    logged and swallowed so that the refresh can continue.
    """
    log(" 检查订单汇总 sheet 现有数据...")
    try:
        rows = read_sheet(token, SUMMARY_SHEET_ID, "A3:A5000")
        last_data_row = 2
        for i, row in enumerate(rows):
            if row and any(cell for cell in row if cell):
                last_data_row = 3 + i
        if last_data_row < 3:
            log(" 订单汇总 sheet 无旧数据,跳过清空")
            return
        log(f" 清空 A3:W{last_data_row}({last_data_row - 2} 行旧数据)...")
        writer = FeishuSheetWriter(SPREADSHEET_TOKEN, token)
        writer.clear(SUMMARY_SHEET_ID, start_row=3, end_row=last_data_row, cols=23)
        log(" 清空完成")
    except Exception as e:
        log(f" 清空异常: {e}")


def write_summary_sheet(token, all_entries, phone_map, db_info):
    """Rebuild the summary sheet from the rows that have a valid order.

    Validity is recomputed from ``phone_map`` and ``db_info`` (the same
    gate that sets Y=1) instead of re-reading the sheets. Columns A-U
    mirror the sales row with aggregated N/O/P, V is the channel
    attribution and W the trade number. When several rows share a trade
    number only the one with the smallest row number is kept.
    """
    clear_summary_sheet(token)
    log(" 汇总订单数据(gate 全量重建)...")

    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    gate_rows = []
    for sid, _sname, _ in SALES_SHEETS:
        for e in all_entries[sid]:
            aid = _resolve_account_id(e, phone_map)
            di = db_info.get(aid, {}) if aid > 0 else {}
            orders = di.get("orders", [])
            if not orders:
                continue

            agg_result, is_valid = aggregate_valid_orders(orders, e["clue_date_parsed"])
            if not is_valid or not agg_result:
                continue

            trade_no = agg_result["best_trade_no"]
            latest = agg_result["latest_order"]
            row_data = [
                e["sales"],                                   # A: sales owner
                e["nickname"],                                # B: WeChat nickname
                e["clue_date"],                               # C: lead date
                di.get("trial_count", 0) or "",               # D: trial lessons
                e["phone"],                                   # E: phone
                e["grade"],                                   # F: grade
                e["history"],                                 # G: history / follow-up
                str(aid) if aid > 0 else "",                  # H: user id
                di.get("reg_date", ""),                       # I: registration date
                di.get("download_channel", ""),               # J: download channel
                _format_order_date(latest["pay_dt"]),         # K: order date
                latest["key_from"],                           # L: deal channel
                agg_result["products"],                       # M: products joined with '+'
                _whole_yuan(agg_result["aggregated_gmv"]),    # N: GMV (aggregated)
                _whole_yuan(agg_result["aggregated_refund"]),  # O: refund (aggregated)
                _whole_yuan(agg_result["aggregated_gsv"]),    # P: GSV (aggregated)
                _activation_label(di.get("activation", "")),  # Q: activated course
                di.get("lesson_progress", ""),                # R: lesson progress
                di.get("lesson_time", ""),                    # S: latest lesson date
                di.get("lesson_minutes", 0) or "",            # T: study minutes
                now_str,                                      # U: refresh time
                classify_channel(latest["key_from"]),         # V: channel attribution
                trade_no,                                     # W: trade number
            ]
            gate_rows.append((trade_no, e["row"], row_data))

    # Same trade number on several rows -> keep the smallest row number.
    # Rows without a trade number are always kept.
    seen_trade = set()
    deduped = []
    for trade_no, _row_num, row_data in sorted(gate_rows, key=lambda x: x[1]):
        if trade_no:
            if trade_no in seen_trade:
                continue
            seen_trade.add(trade_no)
        deduped.append(row_data)

    log(f" 共 {len(gate_rows)} 条 gate 行, 去重后 {len(deduped)} 条, 唯一订单号 {len(seen_trade)}")
    if not deduped:
        log(" 无有效订单,跳过汇总")
        return

    # Write from row 3, columns A~W (23 columns); pad or trim every row
    # to exactly 23 cells.
    values = []
    for row_data in deduped:
        padded = row_data[:23]
        padded.extend([""] * (23 - len(padded)))
        values.append(padded)

    writer = FeishuSheetWriter(SPREADSHEET_TOKEN, token)
    writer.write(SUMMARY_SHEET_ID, start_row=3, rows=values, cols=23)
    log(f" 订单汇总写入完成, 共 {len(deduped)} 行")


# ═══ Main ═══
def main():
    """Run the five refresh steps; return 0 on success and 1 on failure."""
    log("=" * 60)
    log("销售线索全量刷新 (XXTEA精确匹配版) 启动")
    try:
        token = get_fs_token()

        log("Step 1: 解析销售三表")
        all_entries = parse_sales_sheets(token)

        log("Step 2: XXTEA 加密 → PG tel_encrypt 精确匹配")
        phone_map = phone_to_uid_xxtea(all_entries)

        log("Step 3: PostgreSQL 批量查询")
        db_info = query_all_pg(all_entries, phone_map)

        log("Step 4: 写入销售三表 H~V 列")
        write_sales_sheets(token, all_entries, phone_map, db_info)

        log("Step 5: 汇总到「订单汇总」sheet")
        write_summary_sheet(token, all_entries, phone_map, db_info)

        log("✅ 全量刷新完成")
        return 0
    except Exception as e:
        # Top-level boundary: record the failure and signal it to the caller.
        log(f"❌ ERROR: {e}")
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())