ai_member_xiaoxi/scripts/lead_user_analysis.py
2026-06-02 08:00:01 +08:00

442 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
销售线索用户分析报表
输入Excel文件包含「用户ID」列 或 手机号列
输出:每个用户+角色一行,包含注册信息、序章完成情况、购买和退款信息
修正口径:
- 体验课 = 固定10节课L1 U00 L01-L05 (chapter_id: 343,344,345,346,348) + L2 U00 L01-L05 (55-59)
- 完成时间 = play_status=1 的最早 updated_at
- 若无「用户ID」列则自动识别手机号列脱敏匹配账号ID后再查询
"""
import os
import re
import sys
import psycopg2
import pandas as pd
from collections import defaultdict
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone
# ── 数据库 ──
DB_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
DB_PORT = 28591
DB_USER = "ai_member"
DB_NAME = "vala_bi"
def get_password():
secrets_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "secrets.env")
# also try env
pw = os.environ.get("PG_ONLINE_PASSWORD", "")
if pw:
return pw
if os.path.exists(secrets_path):
with open(secrets_path) as f:
for line in f:
if line.startswith("PG_ONLINE_PASSWORD="):
return line.strip().split("=", 1)[1].strip("'\"")
raise RuntimeError("PG_ONLINE_PASSWORD not found")
def get_conn():
return psycopg2.connect(host=DB_HOST, port=DB_PORT, user=DB_USER, password=get_password(), dbname=DB_NAME, connect_timeout=30)
# ── 序章 chapter_id ──
# L1 S0 U00 L01..L05
L1_CHAPTERS = {343: "L1_U00_L01", 344: "L1_U00_L02", 345: "L1_U00_L03", 346: "L1_U00_L04", 348: "L1_U00_L05"}
# L2 S0 U00 L01..L05
L2_CHAPTERS = {55: "L2_U00_L01", 56: "L2_U00_L02", 57: "L2_U00_L03", 58: "L2_U00_L04", 59: "L2_U00_L05"}
ALL_CHAPTERS = {**L1_CHAPTERS, **L2_CHAPTERS}
COL_ORDER = [
"L1_U00_L01_完成时间", "L1_U00_L02_完成时间", "L1_U00_L03_完成时间",
"L1_U00_L04_完成时间", "L1_U00_L05_完成时间",
"L2_U00_L01_完成时间", "L2_U00_L02_完成时间", "L2_U00_L03_完成时间",
"L2_U00_L04_完成时间", "L2_U00_L05_完成时间",
]
def encrypt_phone_local(phone):
"""手机号 XXTEA 加密"""
return encrypt_phone(phone)
def mask_phone_display(phone):
"""手机号脱敏用于展示: 130****1234"""
return f"{phone[:3]}****{phone[-4:]}"
def extract_phones_from_df(df):
"""从 DataFrame 中提取所有 1 开头的 11 位手机号(去重保持顺序)"""
phones = []
for col in df.columns:
for val in df[col].dropna():
val_str = str(int(val)) if isinstance(val, (int, float)) else str(val)
val_str = val_str.strip()
if re.match(r'^1\d{10}$', val_str):
phones.append(val_str)
seen = set()
unique = []
for p in phones:
if p not in seen:
seen.add(p)
unique.append(p)
return unique
def match_phones_to_accounts(phones, conn):
"""
手机号 XXTEA 加密后匹配 bi_vala_app_account.tel_encrypt
返回: (phone_to_account, account_ids, unmatched_phones)
"""
if not phones:
return {}, [], []
# 加密
encrypt_to_phones = {}
for p in phones:
enc = encrypt_phone(p)
encrypt_to_phones.setdefault(enc, []).append(p)
enc_list = list(encrypt_to_phones.keys())
cur = conn.cursor()
placeholders = ",".join(["%s"] * len(enc_list))
cur.execute(f"""
SELECT id AS account_id, tel_encrypt
FROM bi_vala_app_account
WHERE tel_encrypt IN ({placeholders})
AND status = 1
AND deleted_at IS NULL
""", enc_list)
rows = cur.fetchall()
cur.close()
# tel_encrypt -> account_id
enc_to_account = {}
for aid, enc in rows:
if enc not in enc_to_account:
enc_to_account[enc] = aid
# 明文手机号 -> account_id
phone_to_account = {}
for p in phones:
enc = encrypt_phone(p)
if enc in enc_to_account:
phone_to_account[p] = enc_to_account[enc]
account_ids = list(set(phone_to_account.values()))
unmatched = [p for p in phones if p not in phone_to_account]
return phone_to_account, account_ids, unmatched
def main():
input_file = sys.argv[1] if len(sys.argv) > 1 else "/root/.openclaw/media/inbound/3æ_è_çº_çº_ç---d9a41af7-b100-43a7-a983-d4fd1f164023.xlsx"
print(f"读取输入文件: {input_file}")
df_input = pd.read_excel(input_file, dtype=str)
# ── 0. 识别输入文件类型用户ID 还是 手机号 ──
has_user_id = "用户ID" in df_input.columns
account_ids = []
phone_to_account = {} # 明文手机号 -> account_id仅手机号模式使用
unmatched_phones = []
use_phone_mode = False
df_input_map = None # 每行的线索信息映射
if has_user_id:
print("检测到「用户ID」列使用用户ID直接匹配")
user_ids_raw = df_input["用户ID"].dropna().unique().tolist()
valid_ids = []
for x in user_ids_raw:
try:
valid_ids.append(int(x))
except ValueError:
print(f" 跳过非数字用户ID: {x}")
account_ids = valid_ids
print(f"{len(account_ids)} 个用户ID")
# 原始文件线索信息
df_input = df_input[df_input["用户ID"].apply(lambda x: str(x).isdigit() if pd.notna(x) else False)]
df_input["用户ID_int"] = df_input["用户ID"].astype(int)
df_input_map = df_input[["用户ID_int", "线索进线日期", "销售"]].drop_duplicates(subset="用户ID_int")
else:
print("未检测到「用户ID」列尝试识别手机号...")
phones = extract_phones_from_df(df_input)
if not phones:
print("ERROR: 既没有「用户ID」列也没有找到手机号无法继续")
sys.exit(1)
print(f"提取到 {len(phones)} 个手机号(去重后)")
conn = get_conn()
phone_to_account, account_ids, unmatched_phones = match_phones_to_accounts(phones, conn)
conn.close()
print(f"匹配到 {len(account_ids)} 个账号,未匹配 {len(unmatched_phones)} 个手机号")
if unmatched_phones:
print(f" 未匹配手机号: {unmatched_phones}")
if not account_ids:
print("ERROR: 没有任何手机号匹配到账号,无法继续")
sys.exit(1)
use_phone_mode = True
# 构建手机号 → 线索信息的映射
# 找手机号列(第一个含手机号的列)
phone_col = None
for col in df_input.columns:
sample = df_input[col].dropna().head(5).tolist()
if any(re.match(r'^1\d{10}$', str(int(v)) if isinstance(v, (int, float)) else str(v))
for v in sample):
phone_col = col
break
if phone_col:
# 规范化手机号
def normalize_phone(val):
try:
return str(int(float(val))) if pd.notna(val) else ""
except (ValueError, TypeError):
return str(val).strip()
df_input["_phone"] = df_input[phone_col].apply(normalize_phone)
# 提取线索信息列
clue_cols = [c for c in ["线索进线日期", "销售"] if c in df_input.columns]
if clue_cols:
df_input_map = df_input[["_phone"] + clue_cols].drop_duplicates(subset="_phone")
df_input_map.rename(columns={"_phone": "手机号"}, inplace=True)
else:
# 没有线索列,只保留手机号用于后续关联
df_input_map = df_input[["_phone"]].drop_duplicates(subset="_phone")
df_input_map.rename(columns={"_phone": "手机号"}, inplace=True)
# ── 公共部分:用 account_ids 查询后续数据 ──
conn = get_conn()
# ── 1. 获取用户基本信息account ──
placeholders = ",".join(["%s"] * len(account_ids))
df_accounts = pd.read_sql_query(
f"SELECT id AS account_id, created_at AS reg_time FROM bi_vala_app_account WHERE id IN ({placeholders}) AND status = 1",
conn, params=account_ids
)
print(f" 有效账户: {len(df_accounts)}")
# ── 2. 获取角色(排除 nickname 为空的) ──
df_chars = pd.read_sql_query(
f"SELECT id AS character_id, account_id, nickname, created_at AS char_created_at FROM bi_vala_app_character WHERE account_id IN ({placeholders}) AND (nickname IS NOT NULL AND nickname != '') AND deleted_at IS NULL",
conn, params=account_ids
)
print(f" 有效角色: {len(df_chars)}")
if df_chars.empty:
print("没有有效的角色,退出")
conn.close()
return
# ── 3. 查询课时完成记录10个 chapter_id8个分表 ──
chapter_ids = list(ALL_CHAPTERS.keys())
char_id_set = set(df_chars["character_id"].tolist())
char_play = defaultdict(dict)
total_play = 0
for tbl_idx in range(8):
table = f"bi_user_chapter_play_record_{tbl_idx}"
sql = f"""
SELECT user_id, chapter_id, MIN(updated_at) AS done_time
FROM {table}
WHERE chapter_id IN %s
AND play_status = 1
AND deleted_at IS NULL
GROUP BY user_id, chapter_id
"""
try:
cur = conn.cursor()
cur.execute(sql, (tuple(chapter_ids),))
rows = cur.fetchall()
cur.close()
total_play += len(rows)
for user_id, ch_id, done_time in rows:
if user_id in char_id_set:
label = ALL_CHAPTERS.get(ch_id)
if label:
char_play[user_id][ch_id] = done_time
except Exception as e:
print(f" 警告: {table} 查询失败: {e}")
print(f" 课时完成记录: {total_play} 条, 匹配角色: {len(char_play)}")
# ── 4. 订单信息 ──
df_orders = pd.read_sql_query(
f"""
SELECT o.account_id, o.pay_success_date, o.goods_name, o.pay_amount_int, o.key_from, o.trade_no, o.order_status
FROM bi_vala_order o
INNER JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
WHERE o.account_id IN ({placeholders})
AND o.deleted_at IS NULL
ORDER BY o.account_id, o.pay_success_date
""",
conn, params=account_ids
)
print(f" 订单记录: {len(df_orders)}")
# ── 5. 退款信息 ──
all_trade_nos = df_orders["trade_no"].dropna().unique().tolist()
refund_map = {}
if all_trade_nos:
# 分批查询防止IN子句过长
for i in range(0, len(all_trade_nos), 500):
batch = all_trade_nos[i:i+500]
ph = ",".join(["%s"] * len(batch))
cur = conn.cursor()
cur.execute(f"SELECT trade_no, SUM(refund_amount_int) FROM bi_refund_order WHERE trade_no IN ({ph}) AND status = 3 AND deleted_at IS NULL GROUP BY trade_no", batch)
for trade_no, amt in cur.fetchall():
refund_map[trade_no] = amt
cur.close()
conn.close()
print(f" 退费记录: {len(refund_map)}")
# ── 6. 组装结果 ──
df_chars = df_chars.merge(df_accounts, on="account_id", how="left")
# 去掉时区信息Excel不支持
for col in ["reg_time", "char_created_at"]:
if col in df_chars.columns:
df_chars[col] = pd.to_datetime(df_chars[col]).dt.tz_localize(None)
rows = []
for _, char_row in df_chars.iterrows():
account_id = int(char_row["account_id"])
char_id = int(char_row["character_id"])
reg_time = char_row["reg_time"]
char_created_at = char_row["char_created_at"]
# 课时完成
play_map = char_play.get(char_id, {})
row_data = {
"用户ID": account_id,
"角色ID": char_id,
"用户注册时间": reg_time,
"角色创建时间": char_created_at,
}
for col_label in COL_ORDER:
# 找到对应的 chapter_idALL_CHAPTERS 的 value 是去掉 "_完成时间" 后缀的)
ch_id = None
lbl_key = col_label.replace("_完成时间", "")
for cid, lbl in ALL_CHAPTERS.items():
if lbl == lbl_key:
ch_id = cid
break
done_time = play_map.get(ch_id, None)
if done_time is not None:
done_time = done_time.replace(tzinfo=None)
row_data[col_label] = done_time
rows.append(row_data)
df_result = pd.DataFrame(rows)
# ── 7. 合并订单 & 退款 ──
# 按 account_id 聚合
order_agg = df_orders.groupby("account_id").agg(
购买时间=("pay_success_date", lambda x: ";".join(str(v) for v in x if pd.notna(v))),
购买课包名称=("goods_name", lambda x: ";".join(str(v) for v in x if pd.notna(v))),
支付金额=("pay_amount_int", lambda x: ";".join(str(v/100) for v in x if pd.notna(v))),
购买渠道key_from=("key_from", lambda x: ";".join(str(v) for v in x if pd.notna(v))),
trade_nos=("trade_no", lambda x: list(x)),
).reset_index()
# 退款判断
def calc_refund(row):
has_refund = False
total_refund = 0
for tn in row["trade_nos"]:
if tn in refund_map:
has_refund = True
total_refund += refund_map[tn]
return pd.Series({"是否退款": "" if has_refund else "", "退款金额": total_refund / 100.0})
refund_info = order_agg.apply(calc_refund, axis=1)
order_agg = pd.concat([order_agg[["account_id", "购买时间", "购买课包名称", "支付金额", "购买渠道key_from"]], refund_info], axis=1)
# 合并到结果
df_result["account_id_int"] = df_result["用户ID"].astype(int)
df_result = df_result.merge(order_agg, left_on="account_id_int", right_on="account_id", how="left")
df_result.drop(columns=["account_id"], inplace=True, errors="ignore")
# ── 合并 线索进线日期、销售 和(手机号模式的)手机号 ──
if df_input_map is not None:
if use_phone_mode:
# 手机号模式:通过 phone_to_account 反查手机号 → 合并线索信息
# 先给 df_result 加上手机号列
# account_id -> 明文手机号(取第一个匹配到的)
account_to_phone = {}
for phone, aid in phone_to_account.items():
if aid not in account_to_phone:
account_to_phone[aid] = phone
df_result["手机号"] = df_result["用户ID"].map(account_to_phone)
# 用手机号关联线索信息
if "线索进线日期" in df_input_map.columns or "销售" in df_input_map.columns:
df_result = df_result.merge(df_input_map, on="手机号", how="left")
else:
# 用户ID模式原逻辑
df_result = df_result.merge(df_input_map, left_on="account_id_int", right_on="用户ID_int", how="left")
df_result.drop(columns=["用户ID_int"], inplace=True, errors="ignore")
df_result.drop(columns=["account_id_int"], inplace=True, errors="ignore")
# 填充空值
df_result["购买时间"] = df_result["购买时间"].fillna("")
df_result["购买课包名称"] = df_result["购买课包名称"].fillna("")
df_result["支付金额"] = df_result["支付金额"].fillna("")
df_result["购买渠道key_from"] = df_result["购买渠道key_from"].fillna("")
df_result["是否退款"] = df_result["是否退款"].fillna("")
df_result["退款金额"] = df_result["退款金额"].fillna(0.0)
df_result["线索进线日期"] = df_result["线索进线日期"].fillna("") if "线索进线日期" in df_result.columns else ""
df_result["销售"] = df_result["销售"].fillna("") if "销售" in df_result.columns else ""
if use_phone_mode and "手机号" in df_result.columns:
df_result["手机号"] = df_result["手机号"].fillna("")
# 按用户ID升序排列
df_result = df_result.sort_values(by=["用户ID", "角色ID"], ascending=True).reset_index(drop=True)
# 调整列顺序
if use_phone_mode:
col_order = ["用户ID", "手机号", "线索进线日期", "销售", "角色ID", "用户注册时间", "角色创建时间"] + COL_ORDER + ["购买时间", "购买课包名称", "支付金额", "购买渠道key_from", "是否退款", "退款金额"]
else:
col_order = ["用户ID", "线索进线日期", "销售", "角色ID", "用户注册时间", "角色创建时间"] + COL_ORDER + ["购买时间", "购买课包名称", "支付金额", "购买渠道key_from", "是否退款", "退款金额"]
# 只保留实际存在的列
col_order = [c for c in col_order if c in df_result.columns]
df_result = df_result[col_order]
# ── 8. 输出 ──
output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "output")
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "销售线索_用户分析.xlsx")
with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
df_result.to_excel(writer, sheet_name="用户分析", index=False)
print(f"\n✅ 报表已生成: {output_path}")
print(f" 总行数: {len(df_result)}")
purchased = sum(1 for v in df_result["购买时间"] if v)
print(f" 有购买记录: {purchased}")
refunded = sum(1 for v in df_result["是否退款"] if v == "")
print(f" 有退款: {refunded}")
done_l1 = sum(1 for v in df_result["L1_U00_L01_完成时间"] if pd.notna(v))
done_l2 = sum(1 for v in df_result["L2_U00_L01_完成时间"] if pd.notna(v))
print(f" 完成L1序章(U00 L01): {done_l1} 个角色")
print(f" 完成L2序章(U00 L01): {done_l2} 个角色")
if unmatched_phones:
print(f"\n⚠️ 未匹配到账号的手机号 ({len(unmatched_phones)} 个):")
for p in unmatched_phones:
print(f" {p}")
if __name__ == "__main__":
main()