#!/usr/bin/env python3
"""Lingxi retrospective: first-party data analysis.

Requirements:
1) Re-match / re-upload the 916 rows of B7 (all paid orders).
2) Align A1 leads (1910) against Lingxi (1849) by lead month.
3) Split the B6/B7 overlap with A1 by lead month / order month.
4) Monthly Xiaohongshu lead/paid summary for March-May.
"""
import csv
import os
import sys
from collections import Counter, defaultdict
from datetime import date, datetime
from typing import Optional

OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "output", "yifang_export")

# CSV column names read by this script.
PHONE_FIELD = "手机号"
DATE_FIELD = "行为时间"
AMOUNT_FIELD = "实付金额"

# Label used for rows whose date could not be parsed.
UNKNOWN_MONTH = "未知"

# Number of A1 leads matched on the Lingxi side (fixed figure from the brief).
LINGXI_MATCHED_TOTAL = 1849

# Months shown in the section 4 summary table.
TARGET_MONTHS = ("2026-03", "2026-04", "2026-05")

_DATE_FORMATS = ("%Y-%m-%d", "%Y/%m/%d", "%Y.%m.%d")


def _load_csv(path):
    """Read ``OUTPUT_DIR/path`` with ``csv.DictReader`` and return its rows.

    A missing file is reported on stdout and yields an empty list instead of
    raising, so the rest of the report can still run.
    """
    full_path = os.path.join(OUTPUT_DIR, path)
    if not os.path.exists(full_path):
        print(f" MISSING: {full_path}")
        return []
    # newline="" lets the csv module handle embedded newlines in quoted fields.
    with open(full_path, encoding="utf-8-sig", newline="") as f:
        rows = list(csv.DictReader(f))
    print(f" Loaded {path}: {len(rows)} rows")
    return rows


def load_plaintext(path):
    """Load a plaintext CSV from OUTPUT_DIR and return a list of row dicts."""
    return _load_csv(path)


def load_encrypted(path):
    """Load an encrypted (MD5) CSV from OUTPUT_DIR and return a list of row dicts.

    Rows are returned exactly as read; no hashing or decryption happens here.
    """
    return _load_csv(path)


def parse_date_safe(s) -> Optional[date]:
    """Parse ``s`` into a ``date``; return ``None`` when it cannot be parsed.

    Accepts ``YYYY-MM-DD``, ``YYYY/MM/DD`` and ``YYYY.MM.DD``. When the full
    string does not match, the first whitespace-delimited token and then the
    first 10 characters are tried, so values carrying a trailing time part
    still resolve to their date in any of the three formats.
    """
    if not s:
        return None
    text = str(s).strip()
    if not text:
        return None
    for candidate in (text, text.split()[0], text[:10]):
        for fmt in _DATE_FORMATS:
            try:
                return datetime.strptime(candidate, fmt).date()
            except ValueError:
                continue
    return None


def month_label(d):
    """Return ``d`` formatted as ``YYYY-MM``, or "未知" when ``d`` is falsy."""
    if not d:
        return UNKNOWN_MONTH
    return f"{d.year}-{d.month:02d}"


def _row_phone(row):
    """Return the stripped phone number of ``row`` ("" when absent or None)."""
    return (row.get(PHONE_FIELD) or "").strip()


def _phone_set(rows):
    """Return the set of non-empty phone numbers found in ``rows``."""
    return {phone for phone in map(_row_phone, rows) if phone}


def _parse_amount(value):
    """Convert a paid-amount cell to float; unparsable or empty cells count as 0."""
    try:
        return float(value or 0)
    except (TypeError, ValueError):
        return 0.0


def _count_by_month(rows):
    """Count rows per month label, skipping rows without a parsed date."""
    return Counter(month_label(row["_date"]) for row in rows if row["_date"])


def _overlap_by_month(rows, phones):
    """Count rows whose phone is in ``phones``, grouped by the row's month label."""
    return Counter(month_label(row["_date"]) for row in rows if _row_phone(row) in phones)


def _paid_by_month(rows):
    """Return ``(order_count, amount_sum)`` per month for rows with a parsed date."""
    counts = Counter()
    amounts = defaultdict(float)
    for row in rows:
        if not row["_date"]:
            continue
        month = month_label(row["_date"])
        counts[month] += 1
        amounts[month] += _parse_amount(row.get(AMOUNT_FIELD, 0))
    return counts, amounts


def _print_month_counts(counts):
    """Print one ``month: count`` line per month, in sorted month order."""
    for month in sorted(counts):
        print(f" {month}: {counts[month]}")


def _report_b7_match(a1, a3, b7):
    """Section 1: count B7 rows whose phone appears in A1 and/or A3."""
    print("\n" + "=" * 60)
    print("1) B7 小红书全量成单 重新匹配")
    print(f" 当前 B7: {len(b7)} 行")

    a1_phones = _phone_set(a1)
    a3_phones = _phone_set(a3)

    matched_a1 = matched_a3 = unmatched = 0
    for row in b7:
        phone = _row_phone(row)
        in_a1 = phone in a1_phones
        in_a3 = phone in a3_phones
        matched_a1 += in_a1
        matched_a3 += in_a3
        # Rows with an empty phone land here too, since empty is in neither set.
        unmatched += not (in_a1 or in_a3)

    print(f" B7 匹配 A1 留资: {matched_a1}/{len(b7)}")
    print(f" B7 匹配 A3 成单: {matched_a3}/{len(b7)}")
    print(f" B7 未匹配: {unmatched}/{len(b7)}")


def _report_lead_alignment(a1, lead_by_month):
    """Section 2: compare local A1 leads with a proportional Lingxi estimate."""
    print("\n" + "=" * 60)
    print("2) A1 留资 1910 vs 灵犀匹配 1849 按进线月对齐")

    print(f"\n A1 本地留资: {len(a1)} 人")
    print(f" 灵犀匹配: {LINGXI_MATCHED_TOTAL} 人 (差 {len(a1) - LINGXI_MATCHED_TOTAL})")
    print(f"\n 按进线月分布:")
    print(f" {'月份':<10} {'A1本地':>8} {'灵犀(估)':>10} {'差异':>8}")
    print(f" {'-'*40}")

    # The per-month Lingxi figure is an estimate: the overall match ratio is
    # applied uniformly to every month. Rows without a date are not listed.
    ratio = LINGXI_MATCHED_TOTAL / len(a1) if a1 else 0
    for month in sorted(lead_by_month):
        local = lead_by_month[month]
        est_lingxi = round(local * ratio)
        diff = local - est_lingxi
        print(f" {month:<10} {local:>8} {est_lingxi:>10} {diff:>8}")


def _report_overlap(a1, b6, b7):
    """Section 3: phone-set overlaps of B6/B7 with A1, split by month."""
    print("\n" + "=" * 60)
    print("3) B6达人成单 / B7全量成单 与 A1 重叠分析")

    a1_phones = _phone_set(a1)
    b6_phones = _phone_set(b6)
    b7_phones = _phone_set(b7)

    b6_a1_overlap = b6_phones & a1_phones
    b7_a1_overlap = b7_phones & a1_phones
    b6_b7_overlap = b6_phones & b7_phones
    all_three = b6_b7_overlap & a1_phones

    print(f"\n 集合大小:")
    print(f" A1 留资: {len(a1_phones)}")
    print(f" B6 达人成单: {len(b6_phones)}")
    print(f" B7 全量成单: {len(b7_phones)}")
    print(f" B6 ∩ A1: {len(b6_a1_overlap)}")
    print(f" B7 ∩ A1: {len(b7_a1_overlap)}")
    print(f" B6 ∩ B7: {len(b6_b7_overlap)}")
    print(f" B6 ∩ B7 ∩ A1: {len(all_three)}")

    # Lead month uses the date on the A1 row; order month uses the date on
    # the B6/B7 row. Counts are per row, not per unique phone.
    print(f"\n B6 ∩ A1 按进线月:")
    _print_month_counts(_overlap_by_month(a1, b6_a1_overlap))

    print(f"\n B6 ∩ A1 按成交月:")
    _print_month_counts(_overlap_by_month(b6, b6_a1_overlap))

    print(f"\n B7 ∩ A1 按进线月:")
    _print_month_counts(_overlap_by_month(a1, b7_a1_overlap))

    print(f"\n B7 ∩ A1 按成交月:")
    _print_month_counts(_overlap_by_month(b7, b7_a1_overlap))


def _report_monthly_summary(lead_by_month, a3, b6, b7):
    """Section 4: lead/paid counts and amounts for TARGET_MONTHS, then all months."""
    print("\n" + "=" * 60)
    print("4) 3–5月小红书 lead/paid 月度汇总")

    paid_by_month, paid_amount_by_month = _paid_by_month(a3)
    b6_paid_by_month, b6_amount_by_month = _paid_by_month(b6)
    b7_paid_by_month, b7_amount_by_month = _paid_by_month(b7)

    print(f"\n {'月份':<10} {'A1留资':>8} {'A3成单':>8} {'A3金额':>12} {'B6达人成单':>10} {'B6金额':>12} {'B7全量成单':>10} {'B7金额':>12}")
    print(f" {'-'*90}")
    for month in TARGET_MONTHS:
        print(f" {month:<10} {lead_by_month.get(month,0):>8} {paid_by_month.get(month,0):>8} {paid_amount_by_month.get(month,0):>12,.0f} {b6_paid_by_month.get(month,0):>10} {b6_amount_by_month.get(month,0):>12,.0f} {b7_paid_by_month.get(month,0):>10} {b7_amount_by_month.get(month,0):>12,.0f}")

    # Totals over the target months only.
    def total(by_month):
        return sum(by_month.get(m, 0) for m in TARGET_MONTHS)

    print(f" {'-'*90}")
    print(f" {'合计':<10} {total(lead_by_month):>8} {total(paid_by_month):>8} {total(paid_amount_by_month):>12,.0f} {total(b6_paid_by_month):>10} {total(b6_amount_by_month):>12,.0f} {total(b7_paid_by_month):>10} {total(b7_amount_by_month):>12,.0f}")

    # Also show every month present in any of the four datasets, for context.
    print(f"\n 全量月份:")
    all_months = sorted(set(lead_by_month) | set(paid_by_month) | set(b6_paid_by_month) | set(b7_paid_by_month))
    print(f" {'月份':<10} {'A1留资':>8} {'A3成单':>8} {'B6达人':>8} {'B7全量':>8}")
    for month in all_months:
        print(f" {month:<10} {lead_by_month.get(month,0):>8} {paid_by_month.get(month,0):>8} {b6_paid_by_month.get(month,0):>8} {b7_paid_by_month.get(month,0):>8}")


def _report_b7_duplicates(b7):
    """Extra section: report phone numbers that occur more than once in B7."""
    print("\n" + "=" * 60)
    print("B7 去重分析")

    phone_counts = Counter(phone for phone in map(_row_phone, b7) if phone)
    duplicates = {phone: count for phone, count in phone_counts.items() if count > 1}

    print(f" B7 唯一手机号: {len(phone_counts)}")
    print(f" B7 重复手机号: {len(duplicates)} (共 {sum(duplicates.values())} 条记录)")
    if duplicates:
        print(f" 重复样例 (前5):")
        # Stable sort: ties keep first-seen order.
        for phone, count in sorted(duplicates.items(), key=lambda item: -item[1])[:5]:
            print(f" {phone}: {count}次")


def main() -> int:
    """Load the exported CSVs and print the five report sections to stdout."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # ── Load data ──
    print("=" * 60)
    print("加载数据...")
    a1 = load_plaintext("plaintext_A1_wala_lead_xhs_202509-20260603.csv")
    a3 = load_plaintext("plaintext_A3_wala_paid_xhs_202509-20260603.csv")
    b6 = load_plaintext("plaintext_B6_xhs_daren_paid.csv")
    b7 = load_plaintext("plaintext_xhs_all_paid.csv")
    # Rows are not used below; the call is kept for its Loaded/MISSING log line.
    load_plaintext("B_vs_A1_overlap.csv")

    # Attach a parsed date to every row once, under the "_date" key.
    for rows in (a1, a3, b6, b7):
        for row in rows:
            row["_date"] = parse_date_safe(row.get(DATE_FIELD, ""))

    lead_by_month = _count_by_month(a1)

    _report_b7_match(a1, a3, b7)
    _report_lead_alignment(a1, lead_by_month)
    _report_overlap(a1, b6, b7)
    _report_monthly_summary(lead_by_month, a3, b6, b7)
    _report_b7_duplicates(b7)

    print("\n✅ 分析完成")
    return 0


if __name__ == "__main__":
    sys.exit(main())