ai_member_xiaoxi/scripts/export_xhs_yifang.py
2026-06-04 08:00:01 +08:00

359 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Xiaohongshu first-party data postback: export of the three tier-A packages.

Data source: three sheets of the Bot sales-conversion spreadsheet
(小龙 qJF4I, 吴迪 f975f0, 成都 qJF4J).
Output: plaintext CSV -> phone_encrypt.py -> upload CSV.
"""
import json, requests, os, sys, csv
from datetime import datetime, date, timedelta
from collections import defaultdict
# Put this script's own directory first on sys.path before importing the
# sibling module phone_encrypt below.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import phone_md5
# ── Configuration ──
# Feishu spreadsheet token and the sheet id of each sales sheet (name -> id).
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
SHEET_IDS = {"小龙": "qJF4I", "吴迪": "f975f0", "成都": "qJF4J"}
# Directory holding config.json with the Feishu app credentials.
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
# Export directory: <parent of SCRIPTS_DIR>/output/yifang_export.
# NOTE: it is created as a side effect of importing this module.
OUTPUT_DIR = os.path.join(os.path.dirname(SCRIPTS_DIR), "output", "yifang_export")
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Column indexes (0-based, A=0)
COL_C = 2 # lead (inbound) date
COL_E = 4 # phone number
COL_H = 7 # user ID (account_id)
COL_K = 10 # whether an order was placed
COL_L = 11 # order date
COL_O = 14 # order amount / GMV
def get_fs_token():
    """Fetch a Feishu tenant access token with the app credentials on disk.

    Reads ``config.json`` from ``CRED_DIR`` and authenticates with the
    ``appId`` / ``appSecret`` of the first entry in its ``apps`` list.

    Returns:
        The ``tenant_access_token`` string from the API response.

    Raises:
        RuntimeError: if the API reports a non-zero ``code`` or the response
            carries no token (mirrors the error handling of ``read_sheet``).
        requests.RequestException: on network failures or timeout.
    """
    # JSON is UTF-8 by specification; do not depend on the locale default.
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if data.get("code") != 0 or not token:
        # Report only code/msg so that nothing credential-like is echoed.
        raise RuntimeError(
            f"获取飞书Token失败: code={data.get('code')} msg={data.get('msg')}"
        )
    return token
def read_sheet(token, sheet_id):
    """Return every row of one sheet of the configured spreadsheet.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        sheet_id: Id of the sheet; used as the range of the values request.

    Returns:
        The ``values`` entry of the response's ``valueRange``.

    Raises:
        RuntimeError: if the response ``code`` is not 0.
    """
    endpoint = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}"
    )
    auth_headers = {"Authorization": f"Bearer {token}"}
    data = requests.get(endpoint, headers=auth_headers, timeout=30).json()
    if data.get("code") == 0:
        return data["data"]["valueRange"]["values"]
    raise RuntimeError(f"读取Sheet {sheet_id} 失败: {data}")
def parse_date(val):
    """Parse a date cell into a ``datetime.date``.

    Supported inputs, tried in this order:
      * ``YYYY-MM-DD``, ``YYYY/MM/DD``, ``YYYY.MM.DD``, ``YYYYMMDD`` and
        ``YYYY年M月D日``;
      * anything whose first 10 characters are ``YYYY-MM-DD`` (for example a
        timestamp such as ``2025-09-01 12:30:00``);
      * the year-less Chinese form ``M月D日``, for which the year is inferred.

    Args:
        val: Raw cell value; it is converted with ``str()`` before parsing.

    Returns:
        The parsed date, or ``None`` when the value is empty or unparseable.
    """
    import re

    if not val:
        return None
    val = str(val).strip()
    if not val:
        return None

    # Full formats that carry an explicit year.
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y%m%d", "%Y年%m月%d日"):
        try:
            return datetime.strptime(val, fmt).date()
        except ValueError:
            continue

    # A date followed by a time component: keep only the first 10 characters.
    try:
        return datetime.strptime(val[:10], "%Y-%m-%d").date()
    except ValueError:
        pass

    # Chinese "M月D日" without a year: use the most recent occurrence that is
    # not in the future. Comparing months alone would map a later day of the
    # current month onto a date that has not happened yet.
    m = re.match(r"(\d{1,2})月(\d{1,2})日", val)
    if m:
        month, day = int(m.group(1)), int(m.group(2))
        today = date.today()
        for year in (today.year, today.year - 1):
            try:
                candidate = date(year, month, day)
            except ValueError:
                # Invalid month/day, or Feb 29 in a non-leap year.
                continue
            if candidate <= today:
                return candidate
    return None
def parse_amount(val):
    """Parse a monetary cell into a float.

    Currency markers (half-width and full-width yen sign, a "元" suffix),
    thousands separators (ASCII and full-width comma) and spaces are removed
    before conversion.

    Args:
        val: Raw cell value; it is converted with ``str()`` before parsing.

    Returns:
        The amount as ``float``, or ``None`` when the value is falsy
        (including a numeric 0), not a number, or not finite.
    """
    import math

    if not val:
        return None
    text = str(val).strip()
    # "\uffe5" is the full-width yen sign, "\u5143" is 元, "\uff0c" is the
    # full-width comma.
    for marker in ("¥", "\uffe5", "\u5143", ",", "\uff0c", " "):
        text = text.replace(marker, "")
    if not text:
        return None
    try:
        amount = float(text)
    except ValueError:
        return None
    # float() accepts "nan" and "inf"; neither is a usable amount.
    return amount if math.isfinite(amount) else None
def clean_phone(val):
    """Normalize a phone cell to an 11-digit mobile number string.

    Handles spaces, dashes and a leading apostrophe, numeric cells delivered
    as floats (``13812345678.0``), scientific notation, and a ``+86`` /
    ``0086`` country-code prefix.

    Args:
        val: Raw cell value (string or number).

    Returns:
        The 11 ASCII digits starting with ``"1"``, or ``None`` when the value
        is empty or does not reduce to such a number.
    """
    if not val:
        return None

    # A float cell would stringify as "13812345678.0" and pick up a 12th digit.
    if isinstance(val, float):
        if not val.is_integer():  # also rejects nan / inf
            return None
        val = int(val)

    text = str(val).strip()
    for junk in (" ", "-", "'"):
        text = text.replace(junk, "")

    if "e" in text.lower():
        # Scientific notation. NOTE: digits already rounded away by the
        # spreadsheet cannot be recovered here.
        try:
            text = str(int(float(text)))
        except (ValueError, OverflowError):
            return None
    elif text.endswith(".0"):
        text = text[:-2]

    # ASCII digits only, so the result is always a plain numeric string.
    digits = "".join(ch for ch in text if ch in "0123456789")

    # Drop a China country-code prefix if one is present.
    if len(digits) == 15 and digits.startswith("0086"):
        digits = digits[4:]
    elif len(digits) == 13 and digits.startswith("86"):
        digits = digits[2:]

    if len(digits) == 11 and digits.startswith("1"):
        return digits
    return None
def _cell(row, idx):
    """Return ``row[idx]``, or ``None`` when the row has no such column."""
    return row[idx] if len(row) > idx else None


def _is_order_flag(val):
    """Return True when a column-K cell marks the lead as having ordered.

    NOTE(review): the accepted markers are "是" plus common truthy spellings,
    compared case-insensitively. Confirm the exact marker used in column K.
    A whitespace-only cell is NOT treated as an order.
    """
    if not val:
        return False
    return str(val).strip().casefold() in {"是", "1", "yes", "true"}


def _load_all_rows(token):
    """Read every configured sheet and return ``[(sales_name, row), ...]``."""
    all_rows = []
    for name, sheet_id in SHEET_IDS.items():
        print(f"读取 {name}({sheet_id})...")
        rows = read_sheet(token, sheet_id)
        # Row 1 is the header and row 2 an annotation row; data starts at row 3.
        data_rows = rows[2:]
        all_rows.extend((name, row) for row in data_rows)
        print(f" {name}: {len(data_rows)} 行数据")
    return all_rows


def _build_lead_map(all_rows, min_lead_date):
    """Deduplicate rows by account id (column H) into one record per account.

    Rows without an account id, or whose lead date is earlier than
    ``min_lead_date``, are skipped. For duplicates the earliest lead date, the
    first non-empty phone and the most recent order (date and amount) win.
    """
    lead_map = {}
    for sales_name, row in all_rows:
        lead_date = parse_date(_cell(row, COL_C))
        phone = clean_phone(_cell(row, COL_E))
        uid_raw = _cell(row, COL_H)
        account_id = str(uid_raw).strip() if uid_raw else None

        # Without an account id the row cannot be linked to anything.
        if not account_id or account_id == "None":
            continue
        if lead_date and lead_date < min_lead_date:
            continue

        has_order = _is_order_flag(_cell(row, COL_K))
        order_date = parse_date(_cell(row, COL_L))
        amount = parse_amount(_cell(row, COL_O))

        existing = lead_map.get(account_id)
        if existing is None:
            lead_map[account_id] = {
                "phone": phone,
                "lead_date": lead_date,
                "has_order": has_order,
                "order_date": order_date,
                "amount": amount,
                "sales_name": sales_name,
            }
            continue

        # Keep the earliest lead date.
        if lead_date and (not existing["lead_date"] or lead_date < existing["lead_date"]):
            existing["lead_date"] = lead_date
        # Fill in a phone number only if none was known yet.
        if phone and not existing["phone"]:
            existing["phone"] = phone
        if has_order:
            if not existing["has_order"]:
                existing["has_order"] = True
                existing["order_date"] = order_date
                existing["amount"] = amount
            elif order_date and (
                not existing["order_date"] or order_date > existing["order_date"]
            ):
                # Keep the most recent order. This also fills in the date when
                # the earlier order row carried none.
                existing["order_date"] = order_date
                existing["amount"] = amount
    return lead_map


def _backfill_phones_from_db(lead_map, account_ids, batch_size=500):
    """Look up missing phone numbers in the database and store them in place."""
    import psycopg2

    secrets_path = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    pg_pass = None
    with open(secrets_path, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                pg_pass = line.strip().split("=", 1)[1].strip("'\"")

    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com",
        port=28591,
        user="ai_member",
        password=pg_pass,
        dbname="vala_bi",
    )
    # Release cursor and connection even if a query fails.
    try:
        cur = conn.cursor()
        try:
            # Query in batches to bound the size of each IN (...) list.
            for start in range(0, len(account_ids), batch_size):
                batch = account_ids[start:start + batch_size]
                placeholders = ",".join(["%s"] * len(batch))
                cur.execute(
                    f"SELECT id::text, tel FROM bi_vala_app_account WHERE id IN ({placeholders}) AND status=1 AND deleted_at IS NULL",
                    batch
                )
                for aid, tel in cur.fetchall():
                    phone = clean_phone(tel)
                    if phone and aid in lead_map:
                        lead_map[aid]["phone"] = phone
        finally:
            cur.close()
    finally:
        conn.close()


def _write_export_csv(path, header, rows, user_id_of):
    """Write one package as CSV; ``user_id_of`` maps the phone to column 1."""
    with open(path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        for row in rows:
            writer.writerow([
                user_id_of(row["phone"]),
                row["event_time"],
                row["behavior"],
                row["channel"],
                row["amount"],
                row["extra"],
            ])


def main():
    """Export the A1/A2/A3 packages from the sales sheets.

    Steps: read the three sheets, deduplicate leads by account id, look up
    missing phone numbers in the database, build the three packages, then
    write each one as a plaintext CSV and as an upload CSV whose first column
    is ``phone_md5(phone)``. Progress is reported on stdout.
    """
    token = get_fs_token()
    print("飞书Token获取成功")

    # ── Step 1: read the three sheets ──
    all_rows = _load_all_rows(token)
    print(f"\n三页合计: {len(all_rows)}")

    # ── Step 2: parse and deduplicate (lead date >= 2025-09-01) ──
    lead_map = _build_lead_map(all_rows, date(2025, 9, 1))
    print(f"\n去重后留资用户: {len(lead_map)}")

    # ── Step 3: accounts with no phone in column E are looked up by id ──
    no_phone_uids = [aid for aid, info in lead_map.items() if not info["phone"]]
    print(f"E列无手机号需反查: {len(no_phone_uids)}")
    if no_phone_uids:
        _backfill_phones_from_db(lead_map, no_phone_uids)
        still_no_phone = sum(1 for info in lead_map.values() if not info["phone"])
        print(f"反查后仍无手机号: {still_no_phone}")

    # ── Step 4: build A1 / A2 / A3 ──
    today = date.today()
    cutoff_90d = today - timedelta(days=90)

    # A1: every lead that has both a lead date and a phone number.
    a1_rows = []
    a1_no_phone = 0
    for aid, info in lead_map.items():
        if not info["lead_date"]:
            continue
        if not info["phone"]:
            a1_no_phone += 1
            continue
        a1_rows.append({
            "phone": info["phone"],
            "event_time": info["lead_date"].strftime("%Y-%m-%d"),
            "behavior": "留资",
            "channel": "小红书",
            "amount": "",
            "extra": f"uid={aid}",
            "account_id": aid,
            "lead_date": info["lead_date"],
            "has_order": info["has_order"],
            "order_date": info["order_date"],
            "amount_val": info["amount"],
        })

    # A2: subset of A1 with no order whose lead is at least 90 days old.
    a2_rows = [
        row for row in a1_rows
        if not row["has_order"] and row["lead_date"] <= cutoff_90d
    ]

    # A3: accounts with an order, a phone number and an order date. lead_map
    # already holds one record per account with its most recent order.
    a3_rows = [
        {
            "phone": info["phone"],
            "event_time": info["order_date"].strftime("%Y-%m-%d"),
            "behavior": "购买",
            "channel": "小红书",
            "amount": str(info["amount"]) if info["amount"] else "",
            "extra": f"uid={aid}",
            "account_id": aid,
            "order_date": info["order_date"],
            "amount_val": info["amount"],
        }
        for aid, info in lead_map.items()
        if info["has_order"] and info["phone"] and info["order_date"]
    ]

    print("\n=== 各包统计 ===")
    print(f"A1 留资: {len(a1_rows)} 行 (无手机号跳过: {a1_no_phone})")
    print(f"A2 未成交90d: {len(a2_rows)}")
    print(f"A3 成单: {len(a3_rows)}")

    packages = [
        ("A1_wala_lead_xhs_202509-20260603", a1_rows),
        ("A2_wala_lead_xhs_no_order_90d", a2_rows),
        ("A3_wala_paid_xhs_202509-20260603", a3_rows),
    ]

    # ── Step 5: plaintext CSVs (first column is the raw phone number) ──
    plaintext_header = ["手机号", "行为时间", "行为类型", "样本渠道", "实付金额", "额外信息"]
    for name, rows in packages:
        plaintext_path = os.path.join(OUTPUT_DIR, f"plaintext_{name}.csv")
        _write_export_csv(plaintext_path, plaintext_header, rows, lambda phone: phone)
        print(f"明文CSV: {plaintext_path} ({len(rows)} 行)")

    # ── Step 6: upload CSVs (first column is phone_md5 of the phone) ──
    # NOTE(review): unlike the other columns, the first header has no
    # parentheses. Confirm it against the platform's upload template.
    upload_header = ["用户ID必填", "行为时间(选填)", "行为类型(选填)", "样本渠道(选填)", "实付金额(选填)", "额外信息(选填)"]
    for name, rows in packages:
        upload_path = os.path.join(OUTPUT_DIR, f"{name}.csv")
        _write_export_csv(upload_path, upload_header, rows, phone_md5)
        print(f"上传CSV: {upload_path} ({len(rows)} 行)")

    # ── Summary ──
    print("\n=== 导出完成 ===")
    for name, rows in packages:
        print(f"{name}.csv: {len(rows)}")
    print(f"输出目录: {OUTPUT_DIR}")
if __name__ == "__main__":
main()