#!/usr/bin/env python3
"""Damai v2_fill (Xishui new-architecture edition): lead UID + full order backfill.

Spreadsheet: CP7BsOjYdhtcmft5iz2csIaHnKe

1. Lead details sheet (7fdb4b): column F phone number -> XXTEA encryption
   -> exact match on PG ``tel_encrypt`` -> UID written to column K.
2. Order details sheet (vrYbiX): clear, then rewrite columns A-P in full
   (do not fill Q/R); G links to the primary lead entry; B is the lead's
   column C (primary owner); N never takes a "duplicate*" value.

Usage: python3 scripts/damai_v2_fill.py
Contract: docs/damai-v2-fill-skill.md (same contract as the xiaoxi edition)
"""
import json
import os
import re
import sys
import time
from collections import defaultdict
from datetime import datetime

import psycopg2
import requests

SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
WORKSPACE = os.path.dirname(SCRIPTS_DIR)
# phone_encrypt lives next to this script, so the scripts dir must be
# importable before the import below runs.
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone  # noqa: E402

# -- Configuration --
SPREADSHEET_TOKEN = "CP7BsOjYdhtcmft5iz2csIaHnKe"
LEADS_SHEET_ID = "7fdb4b"  # lead details sheet
ORDERS_SHEET_ID = "vrYbiX"  # order details sheet

# Feishu app credentials (the app has Sheets read/write permission).
# NOTE(review): the app secret is committed in source; consider moving it to
# secrets.env next to PG_ONLINE_PASSWORD and rotating it.
FS_APP_ID = "cli_aa898f32d4799bea"
FS_APP_SECRET = "wGBjexDxHsHBsx9lk8O2gdbkMFzaJ3kd"

# Channel name -> short display name.
CHANNEL_MAP = {
    "Apple App Store": "苹果",
    "科大讯飞学习机": "讯飞",
    "学而思学习机": "学而思",
    "华为应用市场": "华为",
    "小米应用市场": "小米",
    "应用宝应用市场": "应用宝",
    "希沃学习机": "希沃",
    "荣耀应用市场": "荣耀",
    "小度学习机": "小度",
    "oppo应用市场": "OPPO",
    "vivo应用市场": "VIVO",
    "京东方学习机": "京东方",
    "步步高学习机": "步步高",
    "作业帮学习机": "作业帮",
    "魅族应用市场": "魅族",
    "官网": "官网",
}

LOG_FILE = "/var/log/damai_v2_fill.log"

# Endpoint used for every values write (PUT) against the spreadsheet.
_SHEETS_VALUES_URL = (
    f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
)


def log(msg):
    """Print a timestamped line and append it to LOG_FILE.

    The log file is written as UTF-8 because the messages contain Chinese
    text. If the file cannot be opened or written, the message is still
    printed and a warning goes to stderr instead of aborting the job.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except OSError as exc:
        print(f"[{ts}] 日志文件写入失败 {LOG_FILE}: {exc}", file=sys.stderr)


def get_secret(key):
    """Return the value of ``key`` from ``<WORKSPACE>/secrets.env``.

    Lines are expected in ``KEY=value`` form; surrounding single or double
    quotes are stripped from the value. Returns None when no line starts
    with ``KEY=``. Raises OSError if the file cannot be opened.
    """
    prefix = f"{key}="
    with open(os.path.join(WORKSPACE, "secrets.env"), encoding="utf-8") as f:
        for line in f:
            if line.startswith(prefix):
                return line.strip().split("=", 1)[1].strip("'\"")
    return None


def _connect_pg():
    """Open a connection to the BI PostgreSQL database.

    Raises RuntimeError when the password is missing from secrets.env, so
    the failure is reported clearly instead of as a driver-level auth error.
    """
    password = get_secret("PG_ONLINE_PASSWORD")
    if password is None:
        raise RuntimeError("secrets.env 中未找到 PG_ONLINE_PASSWORD")
    return psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com",
        port=28591,
        user="ai_member",
        password=password,
        dbname="vala_bi",
        connect_timeout=30,
    )


def get_fs_token():
    """Fetch a Feishu tenant access token for the configured app.

    Raises RuntimeError when the API response code is not 0.
    """
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": FS_APP_ID, "app_secret": FS_APP_SECRET},
        timeout=15,
    )
    r = resp.json()
    if r.get("code") != 0:
        raise RuntimeError(f"获取飞书token失败: {r}")
    return r["tenant_access_token"]


def read_sheet(token, sheet_id, range_str=None):
    """Read cell values from a sheet.

    Args:
        token: Feishu tenant access token.
        sheet_id: Sheet id inside SPREADSHEET_TOKEN.
        range_str: Optional A1-style range such as ``"A3:K200"``; when
            omitted the bare sheet id is requested.

    Returns:
        The ``values`` list of the response (a list of rows); an empty list
        when the response carries no values.

    Raises:
        RuntimeError: when the API response code is not 0.
    """
    url = (
        f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}"
    )
    if range_str:
        url += f"!{range_str}"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    data = resp.json()
    if data.get("code") != 0:
        raise RuntimeError(f"读取失败 {sheet_id}: {data}")
    return data["data"]["valueRange"].get("values") or []


def _put_range(token, sheet_id, range_str, values, timeout=30, label=""):
    """PUT ``values`` into ``range_str``; log and return False on API error.

    ``label`` is prepended to the range in the error log line so callers can
    tell which operation failed.
    """
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    resp = requests.put(
        _SHEETS_VALUES_URL,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        json=body,
        timeout=timeout,
    )
    r = resp.json()
    if r.get("code") != 0:
        log(f" ❌ {label}{range_str}: {r.get('code')} {r.get('msg')}")
        return False
    return True


def put_values(token, sheet_id, range_str, values):
    """Write ``values`` to ``range_str`` in a single request (no batching).

    Callers with large payloads are responsible for splitting them.
    Returns True on success, False (after logging) when the API reports an
    error code.
    """
    return _put_range(token, sheet_id, range_str, values, timeout=30)


def _column_number(letters):
    """Convert column letters to a 1-based number (A -> 1, Z -> 26, AA -> 27)."""
    number = 0
    for ch in letters:
        number = number * 26 + (ord(ch) - ord("A") + 1)
    return number


def clear_range(token, sheet_id, range_str):
    """Clear a rectangular range by overwriting it with empty strings.

    Args:
        token: Feishu tenant access token.
        sheet_id: Target sheet id.
        range_str: A1-style range such as ``"A3:R5000"``.

    Returns:
        True when every batch was written, False when the range cannot be
        parsed or any batch fails (remaining batches are skipped).
    """
    m = re.match(r'([A-Z]+)(\d+):([A-Z]+)(\d+)', range_str)
    if not m:
        log(f" ❌ 无法解析范围: {range_str}")
        return False
    sc, sr, ec, er = m.group(1), int(m.group(2)), m.group(3), int(m.group(4))
    # Base-26 conversion so multi-letter columns get the right width too.
    ncols = _column_number(ec) - _column_number(sc) + 1
    BATCH = 500
    for start in range(sr, er + 1, BATCH):
        end = min(start + BATCH - 1, er)
        values = [[""] * ncols for _ in range(end - start + 1)]
        rng = f"{sc}{start}:{ec}{end}"
        if not _put_range(token, sheet_id, rng, values, timeout=60, label="clear "):
            return False
        time.sleep(0.2)  # throttle between consecutive writes
    return True


def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in chunks and concatenate all rows.

    Args:
        cur: Open DB cursor.
        sql_tpl: SQL containing exactly one ``%s`` where the placeholder
            list goes; it is expanded with ``%`` formatting, so the template
            must not contain any other ``%`` sequence.
        params: Values for the IN list.
        chunk: Maximum number of values per query.

    Returns:
        A list with the rows of every chunk, in chunk order.
    """
    results = []
    for i in range(0, len(params), chunk):
        batch = params[i:i + chunk]
        ph = ",".join(["%s"] * len(batch))
        cur.execute(sql_tpl % ph, batch)
        results.extend(cur.fetchall())
    return results


def classify_channel(key_from):
    """Classify an order's ``key_from`` into one of four channel labels.

    Returns "端内", "销转" or "达人" for the known keys/prefixes below, and
    "直购" for anything else, including empty or missing values.
    """
    if not key_from:
        return "直购"
    kf = key_from.strip()
    if kf in ("app-active-h5-0-0", "app-sales-bj-qhm-0", "app-sales-bj-wd-0"):
        return "端内"
    if kf.startswith("sales-adp-"):
        return "销转"
    if kf.startswith("newmedia-daren-") or kf == "newmedia-dianpu-wwxx-0-0":
        return "达人"
    return "直购"


def parse_date(date_str, year=2026):
    """Parse '6月14日 19:09:12' or '6月14日' into an ISO date string.

    The input carries no year, so ``year`` is used (default 2026, e.g.
    '2026-06-14'). Returns "" for empty input, for text that does not start
    with ``<month>月<day>日``, or when month/day are out of range.
    """
    if not date_str:
        return ""
    date_str = str(date_str).strip()
    m = re.match(r'(\d+)月(\d+)日', date_str)
    if not m:
        return ""
    month, day = int(m.group(1)), int(m.group(2))
    if not (1 <= month <= 12 and 1 <= day <= 31):
        return ""
    return f"{year}-{month:02d}-{day:02d}"


def check_timing(entry_time_str, order_date_str):
    """Return '是' when the order date is on or after the lead entry date.

    ``order_date_str`` must be 'YYYY-MM-DD' so that plain string comparison
    matches date order. Returns "" when either date is missing or
    unparseable, or when the order precedes the entry.
    """
    entry_date = parse_date(entry_time_str)
    if not entry_date or not order_date_str:
        return ""
    return "是" if order_date_str >= entry_date else ""


def _cell_text(row, col):
    """Return the stripped text of ``row[col]``, or "" if absent or empty."""
    return str(row[col]).strip() if len(row) > col and row[col] else ""


def _cell_to_id_str(cell):
    """Normalise a numeric-looking cell (phone / UID) to a digit string.

    Integers and digit strings are converted exactly; other values go
    through ``float`` (so 13800138000.0 becomes '13800138000'). Anything
    that is not numeric is returned as stripped text. Empty cells give "".
    """
    if not cell:
        return ""
    if isinstance(cell, int) and not isinstance(cell, bool):
        return str(cell)
    if isinstance(cell, str):
        try:
            # Exact path: avoids float rounding on long digit strings.
            return str(int(cell.strip()))
        except ValueError:
            pass
    try:
        return str(int(float(cell)))
    except (TypeError, ValueError, OverflowError):
        return str(cell).strip()


def _is_valid_uid(uid):
    """Return True when ``uid`` is a digit string representing an int > 0."""
    if not uid or not uid.isdigit():
        return False
    try:
        return int(uid) > 0
    except ValueError:
        # isdigit() accepts characters (e.g. superscripts) that int() rejects.
        return False


# ═══════════════════════════════════════════════════════════════
# (1) Lead details: F -> K UID
# ═══════════════════════════════════════════════════════════════

def fill_lead_uids(token):
    """Fill column K (UID) of the lead sheet from column F phone numbers.

    Reads A3:K200, encrypts every 11-digit phone that has no valid UID yet,
    looks the ciphertext up in ``bi_vala_app_account.tel_encrypt`` and
    writes each matched UID to column K of the corresponding row.

    Returns:
        A list of tuples ``(row_idx, phone, uid, master_cs, nickname,
        entry_time)``, one per sheet row read, with ``uid`` updated for rows
        matched in this run (regardless of whether the sheet write
        succeeded).
    """
    log("=" * 60)
    log("① 线索明细 F→K UID")
    log("=" * 60)

    # Two header rows are skipped, so data starts on sheet row 3.
    # NOTE(review): rows below 200 are not read.
    rows = read_sheet(token, LEADS_SHEET_ID, "A3:K200")
    log(f" 读取 {len(rows)} 行线索数据")

    leads = []
    for row_idx, row in enumerate(rows, start=3):  # row_idx = Feishu row number
        phone = _cell_to_id_str(row[5]) if len(row) > 5 else ""
        existing_uid = _cell_to_id_str(row[10]) if len(row) > 10 else ""
        master_cs = _cell_text(row, 2)   # C = primary owner
        nickname = _cell_text(row, 4)    # E = nickname
        entry_time = _cell_text(row, 1)  # B = lead entry time
        leads.append((row_idx, phone, existing_uid, master_cs, nickname, entry_time))

    # Rows that have a phone number but no usable UID.
    need_uid = [
        (idx, phone)
        for idx, phone, uid, *_ in leads
        if re.match(r'^\d{11}$', phone) and not _is_valid_uid(uid)
    ]
    log(f" 需要查UID: {len(need_uid)} 个手机号")

    if not need_uid:
        log(" 无需查询,跳过")
        return leads

    # XXTEA encryption; ciphertext -> plaintext so DB hits map back to phones.
    phone_enc_map = {}
    for _, phone in need_uid:
        try:
            phone_enc_map[encrypt_phone(phone)] = phone
        except Exception as e:
            # One bad phone must not stop the rest of the run.
            log(f" 加密失败 {phone}: {e}")
    log(f" 加密完成, 唯一密文: {len(phone_enc_map)}")

    # Exact-match lookup in PG.
    phone_to_uid = {}
    conn = _connect_pg()
    try:
        cur = conn.cursor()
        try:
            matches = batch_in(
                cur,
                "SELECT id, tel_encrypt FROM bi_vala_app_account "
                "WHERE tel_encrypt IN (%s) AND status=1 AND deleted_at IS NULL",
                list(phone_enc_map),
            )
        finally:
            cur.close()
    finally:
        conn.close()
    for uid, tel_enc in matches:
        plain = phone_enc_map.get(tel_enc)
        if plain:
            phone_to_uid[plain] = str(uid)
    log(f" 精确匹配到 {len(phone_to_uid)} 个 UID")

    # Only rows that needed a lookup are candidates for a write.
    updates = [(idx, phone_to_uid.get(phone, "")) for idx, phone in need_uid]

    # Each K cell is written individually so other rows are never touched.
    write_count = 0
    for idx, uid in updates:
        if uid:
            if put_values(token, LEADS_SHEET_ID, f"K{idx}:K{idx}", [[uid]]):
                write_count += 1
            time.sleep(0.1)
    log(f" 写入 {write_count} 个 UID 到 K 列")

    # Reflect the freshly matched UIDs in the returned lead list.
    uid_map = {idx: uid for idx, uid in updates if uid}
    return [
        (idx, phone, uid_map.get(idx, uid), cs, nick, et)
        for idx, phone, uid, cs, nick, et in leads
    ]


# ═══════════════════════════════════════════════════════════════
# (2) Order details: clear -> full backfill of A-P
# ═══════════════════════════════════════════════════════════════

def _build_order_row(order, lead, refund_map):
    """Build one 16-column (A-P) order row from a DB order and its lead."""
    account_id, trade_no, pay_date, key_from, _goods_id, pay_amount, _status = order
    lead_idx, phone, lead_cs, nickname, entry_time = lead

    order_month = f"{pay_date.month}月" if pay_date else ""          # A = order month
    order_time = pay_date.strftime("%Y-%m-%d") if pay_date else ""   # I = order date
    gmv = round(pay_amount / 100, 2) if pay_amount else 0            # J = GMV (cents -> yuan)
    refund = round(refund_map.get(trade_no, 0) / 100, 2)             # K = refund
    gsv = round(gmv - refund, 2)                                     # L = GSV

    return [
        order_month,
        lead_cs,                                # B = primary owner (lead column C)
        trade_no or "",                         # C = order number
        str(account_id),                        # D = user id
        phone,                                  # E = phone
        nickname,                               # F = nickname
        str(lead_idx),                          # G = lead reference (lead sheet row number)
        entry_time,                             # H = lead entry time
        order_time,
        gmv,
        refund,
        gsv,
        key_from or "",                         # M = keyfrom
        classify_channel(key_from),             # N = channel attribution
        check_timing(entry_time, order_time),   # O = timing valid (order >= entry)
        "",                                     # P = remark
    ]


def fill_order_details(token, leads):
    """Rewrite the order sheet from the DB orders of all lead UIDs.

    Queries paid orders (order_status 3 or 4, pay_success_date set) and
    their refunds (status 3) for every valid UID in ``leads``, clears
    A3:R5000 of the order sheet and writes the rebuilt rows into A-P
    starting at row 3.

    Args:
        token: Feishu tenant access token.
        leads: Tuples as returned by ``fill_lead_uids``.

    Returns:
        The completion timestamp string ('YYYY-MM-DD HH:MM:SS'), or None
        when there is no valid UID (the sheet is then left untouched).
    """
    log("=" * 60)
    log("② 订单明细 DB 全量回填")
    log("=" * 60)

    # uid -> lead info; the first lead row seen for a UID wins.
    lead_by_uid = {}
    for idx, phone, uid, cs, nick, et in leads:
        if _is_valid_uid(uid):
            lead_by_uid.setdefault(int(uid), (idx, phone, cs, nick, et))

    # Sorted so chunked queries produce a deterministic row order.
    uid_list = sorted(lead_by_uid)
    log(f" 有效 UID: {len(uid_list)}")

    if not uid_list:
        log(" 无 UID,跳过订单查询")
        return None

    refund_map = defaultdict(int)
    conn = _connect_pg()
    try:
        cur = conn.cursor()
        try:
            log(" 查询订单...")
            orders = batch_in(
                cur,
                "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status "
                "FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL AND order_status IN (3,4) "
                "ORDER BY account_id, pay_success_date",
                uid_list,
            )
            log(f" 订单数: {len(orders)}")

            trade_nos = [o[1] for o in orders if o[1]]
            if trade_nos:
                refunds = batch_in(
                    cur,
                    "SELECT trade_no, refund_amount_int FROM bi_refund_order WHERE trade_no IN (%s) AND status=3",
                    trade_nos,
                )
                for tno, amt in refunds:
                    refund_map[tno] += amt or 0  # tolerate NULL amounts
        finally:
            cur.close()
    finally:
        conn.close()

    # Build the A-P rows (16 columns).
    order_rows = []
    for order in orders:
        lead = lead_by_uid.get(order[0])
        if lead:
            order_rows.append(_build_order_row(order, lead, refund_map))
    log(f" 生成 {len(order_rows)} 行订单数据")

    # Clear a generous range first so rows from a previous, longer run vanish.
    # NOTE(review): this range includes Q/R although only A-P are rewritten;
    # confirm against the contract that wiping Q/R is intended.
    log(" 清空订单明细...")
    if not clear_range(token, ORDERS_SHEET_ID, "A3:R5000"):
        log(" ⚠️ 清空订单明细失败,旧数据可能残留")
    time.sleep(0.5)

    # Written in batches. Per the original author's note a request can carry
    # at most 400 rows x 16 columns = 6400 cells; 250 rows leaves headroom.
    BATCH_ROWS = 250
    total_written = 0
    for i in range(0, len(order_rows), BATCH_ROWS):
        batch = order_rows[i:i + BATCH_ROWS]
        start_row = i + 3  # data starts on sheet row 3
        end_row = start_row + len(batch) - 1
        if put_values(token, ORDERS_SHEET_ID, f"A{start_row}:P{end_row}", batch):
            total_written += len(batch)
        time.sleep(0.3)
    log(f" 写入 {total_written} 行订单数据")

    db_info = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log(f" db_info: {db_info}")
    return db_info


# ═══════════════════════════════════════════════════════════════
# Main
# ═══════════════════════════════════════════════════════════════

def main():
    """Run both backfill steps and print a short summary."""
    log("大麦 v2_fill 开始")
    t0 = time.time()

    token = get_fs_token()
    log("飞书 token 获取成功")

    # (1) Lead details F -> K UID
    leads = fill_lead_uids(token)

    # (2) Order details full backfill
    db_info = fill_order_details(token, leads)

    elapsed = time.time() - t0
    log(f"v2_fill 完成 · db_info: {db_info} · 耗时 {elapsed:.1f}s")

    # Summary
    uid_count = sum(1 for _, _, uid, *_ in leads if _is_valid_uid(uid))
    phone_count = sum(1 for _, phone, *_ in leads if phone and re.match(r'^\d{11}$', phone))
    print(f"\n{'='*60}")
    print("v2_fill 完成")
    print(f" 线索明细: {len(leads)} 行, {phone_count} 手机号, {uid_count} UID")
    print(f" db_info: {db_info}")
    print(f"{'='*60}")


if __name__ == "__main__":
    main()