ai_member_xiaoxi/scripts/xhs_lead_lag_analysis.py
2026-06-07 08:00:01 +08:00

548 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
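Xiaohongshu (XHS) lead intake × order conversion lag-table analysis.

Data source: xiaoxi_xhs_lead_detail.csv (provided by 陈逸鸫)
Matching: phone number → bi_vala_app_account.tel (fallback: UID → bi_vala_app_account.id)
Order criteria: bi_vala_order with order_status IN (3,4) and pay_success_date IS NOT NULL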
"""
import csv
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from phone_encrypt import encrypt_phone
import psycopg2
import psycopg2.extras
from collections import defaultdict
from datetime import datetime, date
import openpyxl
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
# ── Database connection ──
# Keyword arguments passed to psycopg2.connect() by get_conn().
# Every value can be overridden through an environment variable so that
# credentials do not have to be edited in (or read from) this file; the
# literals are kept only as fallbacks so existing invocations keep working.
# SECURITY NOTE(review): the fallback password is committed in plain text —
# rotate it and supply VALA_BI_PG_PASSWORD from the environment instead.
PG_CONFIG = {
    "host": os.environ.get("VALA_BI_PG_HOST", "bj-postgres-16pob4sg.sql.tencentcdb.com"),
    "port": int(os.environ.get("VALA_BI_PG_PORT", "28591")),
    "user": os.environ.get("VALA_BI_PG_USER", "ai_member"),
    "password": os.environ.get("VALA_BI_PG_PASSWORD", "LdfjdjL83h3h3^$&**YGG*"),
    "dbname": os.environ.get("VALA_BI_PG_DBNAME", "vala_bi"),
}
def get_conn():
    """Open and return a new psycopg2 connection built from PG_CONFIG."""
    connection = psycopg2.connect(**PG_CONFIG)
    return connection
# ── Step 1: Load CSV ──
# Read the lead export, keep only rows whose "进线月" (lead month) is one of
# 2026-03 / 2026-04 / 2026-05, then collect the distinct phone numbers and
# numeric user IDs found in those rows.
print("=" * 60)
print("Step 1: Loading CSV...")
leads = []
# newline="" hands line-ending handling to the csv module, which is required
# for quoted fields containing embedded newlines to be parsed correctly.
with open("/root/.openclaw/workspace/tmp/xiaoxi_xhs_lead_detail.csv", "r", encoding="utf-8-sig", newline="") as f:
    reader = csv.DictReader(f)
    for row in reader:
        # DictReader pads rows that are shorter than the header with None;
        # normalise those to "" so every `.get(col, "").strip()` on a lead row
        # (here and in the later steps) is safe.
        row = {key: ("" if value is None else value) for key, value in row.items()}
        lead_month = row.get("进线月", "").strip()
        if lead_month in ("2026-03", "2026-04", "2026-05"):
            leads.append(row)
print(f" 3-5月进线线索总数: {len(leads)}")
# Extract unique phones and UIDs; the literal "0" is treated as "no value".
phones = set()
uids = set()
for r in leads:
    phone = r.get("手机号", "").strip()
    uid = r.get("用户ID", "").strip()
    if phone and phone != "0":
        phones.add(phone)
    if uid and uid != "0" and uid.isdigit():
        uids.add(int(uid))
print(f" 唯一手机号: {len(phones)}, 唯一UID: {len(uids)}")
# ── Step 2: Match phone → account_id ──
# Opens the database connection/cursor that Steps 3 and 5 keep using, then
# maps each CSV phone number to an account id via the tel_encrypt column.
print("\nStep 2: Matching phone → bi_vala_app_account...")
conn = get_conn()
cur = conn.cursor()
# Number of values bound per IN (...) query.  Defined before the `if` below
# because the UID and order lookups reuse it even when there are no phones.
batch_size = 500
phone_to_account = {}
if phones:
    # encrypt_phone comes from the local phone_encrypt module (XXTEA according
    # to the original author's note); its output is compared to tel_encrypt.
    phone_encrypt_map = {}  # encrypted value → plain phone from the CSV
    encrypt_failures = 0
    for p in phones:
        try:
            phone_encrypt_map[encrypt_phone(p)] = p
        except Exception:
            # Best effort: a phone that cannot be encrypted is skipped, but it
            # is counted so the loss is visible instead of silent.
            encrypt_failures += 1
    if encrypt_failures:
        print(
            f" WARNING: {encrypt_failures} phone(s) could not be encrypted and were skipped",
            file=sys.stderr,
        )
    encrypted_list = list(phone_encrypt_map)
    for i in range(0, len(encrypted_list), batch_size):
        batch = encrypted_list[i:i+batch_size]
        # One %s placeholder per value; the values themselves are passed
        # separately so the driver does the quoting.
        placeholders = ",".join(["%s"] * len(batch))
        cur.execute(
            f"SELECT tel_encrypt, id FROM bi_vala_app_account WHERE tel_encrypt IN ({placeholders}) AND status = 1 AND deleted_at IS NULL",
            batch
        )
        # NOTE(review): if several accounts share one tel_encrypt, whichever
        # row is returned last wins — confirm that is acceptable.
        for tel_enc, aid in cur.fetchall():
            original_phone = phone_encrypt_map.get(tel_enc)
            if original_phone:
                phone_to_account[original_phone] = aid
print(f" 手机号匹配到账号: {len(phone_to_account)}/{len(phones)}")
# ── Step 3: Match UID → account_id ──
# Check which CSV user IDs exist in bi_vala_app_account with status = 1 and
# deleted_at IS NULL.  The result maps each confirmed id to itself, so it is
# effectively used as a membership set.
print("\nStep 3: Matching UID → bi_vala_app_account...")
uid_to_account = {}
if uids:
    uid_list = list(uids)
    for offset in range(0, len(uid_list), batch_size):
        chunk = uid_list[offset:offset + batch_size]
        marks = ",".join("%s" for _ in chunk)
        query = f"SELECT id FROM bi_vala_app_account WHERE id IN ({marks}) AND status = 1 AND deleted_at IS NULL"
        cur.execute(query, chunk)
        uid_to_account.update((found[0], found[0]) for found in cur.fetchall())
print(f" UID匹配到有效账号: {len(uid_to_account)}/{len(uids)}")
# ── Step 4: Build lead → account_id mapping ──
# For every lead row, resolve an account id: the phone match is preferred and
# the CSV user ID is used only when the phone did not match.
print("\nStep 4: Building lead→account mapping...")
lead_account_map = {}  # lead row index → {"account_id": ..., "match_method": ...}
matched_by_phone = 0
matched_by_uid = 0
unmatched = 0
for idx, r in enumerate(leads):
    phone = r.get("手机号", "").strip()
    uid = r.get("用户ID", "").strip()
    aid = None
    match_method = None
    # Try phone first
    if phone and phone in phone_to_account:
        aid = phone_to_account[phone]
        match_method = "phone"
        matched_by_phone += 1
    # Fallback to UID
    elif uid and uid.isdigit() and int(uid) in uid_to_account:
        aid = int(uid)
        match_method = "uid"
        matched_by_uid += 1
    else:
        unmatched += 1
    lead_account_map[idx] = {"account_id": aid, "match_method": match_method}
print(f" 手机号匹配: {matched_by_phone}, UID匹配: {matched_by_uid}, 未匹配: {unmatched}")
# Guard the percentage: with no leads in the selected months the division
# would raise ZeroDivisionError and abort the whole run.
match_rate = (matched_by_phone + matched_by_uid) / len(leads) * 100 if leads else 0.0
print(f" 总匹配率: {match_rate:.1f}%")
# ── Step 5: Get all orders for matched accounts ──
# Fetch orders with order_status 3 or 4 and a non-null pay_success_date for
# every matched account, together with refund rows whose status is 3.
print("\nStep 5: Fetching orders for matched accounts...")
matched_aids = set(
    v["account_id"] for v in lead_account_map.values() if v["account_id"] is not None
)
print(f" 去重匹配账号数: {len(matched_aids)}")
aid_list = list(matched_aids)
account_orders = defaultdict(list)  # account_id → list of order dicts
# The LEFT JOIN returns one row per matching refund, so an order with several
# refund rows comes back several times.  Track orders already stored so each
# one is appended once and extra rows only add their refund amount.
# NOTE(review): assumes (account_id, o.id) identifies a single order and that
# multiple status-3 refund rows are distinct refunds — confirm against the schema.
stored_orders = {}  # (account_id, order_id) → order dict already appended
try:
    for i in range(0, len(aid_list), batch_size):
        batch = aid_list[i:i+batch_size]
        placeholders = ",".join(["%s"] * len(batch))
        cur.execute(f"""
            SELECT
            o.account_id,
            o.id as order_id,
            o.pay_success_date,
            o.pay_amount_int,
            o.order_status,
            o.key_from,
            o.trade_no,
            o.out_trade_no,
            r.refund_amount_int,
            r.status as refund_status
            FROM bi_vala_order o
            LEFT JOIN bi_refund_order r ON (
            (o.trade_no = r.trade_no OR o.out_trade_no = r.out_trade_no)
            AND r.status = 3
            )
            WHERE o.account_id IN ({placeholders})
            AND o.pay_success_date IS NOT NULL
            AND o.order_status IN (3, 4)
            ORDER BY o.account_id, o.pay_success_date
            """, batch)
        for row in cur.fetchall():
            aid, oid, psd, amt, ost, kf, tn, otn, ref_amt, ref_st = row
            already = stored_orders.get((aid, oid))
            if already is not None:
                # Same order joined to another refund row: accumulate only.
                already["refund_amount_int"] += ref_amt or 0
                continue
            order = {
                "order_id": oid,
                "pay_success_date": psd,
                "pay_amount_int": amt,
                "order_status": ost,
                "key_from": kf,
                "trade_no": tn,
                "out_trade_no": otn,
                # No refund row → NULL from the LEFT JOIN → treated as 0.
                "refund_amount_int": ref_amt or 0,
                "refund_status": ref_st,
            }
            stored_orders[(aid, oid)] = order
            account_orders[aid].append(order)
finally:
    # Release the cursor and connection opened in Step 2 even if a query fails.
    cur.close()
    conn.close()
total_orders = sum(len(v) for v in account_orders.values())
accounts_with_orders = len(account_orders)
print(f" 有订单的账号数: {accounts_with_orders}, 总订单数: {total_orders}")
# ── Step 6: Build lag table ──
print("\nStep 6: Building lag table...")
# For each lead, take its lead month and look up the matched account's orders.
# lag = (order month - lead month), counted in whole calendar months.
def month_diff(d1, d2):
    """Return the number of calendar months from d1 to d2 (d2 minus d1).

    Only the year and month attributes are read, so the day of month is
    ignored; the result is negative when d2 falls in an earlier month.
    """
    start_index = d1.year * 12 + d1.month
    end_index = d2.year * 12 + d2.month
    return end_index - start_index
# Aggregate: lead_month × lag bucket.
# lag_data[lead_month]["total"]["lead_count"] holds the number of lead rows;
# lag_data[lead_month]["M<k>"] holds the order figures for lag k months.
# Amounts are summed as stored in pay_amount_int / refund_amount_int and are
# divided by 100 only when reported.
lag_data = defaultdict(lambda: defaultdict(lambda: {
    "lead_count": 0,
    "order_count": 0,
    "order_accounts": set(),
    "gmv": 0,
    "refund": 0,
    "gsv": 0,
}))


def _lead_detail(lead_row, info, order=None, order_dt=None, lag=None):
    """Build one per-lead detail record.

    lead_row is the CSV row, info the matching entry from lead_account_map.
    When order is given it is the lead's first qualifying order, order_dt its
    payment date and lag its lag in months; otherwise the record is filled
    with the "no order" defaults.
    """
    account_id = info["account_id"]
    has_order = order is not None
    return {
        "lead_month": lead_row.get("进线月", "").strip(),
        "lead_date": lead_row.get("进线日期", "").strip(),
        "phone": lead_row.get("手机号", "").strip(),
        "uid": lead_row.get("用户ID", "").strip(),
        "account_id": account_id,
        "matched": account_id is not None,
        "match_method": info.get("match_method") or "",
        "has_order": has_order,
        "order_month": order_dt.strftime("%Y-%m") if has_order else None,
        "lag": lag if has_order else None,
        "gmv": order["pay_amount_int"] / 100 if has_order else 0,
        "refund": order["refund_amount_int"] / 100 if has_order else 0,
        "gsv": (order["pay_amount_int"] - order["refund_amount_int"]) / 100 if has_order else 0,
    }


# Also track per-lead details for debugging
lead_details = []
# NOTE(review): every lead row adds its matched account's orders to the
# buckets, so an account that appears in several lead rows contributes the
# same orders several times — confirm this is the intended definition.
for idx, r in enumerate(leads):
    lead_month = r.get("进线月", "").strip()
    lead_date_str = r.get("进线日期", "").strip()
    info = lead_account_map[idx]
    aid = info["account_id"]
    # Count lead
    lag_data[lead_month]["total"]["lead_count"] += 1
    if aid is None:
        lead_details.append(_lead_detail(r, info))
        continue
    # Parse lead date; an empty or malformed date falls back to the first day
    # of the lead month, which yields the same month-level lag.
    try:
        lead_dt = datetime.strptime(lead_date_str, "%Y-%m-%d").date()
    except ValueError:
        lead_dt = datetime.strptime(lead_month + "-01", "%Y-%m-%d").date()
    # Orders paid in or after the lead month (lag >= 0); earlier ones are ignored.
    eligible_orders = []
    for order in account_orders.get(aid, []):
        psd = order["pay_success_date"]
        order_dt = psd.date() if isinstance(psd, datetime) else psd
        lag = month_diff(lead_dt, order_dt)
        if lag < 0:
            continue
        eligible_orders.append(order)
        bucket = lag_data[lead_month][f"M{lag}"]
        bucket["order_count"] += 1
        bucket["order_accounts"].add(aid)
        bucket["gmv"] += order["pay_amount_int"]
        bucket["refund"] += order["refund_amount_int"]
        bucket["gsv"] += (order["pay_amount_int"] - order["refund_amount_int"])
    # Record first order for lead detail
    first_order = min(eligible_orders, key=lambda o: o["pay_success_date"], default=None)
    if first_order is None:
        lead_details.append(_lead_detail(r, info))
        continue
    psd = first_order["pay_success_date"]
    order_dt = psd.date() if isinstance(psd, datetime) else psd
    lead_details.append(
        _lead_detail(r, info, first_order, order_dt, month_diff(lead_dt, order_dt))
    )
# ── Step 7: Print summary ──
# Console report: per lead month, one line per lag bucket with the bucket's
# own figures and the running order count.
banner = "=" * 60
print("\n" + banner)
print("Lag Table Summary")
print(banner)
for lead_month in sorted(lag_data):
    month_buckets = lag_data[lead_month]
    print(f"\n── 进线月: {lead_month} ──")
    total_leads = month_buckets["total"]["lead_count"]
    print(f" 留资总数: {total_leads}")
    # Lag buckets are keyed "M<k>"; order them by k numerically.
    all_lag_keys = sorted(
        (name for name in month_buckets if name.startswith("M")),
        key=lambda name: int(name[1:]),
    )
    # Running GMV/refund totals are accumulated but not printed.
    cum_orders = cum_gmv = cum_refund = 0
    for lag_key in all_lag_keys:
        bucket = month_buckets[lag_key]
        cum_orders += bucket["order_count"]
        cum_gmv += bucket["gmv"]
        cum_refund += bucket["refund"]
        if total_leads > 0:
            rate = bucket["order_count"] / total_leads * 100
            cum_rate = cum_orders / total_leads * 100
        else:
            rate = 0
            cum_rate = 0
        summary_line = (
            f" {lag_key}: 成单{bucket['order_count']}"
            f"({len(bucket['order_accounts'])}人) "
            f"成单率{rate:.1f}% "
            f"累计{cum_orders}单({cum_rate:.1f}%) "
            f"GMV¥{bucket['gmv']/100:,.0f} "
            f"退¥{bucket['refund']/100:,.0f} "
            f"GSV¥{bucket['gsv']/100:,.0f}"
        )
        print(summary_line)
# ── Step 8: Export to Excel ──
# Create the workbook and the style objects shared by the sheets below.
print("\nStep 8: Exporting to Excel...")
wb = openpyxl.Workbook()
# ── Sheet 1: Lag Summary ──
ws1 = wb.active
ws1.title = "Lag汇总表"
# Header styling: bold white text on a solid blue fill.
header_font = Font(size=11, bold=True)
header_font_white = Font(size=11, bold=True, color="FFFFFF")
header_fill = PatternFill(fill_type="solid", start_color="4472C4", end_color="4472C4")
# Thin line on all four edges of a cell.
thin_border = Border(
    **{edge: Side(style='thin') for edge in ("left", "right", "top", "bottom")}
)
# Headers of the lag summary table (written to row 4 below).
headers = ["进线月", "留资数", "Lag", "成单数", "成单人数", "成单率%", "累计成单数", "累计成单率%", "GMV(元)", "退款(元)", "GSV(元)"]
# The title and subtitle are merged across the full table width; deriving the
# last column from the header list keeps them aligned with all 11 columns
# (a fixed A:J span stopped one column short).
last_header_col = openpyxl.utils.get_column_letter(len(headers))
# Title
ws1.merge_cells(f"A1:{last_header_col}1")
ws1["A1"] = "小红书线索进线×成单 Lag 表2026年3-5月"
ws1["A1"].font = Font(bold=True, size=14)
ws1["A1"].alignment = Alignment(horizontal="center")
# Subtitle: data source and order criteria, in small grey text.
ws1.merge_cells(f"A2:{last_header_col}2")
ws1["A2"] = "数据源xiaoxi_xhs_lead_detail.csv | 成单口径bi_vala_order order_status IN (3,4) pay_success_date NOT NULL | 测试账号已剔除"
ws1["A2"].font = Font(size=9, color="666666")
ws1["A2"].alignment = Alignment(horizontal="center")
# Header row
for col, h in enumerate(headers, 1):
    cell = ws1.cell(row=4, column=col, value=h)
    cell.font = header_font_white
    cell.fill = header_fill
    cell.alignment = Alignment(horizontal="center")
    cell.border = thin_border
# Data rows start directly below the header row (row 4).
row = 5
# Styles of the per-month total rows; created once because they never change.
total_row_font = Font(bold=True)
total_row_fill = PatternFill(start_color="D9E2F3", end_color="D9E2F3", fill_type="solid")
for lead_month in sorted(lag_data.keys()):
    total_leads = lag_data[lead_month]["total"]["lead_count"]
    # Lag buckets are keyed "M<k>"; order them by k numerically.
    all_lag_keys = sorted(
        [k for k in lag_data[lead_month].keys() if k.startswith("M")],
        key=lambda x: int(x[1:])
    )
    cum_orders = 0
    # One row per lag bucket; lead month and lead count are shown only on the
    # first row of each month.
    for li, lag_key in enumerate(all_lag_keys):
        d = lag_data[lead_month][lag_key]
        cum_orders += d["order_count"]
        rate = d["order_count"] / total_leads * 100 if total_leads > 0 else 0
        cum_rate = cum_orders / total_leads * 100 if total_leads > 0 else 0
        values = [
            lead_month if li == 0 else "",
            total_leads if li == 0 else "",
            lag_key,
            d["order_count"],
            len(d["order_accounts"]),
            round(rate, 1),
            cum_orders,
            round(cum_rate, 1),
            round(d["gmv"] / 100, 2),
            round(d["refund"] / 100, 2),
            round(d["gsv"] / 100, 2),
        ]
        for col, v in enumerate(values, 1):
            cell = ws1.cell(row=row, column=col, value=v)
            cell.border = thin_border
            cell.alignment = Alignment(horizontal="center")
        row += 1
    # Total row for this lead_month.
    # Deliberately NOT named `total_orders`: that name holds the overall order
    # count from Step 5 and is written to the match-stats sheet later, so
    # reusing it here would replace it with the last month's figure.
    month_total_orders = cum_orders
    total_rate = month_total_orders / total_leads * 100 if total_leads > 0 else 0
    total_gmv = sum(lag_data[lead_month][k]["gmv"] for k in all_lag_keys)
    total_refund = sum(lag_data[lead_month][k]["refund"] for k in all_lag_keys)
    total_gsv = sum(lag_data[lead_month][k]["gsv"] for k in all_lag_keys)
    total_values = [
        f"{lead_month} 合计",
        total_leads,
        "全部",
        month_total_orders,
        # Distinct accounts across all lag buckets of the month.
        len(set().union(*[lag_data[lead_month][k]["order_accounts"] for k in all_lag_keys])),
        round(total_rate, 1),
        "",
        "",
        round(total_gmv / 100, 2),
        round(total_refund / 100, 2),
        round(total_gsv / 100, 2),
    ]
    for col, v in enumerate(total_values, 1):
        cell = ws1.cell(row=row, column=col, value=v)
        cell.font = total_row_font
        cell.fill = total_row_fill
        cell.border = thin_border
        cell.alignment = Alignment(horizontal="center")
    row += 1
    row += 1  # blank row
# Adjust column widths
for col in range(1, 12):
    ws1.column_dimensions[openpyxl.utils.get_column_letter(col)].width = 14
# ── Sheet 2: Lead Detail ──
# One row per lead with its match result and, when present, its first order.
ws2 = wb.create_sheet("线索明细")
detail_headers = ["进线月", "进线日期", "手机号", "用户ID", "匹配账号ID", "匹配方式", "是否成单", "首单月份", "Lag月数", "GMV(元)", "退款(元)", "GSV(元)"]
for col, h in enumerate(detail_headers, 1):
    cell = ws2.cell(row=1, column=col, value=h)
    cell.font = header_font_white
    cell.fill = header_fill
    cell.alignment = Alignment(horizontal="center")
    cell.border = thin_border
# Data rows start at row 2, below the header.
for i, ld in enumerate(lead_details, 2):
    match_method = ld.get("match_method", "") or ""
    values = [
        ld["lead_month"],
        ld["lead_date"],
        ld["phone"],
        ld["uid"],
        ld["account_id"] or "",
        match_method,
        # "是否成单" (converted?) column: both branches previously produced an
        # empty string, leaving the column blank for every lead.
        "是" if ld["has_order"] else "否",
        ld["order_month"] or "",
        ld["lag"] if ld["lag"] is not None else "",
        ld["gmv"],
        ld["refund"],
        ld["gsv"],
    ]
    for col, v in enumerate(values, 1):
        cell = ws2.cell(row=i, column=col, value=v)
        cell.border = thin_border
        cell.alignment = Alignment(horizontal="center")
for col in range(1, 13):
    ws2.column_dimensions[openpyxl.utils.get_column_letter(col)].width = 14
# ── Sheet 3: Match Stats ──
# Two-column sheet of label/value pairs summarising the matching and order
# lookup results computed in the earlier steps.
ws3 = wb.create_sheet("匹配统计")
# Guard the percentage: with no leads the division would raise
# ZeroDivisionError and the workbook would never be saved.
if leads:
    match_rate_text = f"{(matched_by_phone + matched_by_uid) / len(leads) * 100:.1f}%"
else:
    match_rate_text = "0.0%"
stats = [
    ["指标", "数值"],
    ["3-5月进线线索总数", len(leads)],
    ["手机号匹配成功", matched_by_phone],
    ["UID匹配成功", matched_by_uid],
    ["未匹配", unmatched],
    ["匹配率", match_rate_text],
    ["匹配到去重账号数", len(matched_aids)],
    ["有订单的账号数", accounts_with_orders],
    ["总订单数", total_orders],
]
for i, (k, v) in enumerate(stats, 1):
    ws3.cell(row=i, column=1, value=k).font = Font(bold=True)
    ws3.cell(row=i, column=2, value=v)
    ws3.cell(row=i, column=1).border = thin_border
    ws3.cell(row=i, column=2).border = thin_border
ws3.column_dimensions["A"].width = 25
ws3.column_dimensions["B"].width = 20
# Save
output_path = "/root/.openclaw/workspace/output/xhs_lead_lag_analysis_202603-202605.xlsx"
# Create the output directory if it is missing so the save does not fail
# after all the queries and aggregation have already run.
os.makedirs(os.path.dirname(output_path), exist_ok=True)
wb.save(output_path)
print(f"\n✅ 输出文件: {output_path}")
print("Done!")