359 lines
13 KiB
Python
359 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
小红书一方回传 · A档三包导出
|
||
数据源: Bot销转表三页(小龙qJF4I + 吴迪f975f0 + 成都qJF4J)
|
||
输出: plaintext CSV → phone_encrypt.py → 上传CSV
|
||
"""
|
||
import json, requests, os, sys, csv
|
||
from datetime import datetime, date, timedelta
|
||
from collections import defaultdict
|
||
|
||
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
sys.path.insert(0, SCRIPTS_DIR)
|
||
from phone_encrypt import phone_md5
|
||
|
||
# ── Configuration ──
# Token of the Feishu spreadsheet; interpolated into the URL built by read_sheet().
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
# Label -> sheet id for each tab that main() reads; the label is stored as sales_name.
SHEET_IDS = {"小龙": "qJF4I", "吴迪": "f975f0", "成都": "qJF4J"}
# Directory holding the config.json that get_fs_token() reads app credentials from.
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
# CSV output directory, a sibling "output/yifang_export" of the scripts directory.
OUTPUT_DIR = os.path.join(os.path.dirname(SCRIPTS_DIR), "output", "yifang_export")
# NOTE: runs at import time, so importing this module creates the directory.
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Column indexes (0-based, A=0)
COL_C = 2  # lead (inbound) date
COL_E = 4  # phone number
COL_H = 7  # user ID (account_id)
COL_K = 10  # whether an order was placed
COL_L = 11  # order date
COL_O = 14  # order amount / GMV
|
||
|
||
|
||
def get_fs_token():
    """Fetch a Feishu tenant access token for the first app in the credentials file.

    Reads ``config.json`` from ``CRED_DIR`` and posts the ``appId`` /
    ``appSecret`` of the first entry in its ``apps`` list to the Feishu
    internal-app ``tenant_access_token`` endpoint.

    Returns:
        The ``tenant_access_token`` string from the response body.

    Raises:
        RuntimeError: If the response body is not JSON or carries no token
            (for example wrong credentials). The payload is included in the
            message so the Feishu error code is visible.
        requests.RequestException: On network failure or timeout.
        OSError, ValueError, KeyError, IndexError: If the credentials file is
            missing, is not valid JSON, or lacks the expected fields.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]

    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    try:
        data = resp.json()
    except ValueError as exc:
        # An HTML/plain-text error page would otherwise surface as an opaque
        # JSON decode error with no hint of which call failed.
        raise RuntimeError(
            f"获取飞书Token失败: HTTP {resp.status_code}, 响应不是JSON"
        ) from exc

    token = data.get("tenant_access_token") if isinstance(data, dict) else None
    if not token:
        # Previously a bare KeyError; raise the same exception type as
        # read_sheet() and show the payload returned by Feishu.
        raise RuntimeError(f"获取飞书Token失败: {data}")
    return token
|
||
|
||
|
||
def read_sheet(token, sheet_id):
    """Read the cell values of one tab of the configured Feishu spreadsheet.

    Issues a GET against the Sheets v2 ``values`` endpoint of
    ``SPREADSHEET_TOKEN``, passing ``sheet_id`` as the range segment of the URL.

    Args:
        token: Tenant access token sent as a ``Bearer`` authorization header.
        sheet_id: Id of the sheet tab to read.

    Returns:
        The ``data.valueRange.values`` list from the response (one list per
        row), or an empty list when the API reports no values.

    Raises:
        RuntimeError: If the response is not JSON or its ``code`` is not 0.
        requests.RequestException: On network failure or timeout.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    try:
        data = resp.json()
    except ValueError as exc:
        # Gateway error pages are not JSON; report them in the same way as
        # API-level failures instead of leaking a decode error.
        raise RuntimeError(
            f"读取Sheet {sheet_id} 失败: HTTP {resp.status_code}, 响应不是JSON"
        ) from exc

    if data.get("code") != 0:
        raise RuntimeError(f"读取Sheet {sheet_id} 失败: {data}")
    # Normalise a null value list to [] so callers can slice and len() it.
    return data["data"]["valueRange"]["values"] or []
|
||
|
||
|
||
def parse_date(val, today=None):
    """Parse a spreadsheet cell into a ``date``.

    Accepted inputs, tried in this order:

    1. ``YYYY-MM-DD``, ``YYYY/MM/DD``, ``YYYY.MM.DD`` and ``YYYYMMDD``.
    2. Anything whose first 10 characters are ``YYYY-MM-DD`` (timestamps).
    3. Chinese ``YYYY年M月D日`` with an explicit year.
    4. Chinese ``M月D日`` without a year; the most recent occurrence of that
       month/day that is not after ``today`` is used.

    Args:
        val: Cell value; anything falsy or blank yields ``None``.
        today: Reference day for year-less dates (a ``date`` or ``datetime``).
            Defaults to the current local date.

    Returns:
        The parsed ``date``, or ``None`` when the value cannot be interpreted.
    """
    # Local import: the module's import block does not provide ``re``.
    import re

    if not val:
        return None
    text = str(val).strip()
    if not text:
        return None

    # Standard numeric formats.
    for fmt in ("%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d", "%Y%m%d"):
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue

    # Timestamps such as "2025-09-01 12:30:00": keep only the date part.
    try:
        return datetime.strptime(text[:10], "%Y-%m-%d").date()
    except ValueError:
        pass

    # Chinese format, with or without an explicit year.
    match = re.match(r"(?:(\d{4})年)?(\d{1,2})月(\d{1,2})日", text)
    if not match:
        return None
    month, day = int(match.group(2)), int(match.group(3))

    if match.group(1):
        try:
            return date(int(match.group(1)), month, day)
        except ValueError:
            return None

    if today is None:
        ref = date.today()
    elif isinstance(today, datetime):
        ref = today.date()
    else:
        ref = today

    # Comparing only the month would turn a later day of the current month
    # into a future date, so compare the full date. Trying the previous year
    # as well also recovers "2月29日" when only that year is a leap year.
    for year in (ref.year, ref.year - 1):
        try:
            candidate = date(year, month, day)
        except ValueError:
            continue
        if candidate <= ref:
            return candidate
    return None
|
||
|
||
|
||
def parse_amount(val):
    """Parse a monetary cell into a ``float``.

    Strips surrounding whitespace, the yen/yuan sign in both half-width
    (``¥``) and full-width (``￥``) form, a ``元`` unit, thousands separators
    (``,`` and ``,``) and inner spaces before converting.

    Args:
        val: Cell value (string or number).

    Returns:
        The amount as a ``float``. ``None`` when the value is falsy (this
        includes a numeric ``0``), blank, not a number, or not finite
        (``nan`` / ``inf``).
    """
    # Local import: the module's import block does not provide ``math``.
    import math

    if not val:
        return None
    text = str(val).strip()
    for noise in ("¥", "￥", "元", ",", ",", " "):
        text = text.replace(noise, "")
    if not text:
        return None
    try:
        amount = float(text)
    except ValueError:
        return None
    # float() accepts "nan" and "inf"; neither is a usable amount.
    return amount if math.isfinite(amount) else None
|
||
|
||
|
||
def clean_phone(val):
    """Normalise a phone cell to an 11-digit mainland-China mobile number.

    Handles spaces, dashes, a leading apostrophe, scientific notation,
    float-formatted numbers ("13800138000.0"), non-ASCII decimal digits and a
    leading "86" country code.

    Args:
        val: Cell value (string or number).

    Returns:
        The 11-digit ASCII string starting with "1", or ``None`` when the
        value is empty or does not reduce to such a number.
    """
    if not val:
        return None
    text = str(val).strip().replace(" ", "").replace("-", "").replace("'", "")

    if "e" in text.lower():
        # Scientific notation, e.g. "1.3800138E10".
        try:
            text = str(int(float(text)))
        except (ValueError, OverflowError):
            # ValueError: not a number; OverflowError: int(inf), e.g. "1e999".
            return None
    else:
        # A float-formatted number would otherwise contribute its trailing
        # zero as a twelfth digit.
        whole, dot, frac = text.partition(".")
        if dot and whole.isdigit() and frac and not frac.strip("0"):
            text = whole

    # Keep decimal digits only, mapped to ASCII so that full-width digits do
    # not leak into the value that is later hashed.
    digits = "".join(str(int(ch)) for ch in text if ch.isdecimal())

    # Drop a "+86" / "86" country-code prefix.
    if len(digits) == 13 and digits.startswith("86"):
        digits = digits[2:]

    if len(digits) == 11 and digits.startswith("1"):
        return digits
    return None
|
||
|
||
|
||
def main():
    """Export the three Xiaohongshu audience packages (A1/A2/A3) as CSV files.

    Steps:
        1. Read every tab listed in ``SHEET_IDS``, skipping the first two rows
           (header row and annotation row).
        2. Deduplicate rows by account id (column H) into one record per user.
        3. Look up missing phone numbers in the database by account id.
        4. Build A1 (all leads), A2 (leads without an order that are at least
           90 days old) and A3 (users with an order).
        5. Write one plaintext CSV per package.
        6. Write one upload CSV per package, with the phone replaced by
           ``phone_md5(phone)``.

    Progress and per-package counts are printed to stdout; files are written
    to ``OUTPUT_DIR``.
    """
    token = get_fs_token()
    print("飞书Token获取成功")

    # ── Step 1: read the three tabs ──
    all_rows = []  # [(sales_name, row)]
    for name, sheet_id in SHEET_IDS.items():
        print(f"读取 {name}({sheet_id})...")
        rows = read_sheet(token, sheet_id) or []
        # Row 1 is the header and row 2 an annotation row; data starts at row 3.
        data_rows = rows[2:]
        all_rows.extend((name, row) for row in data_rows)
        print(f" {name}: {len(data_rows)} 行数据")
    print(f"\n三页合计: {len(all_rows)} 行")

    # ── Step 2: parse and deduplicate ──
    lead_map = _collect_leads(all_rows)
    print(f"\n去重后留资用户: {len(lead_map)} 人")

    # ── Step 3: look up phones that column E did not provide ──
    no_phone_uids = [aid for aid, info in lead_map.items() if not info["phone"]]
    print(f"E列无手机号需反查: {len(no_phone_uids)} 人")
    if no_phone_uids:
        _backfill_phones(lead_map, no_phone_uids)
    still_no_phone = sum(1 for info in lead_map.values() if not info["phone"])
    print(f"反查后仍无手机号: {still_no_phone} 人")

    # ── Step 4: build A1 / A2 / A3 ──
    a1_rows, a2_rows, a3_rows, a1_no_phone = _build_packages(lead_map, date.today())

    print("\n=== 各包统计 ===")
    print(f"A1 留资: {len(a1_rows)} 行 (无手机号跳过: {a1_no_phone})")
    print(f"A2 未成交90d: {len(a2_rows)} 行")
    print(f"A3 成单: {len(a3_rows)} 行")

    # Single source of truth for the output file names (was repeated 3 times).
    packages = [
        ("A1_wala_lead_xhs_202509-20260603", a1_rows),
        ("A2_wala_lead_xhs_no_order_90d", a2_rows),
        ("A3_wala_paid_xhs_202509-20260603", a3_rows),
    ]

    # ── Step 5: plaintext CSVs (first column is the raw phone number) ──
    plaintext_header = ["手机号", "行为时间", "行为类型", "样本渠道", "实付金额", "额外信息"]
    for name, rows in packages:
        plaintext_path = os.path.join(OUTPUT_DIR, f"plaintext_{name}.csv")
        _write_csv(plaintext_path, plaintext_header, rows, lambda r: r["phone"])
        print(f"明文CSV: {plaintext_path} ({len(rows)} 行)")

    # ── Step 6: upload CSVs (first column is phone_md5 of the phone) ──
    upload_header = ["用户ID(必填)", "行为时间(选填)", "行为类型(选填)", "样本渠道(选填)", "实付金额(选填)", "额外信息(选填)"]
    for name, rows in packages:
        upload_path = os.path.join(OUTPUT_DIR, f"{name}.csv")
        _write_csv(upload_path, upload_header, rows, lambda r: phone_md5(r["phone"]))
        print(f"上传CSV: {upload_path} ({len(rows)} 行)")

    # ── Summary ──
    print("\n=== 导出完成 ===")
    for name, rows in packages:
        print(f"{name}.csv: {len(rows)} 行")
    print(f"输出目录: {OUTPUT_DIR}")


# Rows whose lead date is before this day are excluded from the export.
_MIN_LEAD_DATE = date(2025, 9, 1)
# Values of column K (compared case-insensitively) that mean "order placed".
_ORDER_FLAGS = frozenset({"是", "1", "yes", "true"})


def _cell(row, idx):
    """Return ``row[idx]``, or ``None`` when the row is missing or too short."""
    return row[idx] if row and len(row) > idx else None


def _collect_leads(all_rows):
    """Merge raw sheet rows into one record per account id.

    Args:
        all_rows: Iterable of ``(sales_name, row)`` pairs.

    Returns:
        Dict mapping account id to a dict with the keys ``phone``,
        ``lead_date``, ``has_order``, ``order_date``, ``amount`` and
        ``sales_name``. For duplicated ids the earliest lead date, the first
        phone found and the most recent dated order are kept.
    """
    lead_map = {}
    for sales_name, row in all_rows:
        uid_raw = _cell(row, COL_H)
        # A numeric id that arrives as a float would stringify as "123.0".
        if isinstance(uid_raw, float) and uid_raw.is_integer():
            uid_raw = int(uid_raw)
        account_id = str(uid_raw).strip() if uid_raw else None
        if not account_id or account_id == "None":
            continue  # without a UID the row cannot be linked to a user

        lead_date = parse_date(_cell(row, COL_C))
        if lead_date and lead_date < _MIN_LEAD_DATE:
            continue

        phone = clean_phone(_cell(row, COL_E))
        flag = _cell(row, COL_K)
        # Case-insensitive so a boolean True cell (str -> "True") also counts.
        has_order = bool(flag) and str(flag).strip().lower() in _ORDER_FLAGS
        order_date = parse_date(_cell(row, COL_L))
        amount = parse_amount(_cell(row, COL_O))

        existing = lead_map.get(account_id)
        if existing is None:
            lead_map[account_id] = {
                "phone": phone,
                "lead_date": lead_date,
                "has_order": has_order,
                "order_date": order_date,
                "amount": amount,
                "sales_name": sales_name,
            }
            continue

        # Keep the earliest lead date.
        if lead_date and (not existing["lead_date"] or lead_date < existing["lead_date"]):
            existing["lead_date"] = lead_date
        # Keep the first phone number found.
        if phone and not existing["phone"]:
            existing["phone"] = phone
        if has_order:
            first_order = not existing["has_order"]
            # Also replaces an existing order that has no date; otherwise
            # such a user would never reach A3, which requires an order date.
            newer_order = bool(order_date) and (
                not existing["order_date"] or order_date > existing["order_date"]
            )
            if first_order or newer_order:
                existing["has_order"] = True
                existing["order_date"] = order_date
                existing["amount"] = amount
    return lead_map


def _backfill_phones(lead_map, account_ids):
    """Fill ``lead_map[...]["phone"]`` from the database for the given ids.

    The password is read from the ``PG_ONLINE_PASSWORD=`` line of
    ``secrets.env`` one directory above the scripts directory (the last such
    line wins). Ids are queried in batches of 500.
    """
    # Imported only when a lookup is actually needed, as in the original flow.
    import psycopg2

    pg_pass = None
    secrets_path = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    with open(secrets_path) as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                pg_pass = line.strip().split("=", 1)[1].strip("'\"")

    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com",
        port=28591,
        user="ai_member",
        password=pg_pass,
        dbname="vala_bi",
    )
    # try/finally so the cursor and connection are released even when a
    # query raises; previously they leaked on any error.
    try:
        cur = conn.cursor()
        try:
            batch_size = 500
            for start in range(0, len(account_ids), batch_size):
                batch = account_ids[start:start + batch_size]
                placeholders = ",".join(["%s"] * len(batch))
                cur.execute(
                    f"SELECT id::text, tel FROM bi_vala_app_account WHERE id IN ({placeholders}) AND status=1 AND deleted_at IS NULL",
                    batch,
                )
                for aid, tel in cur.fetchall():
                    phone = clean_phone(tel)
                    if phone and aid in lead_map:
                        lead_map[aid]["phone"] = phone
        finally:
            cur.close()
    finally:
        conn.close()


def _build_packages(lead_map, today):
    """Build the A1/A2/A3 row lists.

    Returns:
        ``(a1_rows, a2_rows, a3_rows, a1_no_phone)`` where ``a1_no_phone`` is
        the number of dated leads skipped because they have no phone number.
    """
    cutoff_90d = today - timedelta(days=90)

    # A1: every lead that has both a lead date and a phone number.
    a1_rows = []
    a1_no_phone = 0
    for aid, info in lead_map.items():
        if not info["lead_date"]:
            continue
        if not info["phone"]:
            a1_no_phone += 1
            continue
        a1_rows.append({
            "phone": info["phone"],
            "event_time": info["lead_date"].strftime("%Y-%m-%d"),
            "behavior": "留资",
            "channel": "小红书",
            "amount": "",
            "extra": f"uid={aid}",
            "account_id": aid,
            "lead_date": info["lead_date"],
            "has_order": info["has_order"],
            "order_date": info["order_date"],
            "amount_val": info["amount"],
        })

    # A2: subset of A1 without an order whose lead date is at least 90 days old.
    a2_rows = [
        row for row in a1_rows
        if not row["has_order"] and row["lead_date"] <= cutoff_90d
    ]

    # A3: users flagged as ordered that have a phone number and an order date.
    # lead_map already holds one record (the latest dated order) per account.
    a3_rows = [
        {
            "phone": info["phone"],
            "event_time": info["order_date"].strftime("%Y-%m-%d"),
            "behavior": "购买",
            "channel": "小红书",
            "amount": str(info["amount"]) if info["amount"] else "",
            "extra": f"uid={aid}",
            "account_id": aid,
            "order_date": info["order_date"],
            "amount_val": info["amount"],
        }
        for aid, info in lead_map.items()
        if info["has_order"] and info["phone"] and info["order_date"]
    ]
    return a1_rows, a2_rows, a3_rows, a1_no_phone


def _write_csv(path, header, rows, first_column):
    """Write one package as CSV; ``first_column(row)`` supplies the first cell."""
    # utf-8-sig writes a BOM at the start of the file.
    with open(path, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(
            [
                first_column(row),
                row["event_time"],
                row["behavior"],
                row["channel"],
                row["amount"],
                row["extra"],
            ]
            for row in rows
        )
|
||
|
||
|
||
# Run the export only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|