ai_member_xiaoxi/scripts/lingxi_review_analysis.py
2026-06-06 08:00:01 +08:00

303 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
灵犀复盘 · 一方数据分析
需求:
1) B7 全量成单 916 行重新匹配/补传
2) A1 留资 1910 vs 灵犀 1849 按进线月对齐
3) B6/B7 与 A1 重叠按进线月/成交月拆
4) 3-5月小红书 lead/paid 月度汇总
"""
import csv, os, sys
from collections import defaultdict
from datetime import date, datetime
# Directory holding the exported CSVs: ../output/yifang_export relative to
# the directory that contains this script.
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
OUTPUT_DIR = os.path.join(_SCRIPT_DIR, "..", "output", "yifang_export")
# Make sure the directory exists; no error is raised if it already does.
os.makedirs(OUTPUT_DIR, exist_ok=True)
def _load_csv_rows(path, base_dir=None):
    """Read one CSV file into a list of row dicts.

    Args:
        path: File name, joined onto ``base_dir``.
        base_dir: Directory to read from. ``None`` (the default) means the
            module-level ``OUTPUT_DIR``.

    Returns:
        One dict per data row, keyed by the header row. An empty list is
        returned when the file does not exist; a ``MISSING`` line is printed
        instead of raising.
    """
    if base_dir is None:
        base_dir = OUTPUT_DIR
    full_path = os.path.join(base_dir, path)
    if not os.path.exists(full_path):
        print(f" MISSING: {full_path}")
        return []
    # utf-8-sig drops a leading BOM so the first header name stays clean.
    # newline="" hands raw line endings to the csv module, which is required
    # for quoted fields that contain line breaks to be read back intact.
    with open(full_path, encoding="utf-8-sig", newline="") as f:
        rows = list(csv.DictReader(f))
    print(f" Loaded {path}: {len(rows)} rows")
    return rows


def load_plaintext(path, base_dir=None):
    """Load a plaintext CSV export and return its rows as a list of dicts.

    See ``_load_csv_rows`` for the meaning of the arguments and the
    missing-file behaviour.
    """
    return _load_csv_rows(path, base_dir)


def load_encrypted(path, base_dir=None):
    """Load an encrypted (MD5) CSV export and return its rows as a list of dicts.

    The rows are returned exactly as stored in the file; no hashing or
    decoding is performed here. See ``_load_csv_rows`` for the meaning of the
    arguments and the missing-file behaviour.
    """
    return _load_csv_rows(path, base_dir)
def parse_date_safe(s):
    """Parse a date-like value into a ``datetime.date``, or return ``None``.

    Accepted layouts are ``YYYY-MM-DD``, ``YYYY/MM/DD`` and ``YYYY.MM.DD``.
    A trailing time part (e.g. ``2026-03-05 12:30:00``) is tolerated.

    Args:
        s: Value to parse. Falsy values yield ``None``; anything else is
            converted with ``str()`` and stripped first.

    Returns:
        The parsed ``date``, or ``None`` when no layout matches.
    """
    if not s:
        return None
    s = str(s).strip()
    if not s:
        return None
    formats = ("%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d")
    # Try the whole string first, then the first 10 characters (a zero-padded
    # date followed by anything), then the first whitespace-delimited token
    # (a non-padded date such as "2026/3/5" followed by a time).
    candidates = (s, s[:10], s.split(None, 1)[0])
    for candidate in candidates:
        for fmt in formats:
            try:
                return datetime.strptime(candidate, fmt).date()
            except ValueError:
                # strptime signals a layout mismatch with ValueError; move on.
                continue
    return None
def month_label(d):
    """Return the ``YYYY-MM`` label for date ``d``, or "未知" when ``d`` is falsy."""
    return f"{d.year}-{d.month:02d}" if d else "未知"
# ── Load data ──
print("=" * 60)
print("加载数据...")

# Export files, in the order they are read (and reported on stdout).
_EXPORT_FILES = (
    "plaintext_A1_wala_lead_xhs_202509-20260603.csv",
    "plaintext_A3_wala_paid_xhs_202509-20260603.csv",
    "plaintext_B6_xhs_daren_paid.csv",
    "plaintext_xhs_all_paid.csv",
    "B_vs_A1_overlap.csv",
)
# `overlap` is loaded for its side effect of reporting the file; no statement
# visible in this section of the script reads it afterwards.
a1, a3, b6, b7, overlap = [load_plaintext(name) for name in _EXPORT_FILES]

# Attach a parsed "_date" (from the "行为时间" column) to every row of the
# four datasets that are broken down by month later on.
for row in [*a1, *a3, *b6, *b7]:
    row["_date"] = parse_date_safe(row.get("行为时间", ""))
# ── 1) B7 all paid orders: re-match against A1 / A3 ──
print("\n" + "=" * 60)
print("1) B7 小红书全量成单 重新匹配")
print(f" 当前 B7: {len(b7)}")

# phone -> A1 row lookup. A later row with the same phone replaces an earlier
# one; only membership is used below.
a1_phone_map = {}
for r in a1:
    # csv.DictReader fills missing trailing columns with None, so coalesce to
    # "" before stripping instead of crashing on None.strip().
    phone = (r.get("手机号") or "").strip()
    if phone:
        a1_phone_map[phone] = r

# phone -> A3 row lookup, built the same way.
a3_phone_map = {}
for r in a3:
    phone = (r.get("手机号") or "").strip()
    if phone:
        a3_phone_map[phone] = r

# Count B7 rows (not unique phones) whose phone appears in A1 and/or A3.
# Rows with an empty phone can never match and are counted as unmatched.
b7_matched_a1 = 0
b7_matched_a3 = 0
b7_unmatched = 0
for r in b7:
    phone = (r.get("手机号") or "").strip()
    in_a1 = phone in a1_phone_map
    in_a3 = phone in a3_phone_map
    if in_a1:
        b7_matched_a1 += 1
    if in_a3:
        b7_matched_a3 += 1
    if not (in_a1 or in_a3):
        b7_unmatched += 1

print(f" B7 匹配 A1 留资: {b7_matched_a1}/{len(b7)}")
print(f" B7 匹配 A3 成单: {b7_matched_a3}/{len(b7)}")
print(f" B7 未匹配: {b7_unmatched}/{len(b7)}")
# ── 2) A1 leads (1910) vs Lingxi matches (1849), aligned by lead month ──
print("\n" + "=" * 60)
print("2) A1 留资 1910 vs 灵犀匹配 1849 按进线月对齐")

# Tally A1 rows per lead month; month_label() maps a missing date to "未知".
a1_by_month = defaultdict(int)
for r in a1:
    a1_by_month[month_label(r["_date"])] += 1

print(f"\n A1 本地留资: {len(a1)}")
print(f" 灵犀匹配: 1849 人 (差 {len(a1)-1849})")
print(f"\n 按进线月分布:")
print(f" {'月份':<10} {'A1本地':>8} {'灵犀(估)':>10} {'差异':>8}")
print(f" {'-'*40}")

# The per-month Lingxi figure is an estimate: the overall Lingxi total is
# spread across months in proportion to the local A1 counts.
total_a1 = len(a1)
lingxi_total = 1849
if total_a1 > 0:
    ratio = lingxi_total / total_a1
else:
    ratio = 0

for month, local in sorted(a1_by_month.items()):
    # Rows without a parsable date are left out of the monthly table.
    if month != "未知":
        est_lingxi = round(local * ratio)
        diff = local - est_lingxi
        print(f" {month:<10} {local:>8} {est_lingxi:>10} {diff:>8}")
# ── 3) Overlap of B6 (influencer paid) / B7 (all paid) with A1 ──
print("\n" + "=" * 60)
print("3) B6达人成单 / B7全量成单 与 A1 重叠分析")


def _row_phone(row):
    """Return the row's stripped phone number, or "" when absent or None."""
    # csv.DictReader fills missing trailing columns with None; coalesce first.
    return (row.get("手机号") or "").strip()


def _count_rows_by_month(rows, phones):
    """Count rows whose phone is in ``phones``, keyed by the row's month label."""
    counts = defaultdict(int)
    for row in rows:
        if _row_phone(row) in phones:
            counts[month_label(row["_date"])] += 1
    return counts


def _print_month_counts(counts):
    """Print one ``month: count`` line per month, in sorted month order."""
    for month in sorted(counts):
        print(f" {month}: {counts[month]}")


# Unique non-empty phone numbers per dataset.
a1_phones = {p for p in map(_row_phone, a1) if p}
b6_phones = {p for p in map(_row_phone, b6) if p}
b7_phones = {p for p in map(_row_phone, b7) if p}
b6_a1_overlap = b6_phones & a1_phones
b7_a1_overlap = b7_phones & a1_phones
b6_b7_overlap = b6_phones & b7_phones
all_three = b6_phones & b7_phones & a1_phones
print(f"\n 集合大小:")
print(f" A1 留资: {len(a1_phones)}")
print(f" B6 达人成单: {len(b6_phones)}")
print(f" B7 全量成单: {len(b7_phones)}")
print(f" B6 ∩ A1: {len(b6_a1_overlap)}")
print(f" B7 ∩ A1: {len(b7_a1_overlap)}")
print(f" B6 ∩ B7: {len(b6_b7_overlap)}")
print(f" B6 ∩ B7 ∩ A1: {len(all_three)}")

# The breakdowns below count rows, not unique phones: a phone that appears on
# several rows of the iterated dataset is counted once per row.

# B6 ∩ A1 by lead month (dates taken from the A1 rows).
print(f"\n B6 ∩ A1 按进线月:")
b6_a1_by_lead_month = _count_rows_by_month(a1, b6_a1_overlap)
_print_month_counts(b6_a1_by_lead_month)

# B6 ∩ A1 by order month (dates taken from the B6 rows).
print(f"\n B6 ∩ A1 按成交月:")
b6_a1_by_order_month = _count_rows_by_month(b6, b6_a1_overlap)
_print_month_counts(b6_a1_by_order_month)

# B7 ∩ A1 by lead month (dates taken from the A1 rows).
print(f"\n B7 ∩ A1 按进线月:")
b7_a1_by_lead_month = _count_rows_by_month(a1, b7_a1_overlap)
_print_month_counts(b7_a1_by_lead_month)

# B7 ∩ A1 by order month (dates taken from the B7 rows).
print(f"\n B7 ∩ A1 按成交月:")
b7_a1_by_order_month = _count_rows_by_month(b7, b7_a1_overlap)
_print_month_counts(b7_a1_by_order_month)
# ── 4) March–May Xiaohongshu lead/paid monthly summary ──
# NOTE(review): the printed heading reads "35月"; given target_months below it
# presumably should read "3-5月" — confirm against the original file before
# changing the runtime string.
print("\n" + "=" * 60)
print("4) 35月小红书 lead/paid 月度汇总")
target_months = ["2026-03", "2026-04", "2026-05"]


def _paid_stats_by_month(rows):
    """Tally order count and paid amount per month for dated rows.

    Rows without a parsed "_date" are skipped. A row whose "实付金额" cannot be
    converted to float still counts as an order but adds nothing to the amount.

    Returns:
        (counts, amounts, unparsable): two defaultdicts keyed by month label
        and the number of rows whose amount could not be parsed.
    """
    counts = defaultdict(int)
    amounts = defaultdict(float)
    unparsable = 0
    for row in rows:
        d = row["_date"]
        if not d:
            continue
        m = month_label(d)
        counts[m] += 1
        try:
            amounts[m] += float(row.get("实付金额", 0) or 0)
        except (TypeError, ValueError):
            # Keep the best-effort behaviour (skip the amount) but remember
            # the row so the shortfall is reported instead of hidden.
            unparsable += 1
    return counts, amounts, unparsable


# Lead by month (A1); rows without a parsed date are ignored.
lead_by_month = defaultdict(int)
for r in a1:
    d = r["_date"]
    if d:
        lead_by_month[month_label(d)] += 1

# Paid by month for A3, B6 and B7.
paid_by_month, paid_amount_by_month, _a3_bad = _paid_stats_by_month(a3)
b6_paid_by_month, b6_amount_by_month, _b6_bad = _paid_stats_by_month(b6)
b7_paid_by_month, b7_amount_by_month, _b7_bad = _paid_stats_by_month(b7)

# Surface amounts that were dropped so the totals below are not silently low.
for _label, _bad in (("A3", _a3_bad), ("B6", _b6_bad), ("B7", _b7_bad)):
    if _bad:
        print(f" ⚠ {_label}: {_bad} 行实付金额无法解析,未计入金额")

print(f"\n {'月份':<10} {'A1留资':>8} {'A3成单':>8} {'A3金额':>12} {'B6达人成单':>10} {'B6金额':>12} {'B7全量成单':>10} {'B7金额':>12}")
print(f" {'-'*90}")
for month in target_months:
    print(f" {month:<10} {lead_by_month.get(month,0):>8} {paid_by_month.get(month,0):>8} {paid_amount_by_month.get(month,0):>12,.0f} {b6_paid_by_month.get(month,0):>10} {b6_amount_by_month.get(month,0):>12,.0f} {b7_paid_by_month.get(month,0):>10} {b7_amount_by_month.get(month,0):>12,.0f}")
# Totals over the target months only.
print(f" {'-'*90}")
print(f" {'合计':<10} {sum(lead_by_month.get(m,0) for m in target_months):>8} {sum(paid_by_month.get(m,0) for m in target_months):>8} {sum(paid_amount_by_month.get(m,0) for m in target_months):>12,.0f} {sum(b6_paid_by_month.get(m,0) for m in target_months):>10} {sum(b6_amount_by_month.get(m,0) for m in target_months):>12,.0f} {sum(b7_paid_by_month.get(m,0) for m in target_months):>10} {sum(b7_amount_by_month.get(m,0) for m in target_months):>12,.0f}")
# Also show every month present in any dataset, for context.
print(f"\n 全量月份:")
all_months = sorted(set(lead_by_month) | set(paid_by_month) | set(b6_paid_by_month) | set(b7_paid_by_month))
print(f" {'月份':<10} {'A1留资':>8} {'A3成单':>8} {'B6达人':>8} {'B7全量':>8}")
for month in all_months:
    print(f" {month:<10} {lead_by_month.get(month,0):>8} {paid_by_month.get(month,0):>8} {b6_paid_by_month.get(month,0):>8} {b7_paid_by_month.get(month,0):>8}")
# ── Extra: B7 duplicate-phone analysis ──
print("\n" + "=" * 60)
print("B7 去重分析")

# Number of B7 rows per non-empty phone number.
b7_phone_counts = defaultdict(int)
for r in b7:
    # csv.DictReader fills missing trailing columns with None, so coalesce to
    # "" before stripping instead of crashing on None.strip().
    phone = (r.get("手机号") or "").strip()
    if phone:
        b7_phone_counts[phone] += 1

# Phones that occur on more than one row.
b7_dup = {p: c for p, c in b7_phone_counts.items() if c > 1}
print(f" B7 唯一手机号: {len(b7_phone_counts)}")
print(f" B7 重复手机号: {len(b7_dup)} (共 {sum(b7_dup.values())} 条记录)")
if b7_dup:
    print(f" 重复样例 (前5):")
    # Up to five phones with the most rows, highest count first; the sort is
    # stable, so ties keep their first-seen order.
    for p, c in sorted(b7_dup.items(), key=lambda x: -x[1])[:5]:
        print(f" {p}: {c}")
print("\n✅ 分析完成")