#!/usr/bin/env python3 """ 销售线索用户分析报表 输入:Excel文件,包含「用户ID」列 输出:每个用户+角色一行,包含注册信息、序章完成情况、购买和退款信息 修正口径: - 体验课 = 固定10节课:L1 U00 L01-L05 (chapter_id: 343,344,345,346,348) + L2 U00 L01-L05 (55-59) - 完成时间 = play_status=1 的最早 updated_at """ import os import sys import psycopg2 import pandas as pd from collections import defaultdict # ── 数据库 ── DB_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com" DB_PORT = 28591 DB_USER = "ai_member" DB_NAME = "vala_bi" def get_password(): secrets_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "secrets.env") # also try env pw = os.environ.get("PG_ONLINE_PASSWORD", "") if pw: return pw if os.path.exists(secrets_path): with open(secrets_path) as f: for line in f: if line.startswith("PG_ONLINE_PASSWORD="): return line.strip().split("=", 1)[1].strip("'\"") raise RuntimeError("PG_ONLINE_PASSWORD not found") def get_conn(): return psycopg2.connect(host=DB_HOST, port=DB_PORT, user=DB_USER, password=get_password(), dbname=DB_NAME, connect_timeout=30) # ── 序章 chapter_id ── # L1 S0 U00 L01..L05 L1_CHAPTERS = {343: "L1_U00_L01", 344: "L1_U00_L02", 345: "L1_U00_L03", 346: "L1_U00_L04", 348: "L1_U00_L05"} # L2 S0 U00 L01..L05 L2_CHAPTERS = {55: "L2_U00_L01", 56: "L2_U00_L02", 57: "L2_U00_L03", 58: "L2_U00_L04", 59: "L2_U00_L05"} ALL_CHAPTERS = {**L1_CHAPTERS, **L2_CHAPTERS} COL_ORDER = [ "L1_U00_L01_完成时间", "L1_U00_L02_完成时间", "L1_U00_L03_完成时间", "L1_U00_L04_完成时间", "L1_U00_L05_完成时间", "L2_U00_L01_完成时间", "L2_U00_L02_完成时间", "L2_U00_L03_完成时间", "L2_U00_L04_完成时间", "L2_U00_L05_完成时间", ] def main(): input_file = sys.argv[1] if len(sys.argv) > 1 else "/root/.openclaw/media/inbound/3æ_è_çº_çº_ç---d9a41af7-b100-43a7-a983-d4fd1f164023.xlsx" print(f"读取输入文件: {input_file}") df_input = pd.read_excel(input_file, dtype=str) user_ids_raw = df_input["用户ID"].dropna().unique().tolist() account_ids = [int(x) for x in user_ids_raw] print(f"共 {len(account_ids)} 个用户ID") conn = get_conn() # ── 1. 获取用户基本信息(account) ── placeholders = ",".join(["%s"] * len(account_ids)) df_accounts = pd.read_sql_query( f"SELECT id AS account_id, created_at AS reg_time FROM bi_vala_app_account WHERE id IN ({placeholders}) AND status = 1", conn, params=account_ids ) print(f" 有效账户: {len(df_accounts)}") # ── 2. 获取角色(排除 nickname 为空的) ── df_chars = pd.read_sql_query( f"SELECT id AS character_id, account_id, nickname, created_at AS char_created_at FROM bi_vala_app_character WHERE account_id IN ({placeholders}) AND (nickname IS NOT NULL AND nickname != '') AND deleted_at IS NULL", conn, params=account_ids ) print(f" 有效角色: {len(df_chars)}") if df_chars.empty: print("没有有效的角色,退出") conn.close() return # ── 3. 查询课时完成记录(10个 chapter_id,8个分表) ── chapter_ids = list(ALL_CHAPTERS.keys()) char_id_set = set(df_chars["character_id"].tolist()) char_play = defaultdict(dict) total_play = 0 for tbl_idx in range(8): table = f"bi_user_chapter_play_record_{tbl_idx}" sql = f""" SELECT user_id, chapter_id, MIN(updated_at) AS done_time FROM {table} WHERE chapter_id IN %s AND play_status = 1 AND deleted_at IS NULL GROUP BY user_id, chapter_id """ try: cur = conn.cursor() cur.execute(sql, (tuple(chapter_ids),)) rows = cur.fetchall() cur.close() total_play += len(rows) for user_id, ch_id, done_time in rows: if user_id in char_id_set: label = ALL_CHAPTERS.get(ch_id) if label: char_play[user_id][ch_id] = done_time except Exception as e: print(f" 警告: {table} 查询失败: {e}") print(f" 课时完成记录: {total_play} 条, 匹配角色: {len(char_play)}") # ── 4. 订单信息 ── df_orders = pd.read_sql_query( f""" SELECT o.account_id, o.pay_success_date, o.goods_name, o.pay_amount_int, o.key_from, o.trade_no, o.order_status FROM bi_vala_order o INNER JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1 WHERE o.account_id IN ({placeholders}) AND o.deleted_at IS NULL ORDER BY o.account_id, o.pay_success_date """, conn, params=account_ids ) print(f" 订单记录: {len(df_orders)}") # ── 5. 退款信息 ── all_trade_nos = df_orders["trade_no"].dropna().unique().tolist() refund_map = {} if all_trade_nos: # 分批查询(防止IN子句过长) for i in range(0, len(all_trade_nos), 500): batch = all_trade_nos[i:i+500] ph = ",".join(["%s"] * len(batch)) cur = conn.cursor() cur.execute(f"SELECT trade_no, SUM(refund_amount_int) FROM bi_refund_order WHERE trade_no IN ({ph}) AND status = 3 AND deleted_at IS NULL GROUP BY trade_no", batch) for trade_no, amt in cur.fetchall(): refund_map[trade_no] = amt cur.close() conn.close() print(f" 退费记录: {len(refund_map)} 条") # ── 6. 原始文件线索信息 ── df_input["用户ID_int"] = df_input["用户ID"].astype(int) df_input_map = df_input[["用户ID_int", "线索进线日期", "销售"]].drop_duplicates(subset="用户ID_int") # ── 7. 组装结果 ── df_chars = df_chars.merge(df_accounts, on="account_id", how="left") # 去掉时区信息(Excel不支持) for col in ["reg_time", "char_created_at"]: if col in df_chars.columns: df_chars[col] = pd.to_datetime(df_chars[col]).dt.tz_localize(None) rows = [] for _, char_row in df_chars.iterrows(): account_id = int(char_row["account_id"]) char_id = int(char_row["character_id"]) reg_time = char_row["reg_time"] char_created_at = char_row["char_created_at"] # 课时完成 play_map = char_play.get(char_id, {}) row_data = { "用户ID": account_id, "角色ID": char_id, "用户注册时间": reg_time, "角色创建时间": char_created_at, } for col_label in COL_ORDER: # 找到对应的 chapter_id(ALL_CHAPTERS 的 value 是去掉 "_完成时间" 后缀的) ch_id = None lbl_key = col_label.replace("_完成时间", "") for cid, lbl in ALL_CHAPTERS.items(): if lbl == lbl_key: ch_id = cid break done_time = play_map.get(ch_id, None) if done_time is not None: done_time = done_time.replace(tzinfo=None) row_data[col_label] = done_time rows.append(row_data) df_result = pd.DataFrame(rows) # ── 7. 合并订单 & 退款 ── # 按 account_id 聚合 order_agg = df_orders.groupby("account_id").agg( 购买时间=("pay_success_date", lambda x: ";".join(str(v) for v in x if pd.notna(v))), 购买课包名称=("goods_name", lambda x: ";".join(str(v) for v in x if pd.notna(v))), 支付金额=("pay_amount_int", lambda x: ";".join(str(v/100) for v in x if pd.notna(v))), 购买渠道key_from=("key_from", lambda x: ";".join(str(v) for v in x if pd.notna(v))), trade_nos=("trade_no", lambda x: list(x)), ).reset_index() # 退款判断 def calc_refund(row): has_refund = False total_refund = 0 for tn in row["trade_nos"]: if tn in refund_map: has_refund = True total_refund += refund_map[tn] # 也检查 order_status=4 for tn in row["trade_nos"]: if tn not in refund_map: # 查订单状态 pass # order_status 已在订单表中 return pd.Series({"是否退款": "是" if has_refund else "否", "退款金额": total_refund / 100.0}) refund_info = order_agg.apply(calc_refund, axis=1) order_agg = pd.concat([order_agg[["account_id", "购买时间", "购买课包名称", "支付金额", "购买渠道key_from"]], refund_info], axis=1) # 合并到结果 df_result["account_id_int"] = df_result["用户ID"].astype(int) df_result = df_result.merge(order_agg, left_on="account_id_int", right_on="account_id", how="left") df_result.drop(columns=["account_id"], inplace=True, errors="ignore") # 合并原始文件的线索进线日期和销售 df_result = df_result.merge(df_input_map, left_on="account_id_int", right_on="用户ID_int", how="left") df_result.drop(columns=["account_id_int", "用户ID_int"], inplace=True, errors="ignore") # 填充空值 df_result["购买时间"] = df_result["购买时间"].fillna("") df_result["购买课包名称"] = df_result["购买课包名称"].fillna("") df_result["支付金额"] = df_result["支付金额"].fillna("") df_result["购买渠道key_from"] = df_result["购买渠道key_from"].fillna("") df_result["是否退款"] = df_result["是否退款"].fillna("否") df_result["退款金额"] = df_result["退款金额"].fillna(0.0) df_result["线索进线日期"] = df_result["线索进线日期"].fillna("") df_result["销售"] = df_result["销售"].fillna("") # 按用户ID升序排列 df_result = df_result.sort_values(by=["用户ID", "角色ID"], ascending=True).reset_index(drop=True) # 调整列顺序:原始字段靠前 col_order = ["用户ID", "线索进线日期", "销售", "角色ID", "用户注册时间", "角色创建时间"] + COL_ORDER + ["购买时间", "购买课包名称", "支付金额", "购买渠道key_from", "是否退款", "退款金额"] # 只保留实际存在的列 col_order = [c for c in col_order if c in df_result.columns] df_result = df_result[col_order] # ── 8. 输出 ── output_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "output") os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, "销售线索_用户分析.xlsx") with pd.ExcelWriter(output_path, engine="openpyxl") as writer: df_result.to_excel(writer, sheet_name="用户分析", index=False) print(f"\n✅ 报表已生成: {output_path}") print(f" 总行数: {len(df_result)}") purchased = sum(1 for v in df_result["购买时间"] if v) print(f" 有购买记录: {purchased}") refunded = sum(1 for v in df_result["是否退款"] if v == "是") print(f" 有退款: {refunded}") done_l1 = sum(1 for v in df_result["L1_U00_L01_完成时间"] if pd.notna(v)) done_l2 = sum(1 for v in df_result["L2_U00_L01_完成时间"] if pd.notna(v)) print(f" 完成L1序章(U00 L01): {done_l1} 个角色") print(f" 完成L2序章(U00 L01): {done_l2} 个角色") if __name__ == "__main__": main()