#!/usr/bin/env python3
"""Lesson-consumption metrics, reported per week and split by course level (L1 / L2).

For every week between ``OVERALL_START`` (inclusive) and ``OVERALL_END``
(exclusive) the script prints, for each of the two levels:

* the number of de-duplicated lesson consumptions — one per
  ``(user_id, chapter_id)`` pair, attributed to the earliest matching play
  record inside the overall window;
* the number of distinct accounts that had at least one such consumption;
* consumptions per paying account and per active account.

Configuration:
    The database password is read from the ``VALA_BI_DB_PASSWORD`` environment
    variable.  When it is unset no password is passed and libpq falls back to
    its own mechanisms (``PGPASSWORD``, ``~/.pgpass``).
"""

import os
from bisect import bisect_right
from collections import defaultdict
from contextlib import closing
from datetime import date, timedelta, datetime

# --- Connection settings ----------------------------------------------------
# The password is deliberately NOT stored in source control.  The value that
# used to be committed here should be treated as exposed and rotated.
DB_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
DB_PORT = 28591
DB_USER = "ai_member"
DB_NAME = "vala_bi"
DB_PASSWORD_ENV = "VALA_BI_DB_PASSWORD"

# --- Report window ----------------------------------------------------------
OVERALL_START = date(2025, 9, 1)   # inclusive
OVERALL_END = date(2026, 5, 11)    # exclusive

# Only orders paid on/after this day are loaded for the per-week paying-user count.
ORDERS_SINCE = '2025-01-01'

# Chapter ids excluded from both levels.
# NOTE(review): the original name `u0_ids` suggests "unit 0" chapters — confirm.
U0_CHAPTER_IDS = frozenset({343, 344, 345, 346, 348, 55, 56, 57, 58, 59})

# Play records are read from bi_user_chapter_play_record_0 .. _7.
PLAY_RECORD_SHARDS = 8

# Maximum number of ids placed in one `IN (...)` list.
CHARACTER_BATCH_SIZE = 500


def build_weeks(start, end):
    """Split ``[start, end)`` into consecutive week ranges.

    Each range runs from its first day up to the following Sunday; the last
    range is clipped to ``end - 1 day``.

    Args:
        start: first day of the window (inclusive).
        end: day after the last day of the window (exclusive).

    Returns:
        A list of ``(first_day, last_day)`` tuples, both ends inclusive,
        sorted and non-overlapping.  Empty when ``start >= end``.
    """
    weeks = []
    day = start
    while day < end:
        # weekday(): Monday == 0 ... Sunday == 6, so this lands on Sunday.
        week_end = day + timedelta(days=6 - day.weekday())
        if week_end >= end:
            week_end = end - timedelta(days=1)
        weeks.append((day, week_end))
        day = week_end + timedelta(days=1)
    return weeks


def split_chapters_by_level(rows, excluded_ids=U0_CHAPTER_IDS):
    """Partition ``(chapter_id, course_level)`` rows into L1 and L2 id sets.

    Ids found in ``excluded_ids`` are dropped; rows whose level is neither
    ``'L1'`` nor ``'L2'`` are ignored.

    Returns:
        ``(l1_chapters, l2_chapters)`` as two sets of chapter ids.
    """
    l1_chapters = set()
    l2_chapters = set()
    for chapter_id, level in rows:
        if chapter_id in excluded_ids:
            continue
        if level == 'L1':
            l1_chapters.add(chapter_id)
        elif level == 'L2':
            l2_chapters.add(chapter_id)
    return l1_chapters, l2_chapters


def group_orders_by_account(orders, refunded_trade_nos):
    """Group order rows per account, flagging the refunded ones.

    Args:
        orders: iterable of ``(account_id, trade_no, out_trade_no,
            pay_success_date, order_status)`` rows.
        refunded_trade_nos: set of trade numbers taken from
            ``bi_refund_order`` rows with ``status = 3``.

    Returns:
        ``defaultdict(list)`` mapping account id to a list of
        ``(pay_success_date, is_refunded)`` tuples.  An order is flagged as
        refunded only when its status is 4 *and* its trade number is in
        ``refunded_trade_nos``.
    """
    # NOTE(review): order_status 4 / refund status 3 presumably mean
    # "refunded" / "refund completed" — confirm against the schema.
    account_orders = defaultdict(list)
    for account_id, trade_no, _out_trade_no, paid_at, order_status in orders:
        is_refunded = order_status == 4 and trade_no in refunded_trade_nos
        account_orders[account_id].append((paid_at, is_refunded))
    return account_orders


def is_paid(aid, as_of, account_orders):
    """Return True if account ``aid`` has a non-refunded order paid on or before ``as_of``.

    Args:
        aid: account id.
        as_of: ``date`` to evaluate the paying status at (inclusive).
        account_orders: mapping produced by :func:`group_orders_by_account`;
            the stored payment timestamps must provide ``.date()``.
    """
    return any(
        not refunded and paid_at.date() <= as_of
        for paid_at, refunded in account_orders.get(aid, ())
    )


def count_paid_accounts(account_orders, as_of):
    """Count the accounts in ``account_orders`` that are paying as of ``as_of``."""
    return sum(1 for aid in account_orders if is_paid(aid, as_of, account_orders))


def record_earliest_consumption(rows, l1_chapters, l2_chapters,
                                l1_consumption, l2_consumption):
    """Fold play-record rows into the per-level "earliest day" maps, in place.

    Args:
        rows: iterable of ``(user_id, chapter_id, updated_at)``.
        l1_chapters / l2_chapters: chapter-id sets for each level; rows whose
            chapter belongs to neither are skipped.
        l1_consumption / l2_consumption: dicts keyed by
            ``(user_id, chapter_id)`` holding the earliest day seen so far;
            they are updated by this call.
    """
    for user_id, chapter_id, updated_at in rows:
        if chapter_id in l1_chapters:
            target = l1_consumption
        elif chapter_id in l2_chapters:
            target = l2_consumption
        else:
            continue
        # Timestamps are truncated to a calendar day; plain dates pass through.
        day = updated_at.date() if isinstance(updated_at, datetime) else updated_at
        key = (user_id, chapter_id)
        if key not in target or day < target[key]:
            target[key] = day


def fetch_character_accounts(cur, character_ids, batch_size=CHARACTER_BATCH_SIZE):
    """Look up ``account_id`` for each id in ``bi_vala_app_character``.

    The ids are queried in batches of ``batch_size`` to keep each ``IN``
    list bounded.

    Returns:
        dict mapping character id to account id (only ids found in the table).
    """
    # Materialise once: slicing a fresh list(...) per batch would copy the
    # whole id set on every iteration.
    ids = list(character_ids)
    char_to_account = {}
    for offset in range(0, len(ids), batch_size):
        batch = ids[offset:offset + batch_size]
        placeholders = ','.join(['%s'] * len(batch))
        cur.execute(
            f"SELECT id, account_id FROM bi_vala_app_character WHERE id IN ({placeholders})",
            batch,
        )
        for character_id, account_id in cur.fetchall():
            char_to_account[character_id] = account_id
    return char_to_account


def weekly_stats(consumption_map, weeks, char_to_account):
    """Aggregate consumptions per week.

    Args:
        consumption_map: dict ``(user_id, chapter_id) -> day``.  The
            ``user_id`` part is the key looked up in ``char_to_account``.
        weeks: sorted, non-overlapping ``(first_day, last_day)`` ranges as
            returned by :func:`build_weeks`.
        char_to_account: dict mapping that id to an account id.

    Returns:
        One ``(first_day, last_day, consumption_count, active_account_count)``
        tuple per week, in the order of ``weeks``.  A consumption whose id has
        no (or a falsy) account still counts as a consumption but adds no
        active account.  Days outside every week are ignored.
    """
    week_starts = [first_day for first_day, _ in weeks]
    counts = [0] * len(weeks)
    active_accounts = [set() for _ in weeks]

    for (character_id, _chapter_id), day in consumption_map.items():
        # Index of the last week starting on or before `day`.
        idx = bisect_right(week_starts, day) - 1
        if idx < 0 or day > weeks[idx][1]:
            continue
        counts[idx] += 1
        account_id = char_to_account.get(character_id)
        if account_id:
            active_accounts[idx].add(account_id)

    return [
        (first_day, last_day, counts[i], len(active_accounts[i]))
        for i, (first_day, last_day) in enumerate(weeks)
    ]


def build_week_row(ws, we, n_paid, l1_cons, l1_users, l2_cons, l2_users):
    """Build the report row for one week.

    ``*_avg`` is consumptions per paying account, ``*_act`` is consumptions
    per active account; both are 0 when the denominator is 0.
    """
    return {
        'week': f"{ws.strftime('%m/%d')}-{we.strftime('%m/%d')}",
        'ws': ws,
        'we': we,
        'paid': n_paid,
        'l1_cons': l1_cons,
        'l1_users': l1_users,
        'l1_avg': l1_cons / n_paid if n_paid else 0,
        'l1_act': l1_cons / l1_users if l1_users else 0,
        'l2_cons': l2_cons,
        'l2_users': l2_users,
        'l2_avg': l2_cons / n_paid if n_paid else 0,
        'l2_act': l2_cons / l2_users if l2_users else 0,
    }


def print_report(results):
    """Print the weekly table (header plus one line per row)."""
    print(f"\n{'周':<16} {'付费':>6} {'L1课消':>7} {'L1有消':>7} {'L1人均':>7} {'L1有消人均':>9} {'L2课消':>7} {'L2有消':>7} {'L2人均':>7} {'L2有消人均':>9}")
    for r in results:
        print(f"{r['week']:<16} {r['paid']:>6} {r['l1_cons']:>7} {r['l1_users']:>7} {r['l1_avg']:>7.2f} {r['l1_act']:>9.2f} {r['l2_cons']:>7} {r['l2_users']:>7} {r['l2_avg']:>7.2f} {r['l2_act']:>9.2f}")


def open_connection():
    """Open the psycopg2 connection to the BI database.

    The driver is imported here rather than at module level so the pure
    aggregation helpers above can be imported without psycopg2 installed.
    """
    import psycopg2

    params = {
        'host': DB_HOST,
        'port': DB_PORT,
        'user': DB_USER,
        'dbname': DB_NAME,
    }
    password = os.environ.get(DB_PASSWORD_ENV)
    if password is not None:
        params['password'] = password
    return psycopg2.connect(**params)


def run_report(cur):
    """Run every query on ``cur`` and print the weekly report."""
    weeks = build_weeks(OVERALL_START, OVERALL_END)

    # ===== Fetch the L1 / L2 chapter ids =====
    cur.execute("SELECT DISTINCT id, course_level FROM bi_level_unit_lesson WHERE course_level IN ('L1','L2')")
    l1_chapters, l2_chapters = split_chapters_by_level(cur.fetchall())
    print(f"L1 chapters: {len(l1_chapters)}, L2 chapters: {len(l2_chapters)}")

    # ===== Step 1: paying users =====
    print("Step 1: 查找付费用户...")
    # `trade_no IS NOT NULL` keeps the NOT IN predicate from collapsing to
    # NULL when the refund table holds a NULL trade_no, matching the Python
    # side below, which also drops empty trade numbers.
    cur.execute("""
        SELECT DISTINCT o.account_id
        FROM bi_vala_order o
        INNER JOIN bi_vala_app_account a ON o.account_id = a.id
        WHERE a.status = 1 AND a.deleted_at IS NULL AND o.pay_success_date IS NOT NULL
        GROUP BY o.account_id
        HAVING COUNT(CASE WHEN o.order_status != 4 OR (o.order_status = 4 AND o.trade_no NOT IN (
            SELECT trade_no FROM bi_refund_order WHERE status=3 AND trade_no IS NOT NULL
        )) THEN 1 END) > 0
    """)
    paid_account_ids = [row[0] for row in cur.fetchall()]
    print(f" 付费用户: {len(paid_account_ids)}")

    # Order details, used to decide week by week which accounts are paying.
    # NOTE(review): the weekly figure only sees orders paid since ORDERS_SINCE,
    # whereas the total printed above has no such lower bound — confirm intent.
    cur.execute("""
        SELECT o.account_id, o.trade_no, o.out_trade_no, o.pay_success_date, o.order_status
        FROM bi_vala_order o
        INNER JOIN bi_vala_app_account a ON o.account_id = a.id
        WHERE a.status=1 AND a.deleted_at IS NULL AND o.pay_success_date IS NOT NULL
          AND o.pay_success_date >= %s
    """, (ORDERS_SINCE,))
    orders = cur.fetchall()

    cur.execute("SELECT trade_no, status FROM bi_refund_order WHERE status=3")
    refund_set = {row[0] for row in cur.fetchall() if row[0]}
    account_orders = group_orders_by_account(orders, refund_set)

    # ===== Step 2: lesson consumption (split by L1 / L2) =====
    print("Step 2: 查询课消(分L1/L2)...")
    l1_consumption = {}  # (user_id, chapter_id) -> earliest day
    l2_consumption = {}
    # Bounds are passed as ISO strings derived from the window constants so
    # the SQL filter can never drift from the week list.
    window = (OVERALL_START.isoformat(), OVERALL_END.isoformat())
    for shard in range(PLAY_RECORD_SHARDS):
        table = f"bi_user_chapter_play_record_{shard}"
        # The table name comes from a fixed integer range, not from input.
        cur.execute(f"""
            SELECT user_id, chapter_id, updated_at
            FROM {table}
            WHERE play_status=1 AND updated_at>=%s AND updated_at<%s
        """, window)
        record_earliest_consumption(
            cur.fetchall(), l1_chapters, l2_chapters, l1_consumption, l2_consumption
        )
    print(f" L1 课消(去重): {len(l1_consumption)}")
    print(f" L2 课消(去重): {len(l2_consumption)}")

    # ===== Step 3: map play-record user ids to accounts =====
    print("Step 3: 关联角色...")
    all_uids = {key[0] for key in l1_consumption} | {key[0] for key in l2_consumption}
    char_to_account = fetch_character_accounts(cur, all_uids)

    # ===== Step 4: weekly roll-up =====
    print("Step 4: 按周汇总...")
    l1_stats = weekly_stats(l1_consumption, weeks, char_to_account)
    l2_stats = weekly_stats(l2_consumption, weeks, char_to_account)

    results = []
    for (ws, we), l1_week, l2_week in zip(weeks, l1_stats, l2_stats):
        n_paid = count_paid_accounts(account_orders, we)
        results.append(
            build_week_row(ws, we, n_paid, l1_week[2], l1_week[3], l2_week[2], l2_week[3])
        )

    print_report(results)


def main():
    """Connect, run the report, and always release the cursor and connection."""
    with closing(open_connection()) as conn, closing(conn.cursor()) as cur:
        run_report(cur)
    print("\n完成!")


if __name__ == "__main__":
    main()