#!/usr/bin/env python3
"""
Xiaohongshu first-party postback: tier-A three-package export.

Data source: three tabs of the Bot sales-conversion sheet
             (小龙 qJF4I + 吴迪 f975f0 + 成都 qJF4J)
Output:      plaintext CSV -> phone_encrypt.py -> upload CSV
"""

import csv
import json
import math
import os
import re
import sys
from collections import defaultdict  # noqa: F401  (kept: pre-existing import)
from datetime import date, datetime, timedelta

import requests

SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import phone_md5  # noqa: E402  (needs SCRIPTS_DIR on sys.path)

# ── Configuration ──
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
SHEET_IDS = {"小龙": "qJF4I", "吴迪": "f975f0", "成都": "qJF4J"}
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
OUTPUT_DIR = os.path.join(os.path.dirname(SCRIPTS_DIR), "output", "yifang_export")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Column indexes (0-based, A=0)
COL_C = 2   # lead (inbound) date
COL_E = 4   # phone number
COL_H = 7   # user ID (account_id)
COL_K = 10  # ordered? flag
COL_L = 11  # order date
COL_O = 14  # order amount / GMV

# ── Internal constants ──
_DATE_FORMATS = ("%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y%m%d")
_CN_DATE_RE = re.compile(r"(\d{1,2})月(\d{1,2})日")
_SCI_NOTATION_RE = re.compile(r"[+-]?\d+(?:\.\d+)?[eE][+-]?\d+")
_ZERO_FRACTION_RE = re.compile(r"(\d+)\.0+")
_ASCII_DIGITS = frozenset("0123456789")
# Values of column K that mean "ordered"; compared case-insensitively.
_ORDER_FLAGS = frozenset({"是", "1", "yes", "true"})
# Rows whose lead date is earlier than this are ignored.
_MIN_LEAD_DATE = date(2025, 9, 1)
_PACKAGE_NAMES = (
    "A1_wala_lead_xhs_202509-20260603",
    "A2_wala_lead_xhs_no_order_90d",
    "A3_wala_paid_xhs_202509-20260603",
)
_PLAINTEXT_HEADER = ["手机号", "行为时间", "行为类型", "样本渠道", "实付金额", "额外信息"]
_UPLOAD_HEADER = [
    "用户ID(必填)", "行为时间(选填)", "行为类型(选填)",
    "样本渠道(选填)", "实付金额(选填)", "额外信息(选填)",
]


def _feishu_json(resp, failure_prefix):
    """Decode a Feishu API response and raise RuntimeError unless ``code == 0``.

    ``failure_prefix`` is the text placed before the payload in the error.
    """
    try:
        data = resp.json()
    except ValueError:
        # Body is not JSON at all: surface the HTTP error if there is one.
        resp.raise_for_status()
        raise RuntimeError(f"{failure_prefix}: HTTP {resp.status_code}, 响应不是JSON")
    if not isinstance(data, dict) or data.get("code") != 0:
        raise RuntimeError(f"{failure_prefix}: {data}")
    return data


def get_fs_token():
    """Fetch a Feishu tenant access token using the first app in config.json.

    Returns:
        The ``tenant_access_token`` string.

    Raises:
        RuntimeError: if the API reports a non-zero code or returns no token.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = _feishu_json(resp, "获取飞书Token失败")
    token = data.get("tenant_access_token")
    if not token:
        raise RuntimeError(f"获取飞书Token失败: {data}")
    return token


def read_sheet(token, sheet_id):
    """Read every value of one sheet tab.

    Args:
        token: Feishu tenant access token.
        sheet_id: ID of the tab inside ``SPREADSHEET_TOKEN``.

    Returns:
        A list of rows (each a list of cell values); empty if the API
        returns no values.

    Raises:
        RuntimeError: if the API reports a non-zero code.
    """
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}"
    )
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    data = _feishu_json(resp, f"读取Sheet {sheet_id} 失败")
    return data["data"]["valueRange"]["values"] or []


def parse_date(val):
    """Parse a date cell; supports several formats including Chinese 'M月D日'.

    Accepted: ``YYYY-MM-DD``, ``YYYY/MM/DD``, ``YYYY.MM.DD``, ``YYYYMMDD``,
    any string whose first 10 characters are ``YYYY-MM-DD``, and ``M月D日``.
    For ``M月D日`` the year is inferred as the most recent occurrence that is
    not in the future (this year, otherwise last year).

    Returns:
        A ``datetime.date``, or ``None`` when the value cannot be parsed.
    """
    if not val:
        return None
    text = str(val).strip()
    if not text:
        return None

    # Standard formats
    for fmt in _DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue

    # Date followed by something else (e.g. a time): look at the first 10 chars
    try:
        return datetime.strptime(text[:10], "%Y-%m-%d").date()
    except ValueError:
        pass

    # Chinese "M月D日": no year given, so infer it.
    m = _CN_DATE_RE.match(text)
    if not m:
        return None
    month, day = int(m.group(1)), int(m.group(2))
    today = date.today()
    for year in (today.year, today.year - 1):
        try:
            candidate = date(year, month, day)
        except ValueError:
            continue  # e.g. Feb 29 in a non-leap year
        # These are past events; a same-month day still ahead of today
        # must belong to the previous year.
        if candidate <= today:
            return candidate
    return None


def parse_amount(val):
    """Parse a monetary amount into a finite float.

    Strips currency signs (half- and full-width), thousands separators and
    spaces.

    Returns:
        The amount as ``float`` (``0`` yields ``0.0``), or ``None`` for
        empty, non-numeric or non-finite input.
    """
    if val is None or isinstance(val, bool):
        return None
    text = str(val).strip()
    for junk in ("¥", "¥", ",", ",", " "):
        text = text.replace(junk, "")
    if not text:
        return None
    try:
        amount = float(text)
    except ValueError:
        return None
    # float() accepts "nan"/"inf"; those are not usable amounts.
    return amount if math.isfinite(amount) else None


def clean_phone(val):
    """Normalize a phone cell to an 11-digit mainland mobile number.

    Handles float cells (``13812345678.0``), scientific notation, common
    separators, and a leading ``86`` country code.

    Returns:
        The 11-digit string starting with ``1``, or ``None`` if the value
        does not reduce to one.
    """
    if not val or isinstance(val, bool):
        return None
    if isinstance(val, float):
        # A float cell would stringify with a trailing ".0" and gain a digit.
        if not val.is_integer():
            return None
        val = int(val)

    text = str(val).strip()
    for junk in (" ", "-", "'"):
        text = text.replace(junk, "")

    if _SCI_NOTATION_RE.fullmatch(text):
        # Scientific notation, e.g. "1.3812345678E10"
        try:
            text = str(int(float(text)))
        except (ValueError, OverflowError):
            return None
    else:
        zero_fraction = _ZERO_FRACTION_RE.fullmatch(text)
        if zero_fraction:
            text = zero_fraction.group(1)

    # Keep ASCII digits only
    digits = "".join(ch for ch in text if ch in _ASCII_DIGITS)
    if len(digits) == 13 and digits.startswith("86"):
        digits = digits[2:]  # drop the country code
    if len(digits) == 11 and digits.startswith("1"):
        return digits
    return None


def _cell(row, idx):
    """Return ``row[idx]`` or ``None`` when the row is too short."""
    return row[idx] if len(row) > idx else None


def _clean_uid(raw):
    """Normalize a user-ID cell to a non-empty string, or ``None``."""
    if not raw or isinstance(raw, bool):
        return None
    if isinstance(raw, float) and raw.is_integer():
        raw = int(raw)  # avoid "123.0", which would not match the DB id
    uid = str(raw).strip()
    return uid if uid and uid != "None" else None


def _is_order_flag(val):
    """Return True when column K marks the row as ordered."""
    if val is None:
        return False
    if isinstance(val, float) and val.is_integer():
        val = int(val)
    return str(val).strip().casefold() in _ORDER_FLAGS


def _merge_lead(existing, incoming):
    """Fold a later row for the same account into its existing record."""
    # Keep the earliest lead date
    new_lead = incoming["lead_date"]
    if new_lead and (not existing["lead_date"] or new_lead < existing["lead_date"]):
        existing["lead_date"] = new_lead
    # Fill in a phone number if we did not have one
    if incoming["phone"] and not existing["phone"]:
        existing["phone"] = incoming["phone"]

    if not incoming["has_order"]:
        return
    if not existing["has_order"]:
        existing["has_order"] = True
        existing["order_date"] = incoming["order_date"]
        existing["amount"] = incoming["amount"]
        return
    # Several orders: keep the most recent dated one. An existing order with
    # no date is superseded by any dated order.
    new_order = incoming["order_date"]
    if new_order and (not existing["order_date"] or new_order > existing["order_date"]):
        existing["order_date"] = new_order
        existing["amount"] = incoming["amount"]


def _load_all_rows(token):
    """Read all tabs in ``SHEET_IDS``; return ``[(sales_name, row), ...]``."""
    all_rows = []
    for name, sheet_id in SHEET_IDS.items():
        print(f"读取 {name}({sheet_id})...")
        rows = read_sheet(token, sheet_id)
        # Skip row 1 (header) and row 2 (annotation row); data starts at row 3
        data_rows = rows[2:]
        all_rows.extend((name, row) for row in data_rows)
        print(f" {name}: {len(data_rows)} 行数据")
    return all_rows


def _build_lead_map(all_rows):
    """Parse rows and de-duplicate by account_id.

    Returns:
        ``{account_id: {phone, lead_date, has_order, order_date, amount,
        sales_name}}``. Rows without a user ID, or with a lead date before
        ``_MIN_LEAD_DATE``, are dropped.
    """
    lead_map = {}
    for sales_name, row in all_rows:
        if not isinstance(row, (list, tuple)):
            continue
        account_id = _clean_uid(_cell(row, COL_H))
        if not account_id:
            continue  # without a UID the row cannot be linked to anything
        lead_date = parse_date(_cell(row, COL_C))
        if lead_date and lead_date < _MIN_LEAD_DATE:
            continue

        record = {
            "phone": clean_phone(_cell(row, COL_E)),
            "lead_date": lead_date,
            "has_order": _is_order_flag(_cell(row, COL_K)),
            "order_date": parse_date(_cell(row, COL_L)),
            "amount": parse_amount(_cell(row, COL_O)),
            "sales_name": sales_name,
        }
        existing = lead_map.get(account_id)
        if existing is None:
            lead_map[account_id] = record
        else:
            _merge_lead(existing, record)
    return lead_map


def _load_pg_password():
    """Read ``PG_ONLINE_PASSWORD`` from ``../secrets.env``.

    Raises:
        RuntimeError: if the key is absent or empty.
    """
    secrets_path = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    pg_pass = None
    with open(secrets_path, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                pg_pass = line.strip().split("=", 1)[1].strip("'\"")
    if not pg_pass:
        raise RuntimeError(f"secrets.env 缺少 PG_ONLINE_PASSWORD: {secrets_path}")
    return pg_pass


def _backfill_phones(lead_map, account_ids, batch_size=500):
    """Look up phone numbers in the database for accounts that lack one.

    Mutates ``lead_map`` in place.
    """
    import psycopg2  # imported lazily: only needed when a lookup is required

    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com",
        port=28591,
        user="ai_member",
        password=_load_pg_password(),
        dbname="vala_bi",
    )
    try:
        cur = conn.cursor()
        try:
            # Query in batches
            for start in range(0, len(account_ids), batch_size):
                batch = account_ids[start:start + batch_size]
                placeholders = ",".join(["%s"] * len(batch))
                cur.execute(
                    f"SELECT id::text, tel FROM bi_vala_app_account WHERE id IN ({placeholders}) AND status=1 AND deleted_at IS NULL",
                    batch,
                )
                for aid, tel in cur.fetchall():
                    phone = clean_phone(tel)
                    if phone and aid in lead_map:
                        lead_map[aid]["phone"] = phone
        finally:
            cur.close()
    finally:
        conn.close()


def _build_packages(lead_map, today):
    """Build the A1/A2/A3 row lists.

    Returns:
        ``(a1_rows, a2_rows, a3_rows, a1_no_phone)`` where ``a1_no_phone``
        counts dated leads skipped for lack of a phone number.
    """
    cutoff_90d = today - timedelta(days=90)

    # A1: every Xiaohongshu lead
    a1_rows = []
    a1_no_phone = 0
    for aid, info in lead_map.items():
        if not info["lead_date"]:
            continue
        if not info["phone"]:
            a1_no_phone += 1
            continue
        a1_rows.append({
            "phone": info["phone"],
            "event_time": info["lead_date"].strftime("%Y-%m-%d"),
            "behavior": "留资",
            "channel": "小红书",
            "amount": "",
            "extra": f"uid={aid}",
            "account_id": aid,
            "lead_date": info["lead_date"],
            "has_order": info["has_order"],
            "order_date": info["order_date"],
            "amount_val": info["amount"],
        })

    # A2: subset of A1 with no order and a lead date at least 90 days old
    a2_rows = [
        row for row in a1_rows
        if not row["has_order"] and row["lead_date"] <= cutoff_90d
    ]

    # A3: accounts with an order, a phone and an order date (latest order)
    a3_rows = [
        {
            "phone": info["phone"],
            "event_time": info["order_date"].strftime("%Y-%m-%d"),
            "behavior": "购买",
            "channel": "小红书",
            "amount": str(info["amount"]) if info["amount"] else "",
            "extra": f"uid={aid}",
            "account_id": aid,
            "order_date": info["order_date"],
            "amount_val": info["amount"],
        }
        for aid, info in lead_map.items()
        if info["has_order"] and info["phone"] and info["order_date"]
    ]
    return a1_rows, a2_rows, a3_rows, a1_no_phone


def _write_csv(path, header, rows, first_column):
    """Write one export CSV; ``first_column(row)`` yields the first cell."""
    with open(path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(
            [
                first_column(row),
                row["event_time"],
                row["behavior"],
                row["channel"],
                row["amount"],
                row["extra"],
            ]
            for row in rows
        )


def main():
    """Run the export: read sheets, de-duplicate, back-fill phones, write CSVs."""
    token = get_fs_token()
    print("飞书Token获取成功")

    # ── Step 1: read the three tabs ──
    all_rows = _load_all_rows(token)
    print(f"\n三页合计: {len(all_rows)} 行")

    # ── Step 2: parse and de-duplicate by account_id ──
    lead_map = _build_lead_map(all_rows)
    print(f"\n去重后留资用户: {len(lead_map)} 人")

    # ── Step 3: where column E had no phone, look it up by user ID ──
    no_phone_uids = [aid for aid, info in lead_map.items() if not info["phone"]]
    print(f"E列无手机号需反查: {len(no_phone_uids)} 人")
    if no_phone_uids:
        _backfill_phones(lead_map, no_phone_uids)
        still_no_phone = sum(1 for info in lead_map.values() if not info["phone"])
        print(f"反查后仍无手机号: {still_no_phone} 人")

    # ── Step 4: build A1 / A2 / A3 ──
    a1_rows, a2_rows, a3_rows, a1_no_phone = _build_packages(lead_map, date.today())
    print("\n=== 各包统计 ===")
    print(f"A1 留资: {len(a1_rows)} 行 (无手机号跳过: {a1_no_phone})")
    print(f"A2 未成交90d: {len(a2_rows)} 行")
    print(f"A3 成单: {len(a3_rows)} 行")

    packages = list(zip(_PACKAGE_NAMES, (a1_rows, a2_rows, a3_rows)))

    # ── Step 5: plaintext CSVs ──
    for name, rows in packages:
        plaintext_path = os.path.join(OUTPUT_DIR, f"plaintext_{name}.csv")
        _write_csv(plaintext_path, _PLAINTEXT_HEADER, rows, lambda row: row["phone"])
        print(f"明文CSV: {plaintext_path} ({len(rows)} 行)")

    # ── Step 6: upload CSVs with the phone replaced by phone_md5() ──
    for name, rows in packages:
        upload_path = os.path.join(OUTPUT_DIR, f"{name}.csv")
        _write_csv(upload_path, _UPLOAD_HEADER, rows, lambda row: phone_md5(row["phone"]))
        print(f"上传CSV: {upload_path} ({len(rows)} 行)")

    # ── Summary ──
    print("\n=== 导出完成 ===")
    for name, rows in packages:
        print(f"{name}.csv: {len(rows)} 行")
    print(f"输出目录: {OUTPUT_DIR}")


if __name__ == "__main__":
    main()