#!/usr/bin/env python3
"""Weekly lesson-consumption metrics for 2025-09-01 through 2026-05-10.

Metrics:
1. Consumption per paid user = total consumptions / number of paid users.
2. Consumption per consuming paid user = total consumptions / number of
   paid users that have at least one consumption in the week.
"""
from collections import defaultdict
from datetime import datetime, timedelta, date

# ===== Reporting window =====
overall_start = date(2025, 9, 1)
overall_end = date(2026, 5, 11)  # exclusive, i.e. 2026-05-10 is the last day

# Only orders paid on or after this day are loaded.
orders_since = date(2025, 1, 1)

# Prologue chapter ids (L1 U00 + L2 U00); excluded from consumption counts.
u0_chapters = {343, 344, 345, 346, 348, 55, 56, 57, 58, 59}

# {account_id: [(pay_date, is_refunded), ...]}; filled by load_account_orders().
account_orders = defaultdict(list)


def connect():
    """Open a connection to the BI PostgreSQL database.

    The password is deliberately not stored in this file: no ``password``
    argument is passed, so libpq resolves it from the ``PGPASSWORD``
    environment variable or from ``~/.pgpass``.

    Returns:
        An open psycopg2 connection; the caller is responsible for closing it.
    """
    # Imported here so the pure helpers of this module stay importable on
    # machines that do not have the database driver installed.
    import psycopg2

    return psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com",
        port=28591,
        user="ai_member",
        dbname="vala_bi",
    )


def build_weeks(start, end_exclusive):
    """Split ``[start, end_exclusive)`` into consecutive calendar weeks.

    Every week ends on a Sunday, except that the last one is clipped to the
    day before ``end_exclusive``. The first week starts on ``start`` even if
    that is not a Monday.

    Args:
        start: First day of the window (``date``).
        end_exclusive: Day after the last day of the window (``date``).

    Returns:
        List of ``(week_start, week_end)`` tuples, both ends inclusive.
        Empty when ``start >= end_exclusive``.
    """
    last_day = end_exclusive - timedelta(days=1)
    weeks = []
    week_start = start
    while week_start < end_exclusive:
        # date.weekday() is 0 for Monday, so adding 6 - weekday() reaches Sunday.
        sunday = week_start + timedelta(days=6 - week_start.weekday())
        week_end = min(sunday, last_day)
        weeks.append((week_start, week_end))
        week_start = week_end + timedelta(days=1)
    return weeks


def to_date(value):
    """Normalise a database timestamp value to a ``date``.

    ``datetime`` values are truncated to their date, strings are parsed from
    their first ten characters (``YYYY-MM-DD``), and anything else (e.g. a
    value that already is a ``date``) is returned unchanged.
    """
    if isinstance(value, datetime):
        return value.date()
    if isinstance(value, str):
        return datetime.strptime(value[:10], '%Y-%m-%d').date()
    return value


def is_paid_user(aid, as_of_date):
    """Return whether the account has a non-refunded order as of a given day.

    Args:
        aid: Account id, looked up in the module-level ``account_orders``.
        as_of_date: Inclusive cut-off ``date``.

    Returns:
        True if at least one order of the account was paid on or before
        ``as_of_date`` and is not flagged as refunded, otherwise False.
        Accounts without any loaded order are never paid users.
    """
    # NOTE: no refund date is loaded, so an order that was refunded at any
    # time is ignored for every week, including weeks before the refund.
    return any(
        not is_refunded and to_date(pay_date) <= as_of_date
        for pay_date, is_refunded in account_orders.get(aid, ())
    )


def load_account_orders(cur):
    """Load orders and refunds and rebuild the module-level ``account_orders``.

    Args:
        cur: Open database cursor.

    Returns:
        The populated ``account_orders`` mapping.
    """
    print("\nStep 1: 查询订单...")
    # Bound as an ISO string so the server sees the same untyped literal as a
    # hand-written '2025-01-01' would produce.
    cur.execute("""
        SELECT o.account_id, o.trade_no, o.out_trade_no,
               o.pay_success_date, o.order_status
        FROM bi_vala_order o
        INNER JOIN bi_vala_app_account a ON o.account_id = a.id
        WHERE a.status = 1 AND a.deleted_at IS NULL
          AND o.pay_success_date IS NOT NULL
          AND o.pay_success_date >= %s
    """, (orders_since.isoformat(),))
    orders = cur.fetchall()
    print(f" 订单数: {len(orders)}")

    # Refund records; status = 3 presumably means "refund completed" - TODO confirm.
    cur.execute("""
        SELECT trade_no, out_trade_no
        FROM bi_refund_order
        WHERE status = 3
    """)
    refunds = set()
    refunds_by_out = set()
    for trade_no, out_trade_no in cur.fetchall():
        # Empty identifiers are skipped so they can never match an order.
        if trade_no:
            refunds.add(trade_no)
        if out_trade_no:
            refunds_by_out.add(out_trade_no)
    print(f" 退费trade_no数: {len(refunds)}, out_trade_no数: {len(refunds_by_out)}")

    account_orders.clear()
    for aid, trade_no, out_trade_no, pay_date, order_status in orders:
        # An order counts as refunded only when it carries order_status 4
        # (presumably "refunded" - TODO confirm) AND a matching refund record
        # exists under either of its identifiers.
        is_refunded = order_status == 4 and (
            trade_no in refunds or out_trade_no in refunds_by_out
        )
        account_orders[aid].append((pay_date, is_refunded))
    return account_orders


def load_consumption(cur):
    """Load de-duplicated lesson consumptions inside the reporting window.

    Reads the eight ``bi_user_chapter_play_record_<n>`` tables, skips the
    prologue chapters and keeps, per ``(user_id, chapter_id)``, the earliest
    ``updated_at`` day found inside the window.

    Args:
        cur: Open database cursor.

    Returns:
        Dict mapping ``(user_id, chapter_id)`` to the earliest ``date``.
    """
    print("\nStep 2: 查询课消记录(剔除序章U0)...")
    consumption_map = {}
    # Derived from the window constants so SQL and week list cannot drift apart.
    window = (overall_start.isoformat(), overall_end.isoformat())
    for table_idx in range(8):
        # The table name comes from a fixed range, never from external input.
        tbl = f"bi_user_chapter_play_record_{table_idx}"
        cur.execute(f"""
            SELECT user_id, chapter_id, updated_at
            FROM {tbl}
            WHERE play_status = 1
              AND updated_at >= %s
              AND updated_at < %s
        """, window)
        cnt = 0
        for user_id, chapter_id, updated_at in cur.fetchall():
            if chapter_id in u0_chapters:
                continue
            key = (user_id, chapter_id)
            day = to_date(updated_at)
            if key not in consumption_map or day < consumption_map[key]:
                consumption_map[key] = day
            cnt += 1
        print(f" {tbl}: {cnt} 条记录")
    print(f" 去重后课消: {len(consumption_map)} 条")
    return consumption_map


def load_char_to_account(cur, user_ids, batch_size=500):
    """Map character ids to account ids.

    Args:
        cur: Open database cursor.
        user_ids: Iterable of ids to look up in ``bi_vala_app_character.id``.
        batch_size: Maximum number of ids per ``IN (...)`` query.

    Returns:
        Dict mapping character id to account id for every id that was found.
    """
    print("\nStep 3: 关联角色...")
    ids = list(user_ids)
    char_to_account = {}
    for offset in range(0, len(ids), batch_size):
        batch = ids[offset:offset + batch_size]
        ph = ','.join(['%s'] * len(batch))
        cur.execute(f"""
            SELECT id, account_id
            FROM bi_vala_app_character
            WHERE id IN ({ph})
        """, batch)
        for cid, aid in cur.fetchall():
            char_to_account[cid] = aid
    print(f" 角色-账号映射: {len(char_to_account)}")
    return char_to_account


def summarize_week(ws, we, consumption_map, char_to_account):
    """Compute the metrics of one week.

    Args:
        ws: First day of the week (inclusive).
        we: Last day of the week (inclusive).
        consumption_map: ``(user_id, chapter_id) -> date`` as returned by
            ``load_consumption``.
        char_to_account: ``user_id -> account_id`` mapping.

    Returns:
        Dict with the keys ``week``, ``ws``, ``we``, ``paid_users``,
        ``consumption``, ``consuming_users``, ``avg_all`` and
        ``avg_consuming``. Averages are 0 when their denominator is 0.
    """
    # Denominator: accounts that are paid users as of the end of the week.
    paid_users = {aid for aid in account_orders if is_paid_user(aid, we)}

    # Numerator: consumptions of paid users whose earliest day is in this week.
    weekly_consumption = 0
    consuming_accounts = set()
    for (uid, _chapter_id), cons_date in consumption_map.items():
        if not ws <= cons_date <= we:
            continue
        aid = char_to_account.get(uid)
        if aid and aid in paid_users:
            weekly_consumption += 1
            consuming_accounts.add(aid)

    n_paid = len(paid_users)
    n_consuming = len(consuming_accounts)
    return {
        'week': f"{ws.strftime('%m/%d')}-{we.strftime('%m/%d')}",
        'ws': ws,
        'we': we,
        'paid_users': n_paid,
        'consumption': weekly_consumption,
        'consuming_users': n_consuming,
        'avg_all': weekly_consumption / n_paid if n_paid > 0 else 0,
        'avg_consuming': weekly_consumption / n_consuming if n_consuming > 0 else 0,
    }


def summarize_months(results):
    """Roll weekly results up to calendar months.

    A week belongs to the month of its first day. Per month, ``cons`` is the
    sum of the weekly consumptions and ``last_paid`` is the paid-user count
    of the last week (in list order) that belongs to the month.

    Args:
        results: Weekly dicts as returned by ``summarize_week``, in
            chronological order.

    Returns:
        Dict mapping ``'YYYY-MM'`` to ``{'cons': int, 'last_paid': int}``.
    """
    monthly = {}
    for r in results:
        bucket = monthly.setdefault(
            r['ws'].strftime('%Y-%m'), {'cons': 0, 'last_paid': 0}
        )
        bucket['cons'] += r['consumption']
        bucket['last_paid'] = r['paid_users']  # later weeks overwrite earlier ones
    return monthly


def main():
    """Load the data, then print the weekly table and the monthly roll-up."""
    conn = connect()
    try:
        print(f"统计区间: {overall_start} ~ {overall_end - timedelta(days=1)}")
        weeks = build_weeks(overall_start, overall_end)
        print(f"共 {len(weeks)} 周")

        with conn.cursor() as cur:
            load_account_orders(cur)
            consumption_map = load_consumption(cur)
            char_to_account = load_char_to_account(
                cur, {uid for uid, _chapter_id in consumption_map}
            )
    finally:
        # Everything below works on in-memory data, so the connection is
        # released as soon as loading finishes - also when a query fails.
        conn.close()

    print("\nStep 4: 按周汇总...")
    results = []
    for ws, we in weeks:
        r = summarize_week(ws, we, consumption_map, char_to_account)
        results.append(r)
        print(f" {ws}~{we}: 付费{r['paid_users']} | 课消{r['consumption']} | "
              f"有课消{r['consuming_users']} | 人均{r['avg_all']:.2f} | "
              f"有消人均{r['avg_consuming']:.2f}")

    # ===== Weekly table =====
    print("\n" + "="*80)
    print(f"{'周':<20} {'付费用户':>8} {'课消次数':>8} {'有消用户':>8} {'人均课消':>8} {'有消人均':>8}")
    print("-"*80)
    for r in results:
        print(f"{r['week']:<20} {r['paid_users']:>8} {r['consumption']:>8} "
              f"{r['consuming_users']:>8} {r['avg_all']:>8.2f} {r['avg_consuming']:>8.2f}")

    # ===== Monthly roll-up =====
    print("\n" + "="*80)
    print("月度汇总")
    print("-"*80)
    monthly = summarize_months(results)
    print(f"{'月份':<10} {'月末付费':>8} {'月课消':>8} {'月人均':>8}")
    print("-"*50)
    for m in sorted(monthly):
        row = monthly[m]
        avg = row['cons'] / row['last_paid'] if row['last_paid'] > 0 else 0
        print(f"{m:<10} {row['last_paid']:>8} {row['cons']:>8} {avg:>8.2f}")

    print("\n完成!")


if __name__ == "__main__":
    main()