ai_member_xiaoxi/scripts/lead_user_analysis.py
2026-05-10 08:00:01 +08:00

277 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
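"""
Sales-lead user analysis report.

Input:  an Excel file that contains a "用户ID" (user ID) column.
Output: one row per user + character, including registration info,
        prologue completion, purchase and refund information.

Metric definitions (corrected):
- Trial course = a fixed set of 10 lessons: L1 U00 L01-L05
  (chapter_id: 343, 344, 345, 346, 348) + L2 U00 L01-L05 (chapter_id: 55-59).
- Completion time = the earliest updated_at among records with play_status=1.
"""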
import os
import sys
import psycopg2
import pandas as pd
from collections import defaultdict
# ── Database ──
# Connection settings handed to psycopg2.connect() by get_conn().
# The password is not stored here; it is resolved by get_password().
DB_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
DB_PORT = 28591
DB_USER = "ai_member"
DB_NAME = "vala_bi"
def get_password():
    """Return the PostgreSQL password for the online database.

    Lookup order:
      1. The ``PG_ONLINE_PASSWORD`` environment variable, returned as-is when
         it is non-empty.
      2. The first non-empty ``PG_ONLINE_PASSWORD=...`` entry in
         ``../secrets.env`` (relative to the directory holding this script).
         Surrounding whitespace and quotes are stripped from the value.

    Returns:
        The password string.

    Raises:
        RuntimeError: if neither source yields a non-empty value.
    """
    key = "PG_ONLINE_PASSWORD"

    pw = os.environ.get(key, "")
    if pw:
        return pw

    secrets_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "..", "secrets.env"
    )
    if os.path.exists(secrets_path):
        # Explicit encoding so parsing does not depend on the host locale;
        # "utf-8-sig" also tolerates a BOM in front of the first line.
        with open(secrets_path, encoding="utf-8-sig") as f:
            for line in f:
                name, sep, value = line.strip().partition("=")
                if not sep or name.strip() != key:
                    continue
                value = value.strip().strip("'\"")
                # An empty entry is treated like an empty environment
                # variable: keep looking instead of returning "".
                if value:
                    return value

    raise RuntimeError("PG_ONLINE_PASSWORD not found")
def get_conn():
    """Open and return a new psycopg2 connection to the BI database.

    Uses the module-level DB_* settings and the password from
    get_password(); the connect timeout is 30 seconds. The caller is
    responsible for closing the returned connection.
    """
    conn_kwargs = {
        "host": DB_HOST,
        "port": DB_PORT,
        "user": DB_USER,
        "password": get_password(),
        "dbname": DB_NAME,
        "connect_timeout": 30,
    }
    return psycopg2.connect(**conn_kwargs)
# ── Prologue chapter_id mapping ──
# chapter_id -> lesson label for L1 S0 U00 L01..L05 (the ids skip 347).
L1_CHAPTERS = {
    343: "L1_U00_L01",
    344: "L1_U00_L02",
    345: "L1_U00_L03",
    346: "L1_U00_L04",
    348: "L1_U00_L05",
}
# chapter_id -> lesson label for L2 S0 U00 L01..L05.
L2_CHAPTERS = {
    55: "L2_U00_L01",
    56: "L2_U00_L02",
    57: "L2_U00_L03",
    58: "L2_U00_L04",
    59: "L2_U00_L05",
}
# All ten prologue lessons, L1 first, then L2 (dicts keep insertion order).
ALL_CHAPTERS = dict(L1_CHAPTERS)
ALL_CHAPTERS.update(L2_CHAPTERS)
# Output column names for the per-lesson completion times, in report order.
COL_ORDER = [f"{label}_完成时间" for label in ALL_CHAPTERS.values()]
# Columns copied through from the input sheet, in output order.
_LEAD_COLS = ["线索进线日期", "销售"]
# Columns produced by the order / refund aggregation, in output order.
_ORDER_COLS = ["购买时间", "购买课包名称", "支付金额", "购买渠道key_from", "是否退款", "退款金额"]


def _to_account_id(value):
    """Parse one 用户ID cell into an int; return None when it is not numeric."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        # Covers marker text in the ID column as well as NaN / pd.NA cells.
        return None


def _parse_account_ids(raw_ids):
    """Return the distinct integer account ids in raw_ids, in first-seen order.

    Non-numeric entries are reported on stdout and skipped.
    """
    account_ids = []
    seen = set()
    for raw in raw_ids:
        account_id = _to_account_id(raw)
        if account_id is None:
            print(f" 跳过非数字用户ID: {raw}")
        elif account_id not in seen:
            seen.add(account_id)
            account_ids.append(account_id)
    return account_ids


def _fetch_chapter_completions(conn, char_ids):
    """Return {character_id: {chapter_id: earliest completion time}}.

    Queries the 8 sharded play-record tables for the prologue chapters and
    keeps only rows whose user_id is in char_ids. A failing shard is reported
    and skipped (best effort), as in the original script.
    """
    chapter_ids = tuple(ALL_CHAPTERS)
    char_play = defaultdict(dict)
    total_play = 0
    for tbl_idx in range(8):
        table = f"bi_user_chapter_play_record_{tbl_idx}"
        # NOTE(review): this pulls completions for every user of these
        # chapters and filters client-side; adding a user_id filter would cut
        # the transfer but changes the "total records" figure printed below.
        sql = f"""
            SELECT user_id, chapter_id, MIN(updated_at) AS done_time
            FROM {table}
            WHERE chapter_id IN %s
              AND play_status = 1
              AND deleted_at IS NULL
            GROUP BY user_id, chapter_id
        """
        try:
            with conn.cursor() as cur:
                cur.execute(sql, (chapter_ids,))
                rows = cur.fetchall()
        except psycopg2.Error as e:
            print(f" 警告: {table} 查询失败: {e}")
            # A failed statement aborts the open transaction; without a
            # rollback every later query on this connection would fail too.
            conn.rollback()
            continue
        total_play += len(rows)
        for user_id, ch_id, done_time in rows:
            if user_id in char_ids and ch_id in ALL_CHAPTERS:
                char_play[user_id][ch_id] = done_time
    print(f" 课时完成记录: {total_play} 条, 匹配角色: {len(char_play)}")
    return char_play


def _fetch_refunds(conn, trade_nos, batch_size=500):
    """Return {trade_no: summed refund amount} for refunds with status = 3.

    trade_nos is queried in batches so the IN clause stays bounded.
    """
    refund_map = {}
    for start in range(0, len(trade_nos), batch_size):
        batch = trade_nos[start:start + batch_size]
        ph = ",".join(["%s"] * len(batch))
        with conn.cursor() as cur:
            cur.execute(
                f"SELECT trade_no, SUM(refund_amount_int) FROM bi_refund_order WHERE trade_no IN ({ph}) AND status = 3 AND deleted_at IS NULL GROUP BY trade_no",
                batch,
            )
            for trade_no, amt in cur.fetchall():
                # SUM() is NULL when every summed value is NULL.
                refund_map[trade_no] = amt if amt is not None else 0
    return refund_map


def _summarize_orders(df_orders, refund_map):
    """Return {account_id: {order/refund column: value}} for accounts with orders.

    Multi-valued fields are joined with ";" in the row order of df_orders.
    Amounts are divided by 100 (the *_int columns are presumably in cents —
    TODO confirm).
    """

    def join_non_null(values, fmt=str):
        return ";".join(fmt(v) for v in values if pd.notna(v))

    summary = {}
    for account_id, grp in df_orders.groupby("account_id"):
        refunded = [refund_map[tn] for tn in grp["trade_no"] if tn in refund_map]
        summary[int(account_id)] = {
            "购买时间": join_non_null(grp["pay_success_date"]),
            "购买课包名称": join_non_null(grp["goods_name"]),
            "支付金额": join_non_null(grp["pay_amount_int"], lambda v: str(v / 100)),
            "购买渠道key_from": join_non_null(grp["key_from"]),
            # The original emitted "" in both branches, so the flag carried
            # no information. NOTE(review): confirm "是"/"否" are the wanted
            # labels.
            "是否退款": "是" if refunded else "否",
            # float() first: the driver may return SUM() as Decimal, which
            # cannot be divided by a float.
            "退款金额": float(sum(refunded)) / 100.0,
        }
    return summary


def _build_lead_map(df_input):
    """Return {account_id: {lead column: value}} from each id's first input row.

    Ids are parsed with the same rule as the database query so both sides
    agree on which rows are valid. Missing lead columns or cells become "".
    """
    blank = pd.Series("", index=df_input.index)
    lead_series = [df_input[c] if c in df_input.columns else blank for c in _LEAD_COLS]
    lead_map = {}
    for raw_id, *values in zip(df_input["用户ID"], *lead_series):
        account_id = _to_account_id(raw_id)
        if account_id is None or account_id in lead_map:
            continue
        lead_map[account_id] = {
            col: ("" if pd.isna(val) else val) for col, val in zip(_LEAD_COLS, values)
        }
    return lead_map


def main():
    """Build the sales-lead user analysis workbook.

    Reads the input Excel file (path from ``sys.argv[1]``, otherwise a
    built-in default), looks up accounts, characters, prologue completions,
    orders and refunds in the database, and writes one row per
    account + character to ``../output/销售线索_用户分析.xlsx`` next to this
    script. Progress and summary statistics are printed to stdout.

    Returns early (writing nothing) when the input has no numeric user ids
    or when no valid characters are found.
    """
    input_file = sys.argv[1] if len(sys.argv) > 1 else "/root/.openclaw/media/inbound/3æ_è_çº_çº_ç---d9a41af7-b100-43a7-a983-d4fd1f164023.xlsx"
    print(f"读取输入文件: {input_file}")
    df_input = pd.read_excel(input_file, dtype=str)

    account_ids = _parse_account_ids(df_input["用户ID"].dropna().unique().tolist())
    print(f"{len(account_ids)} 个用户ID")
    if not account_ids:
        # An empty id list would render "IN ()", which PostgreSQL rejects.
        print("没有有效的用户ID,退出")
        return

    placeholders = ",".join(["%s"] * len(account_ids))
    conn = get_conn()
    try:
        # ── 1. Accounts ──
        df_accounts = pd.read_sql_query(
            f"SELECT id AS account_id, created_at AS reg_time FROM bi_vala_app_account WHERE id IN ({placeholders}) AND status = 1",
            conn, params=account_ids
        )
        print(f" 有效账户: {len(df_accounts)}")

        # ── 2. Characters (those with an empty nickname are excluded) ──
        df_chars = pd.read_sql_query(
            f"SELECT id AS character_id, account_id, nickname, created_at AS char_created_at FROM bi_vala_app_character WHERE account_id IN ({placeholders}) AND (nickname IS NOT NULL AND nickname != '') AND deleted_at IS NULL",
            conn, params=account_ids
        )
        print(f" 有效角色: {len(df_chars)}")
        if df_chars.empty:
            print("没有有效的角色,退出")
            return

        # ── 3. Lesson completions (10 chapter ids across 8 shard tables) ──
        char_play = _fetch_chapter_completions(conn, set(df_chars["character_id"].tolist()))

        # ── 4. Orders ──
        df_orders = pd.read_sql_query(
            f"""
            SELECT o.account_id, o.pay_success_date, o.goods_name, o.pay_amount_int, o.key_from, o.trade_no, o.order_status
            FROM bi_vala_order o
            INNER JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
            WHERE o.account_id IN ({placeholders})
              AND o.deleted_at IS NULL
            ORDER BY o.account_id, o.pay_success_date
            """,
            conn, params=account_ids
        )
        print(f" 订单记录: {len(df_orders)}")

        # ── 5. Refunds ──
        refund_map = _fetch_refunds(conn, df_orders["trade_no"].dropna().unique().tolist())
    finally:
        # Close the connection on every path, including errors and early exit.
        conn.close()
    print(f" 退费记录: {len(refund_map)}")

    # ── 6. Lead info from the input sheet, and per-account order summary ──
    lead_map = _build_lead_map(df_input)
    # NOTE(review): the original had an unimplemented "also check
    # order_status=4" step; refunds are still detected only via
    # bi_refund_order.
    order_summary = _summarize_orders(df_orders, refund_map)

    # ── 7. Assemble one row per character ──
    df_chars = df_chars.merge(df_accounts, on="account_id", how="left")
    # Excel cannot store timezone-aware datetimes, so drop the tz info.
    for col in ["reg_time", "char_created_at"]:
        df_chars[col] = pd.to_datetime(df_chars[col]).dt.tz_localize(None)

    # Completion column name -> chapter_id, built once instead of per cell.
    chapter_by_col = {f"{label}_完成时间": cid for cid, label in ALL_CHAPTERS.items()}
    no_lead = dict.fromkeys(_LEAD_COLS, "")
    no_orders = {"购买时间": "", "购买课包名称": "", "支付金额": "", "购买渠道key_from": "", "是否退款": "", "退款金额": 0.0}

    rows = []
    for account_id, char_id, reg_time, char_created_at in zip(
        df_chars["account_id"], df_chars["character_id"],
        df_chars["reg_time"], df_chars["char_created_at"],
    ):
        account_id = int(account_id)
        char_id = int(char_id)
        play_map = char_play.get(char_id, {})
        row_data = {
            "用户ID": account_id,
            "角色ID": char_id,
            "用户注册时间": reg_time,
            "角色创建时间": char_created_at,
        }
        for col_label in COL_ORDER:
            done_time = play_map.get(chapter_by_col.get(col_label))
            if done_time is not None:
                done_time = done_time.replace(tzinfo=None)
            row_data[col_label] = done_time
        row_data.update(lead_map.get(account_id, no_lead))
        row_data.update(order_summary.get(account_id, no_orders))
        rows.append(row_data)

    # Original input fields first, then completion times, then order columns.
    col_order = ["用户ID"] + _LEAD_COLS + ["角色ID", "用户注册时间", "角色创建时间"] + list(COL_ORDER) + _ORDER_COLS
    df_result = pd.DataFrame(rows, columns=col_order)
    df_result = df_result.sort_values(by=["用户ID", "角色ID"], ascending=True).reset_index(drop=True)

    # ── 8. Output ──
    output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "output")
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, "销售线索_用户分析.xlsx")
    with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
        df_result.to_excel(writer, sheet_name="用户分析", index=False)

    print(f"\n✅ 报表已生成: {output_path}")
    print(f" 总行数: {len(df_result)}")
    purchased = sum(1 for v in df_result["购买时间"] if v)
    print(f" 有购买记录: {purchased}")
    refunded = sum(1 for v in df_result["是否退款"] if v == "是")
    print(f" 有退款: {refunded}")
    done_l1 = sum(1 for v in df_result["L1_U00_L01_完成时间"] if pd.notna(v))
    done_l2 = sum(1 for v in df_result["L2_U00_L01_完成时间"] if pd.notna(v))
    print(f" 完成L1序章(U00 L01): {done_l1} 个角色")
    print(f" 完成L2序章(U00 L01): {done_l2} 个角色")
# Script entry point: generate the report only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()