#!/usr/bin/env python3
"""
L1 retention analysis.

Produces three console reports:

- D+1 / D+7 / D+14 / D+30 retention for the cohorts of the last 30 days
- activity frequency and per-session duration of paying L1 users (last 30 days)
- churn rate per L1 unit
"""

import os
import sys
from collections import defaultdict
from datetime import date, timedelta, datetime
from statistics import median

try:
    import psycopg2
except ImportError:
    # Keep the module importable (SQL builders, constants) without the driver;
    # get_conn() reports the missing dependency when a connection is requested.
    psycopg2 = None

# Database connection settings; the password comes from the environment
# (main() falls back to a secrets.env file).
PG_CONFIG = {
    'host': 'bj-postgres-16pob4sg.sql.tencentcdb.com',
    'port': 28591,
    'user': 'ai_member',
    'password': os.environ.get('PG_ONLINE_PASSWORD', ''),
    'dbname': 'vala_bi',
}

# L1 (A1) chapter IDs by unit/lesson
# S0 U00: 343(L01) 344(L02) 345(L03) 346(L04) 348(L05)
# S1 U01-U12
L1_CHAPTERS = {
    'U00': {'L01': 343, 'L02': 344, 'L03': 345, 'L04': 346, 'L05': 348},
    'U01': {'L01': 333, 'L02': 334, 'L03': 335, 'L04': 336, 'L05': 337},
    'U02': {'L01': 338, 'L02': 339, 'L03': 340, 'L04': 341, 'L05': 342},
    'U03': {'L01': 349, 'L02': 350, 'L03': 351, 'L04': 352, 'L05': 353},
    'U04': {'L01': 354, 'L02': 355, 'L03': 356, 'L04': 357, 'L05': 358},
    'U05': {'L01': 359, 'L02': 360, 'L03': 361, 'L04': 362, 'L05': 363},
    'U06': {'L01': 366, 'L02': 367, 'L03': 368, 'L04': 369, 'L05': 370},
    'U07': {'L01': 371, 'L02': 372, 'L03': 373, 'L04': 374, 'L05': 375},
    'U08': {'L01': 376, 'L02': 377, 'L03': 378, 'L04': 379, 'L05': 380},
    'U09': {'L01': 381, 'L02': 382, 'L03': 383, 'L04': 384, 'L05': 385},
    'U10': {'L01': 386, 'L02': 387, 'L03': 388, 'L04': 389, 'L05': 390},
    'U11': {'L01': 391, 'L02': 392, 'L03': 393, 'L04': 394, 'L05': 395},
    'U12': {'L01': 396, 'L02': 397, 'L03': 398, 'L04': 399, 'L05': 400},
}

ALL_L1_CHAPTER_IDS = [
    chapter_id
    for lessons in L1_CHAPTERS.values()
    for chapter_id in lessons.values()
]

# The UNION ALL queries embed the chapter IDs as a comma-separated string.
CHAPTER_IDS_STR = ','.join(str(c) for c in ALL_L1_CHAPTER_IDS)

# Number of shard tables (<prefix>_0 .. <prefix>_{SHARD_COUNT - 1}).
SHARD_COUNT = 8

# Today, fixed once at import so every report uses the same window.
TODAY = date.today()
# Start of the 30-day reporting window.
DAYS_30_AGO = TODAY - timedelta(days=30)

# Retention checkpoints (days after the cohort date) and their report labels.
RETENTION_OFFSETS = (1, 7, 14, 30)
RETENTION_LABELS = {1: '次日', 7: '7日', 14: '14日', 30: '30日'}


def get_conn():
    """Open a new psycopg2 connection using PG_CONFIG.

    Raises:
        ImportError: if psycopg2 is not installed.
    """
    if psycopg2 is None:
        raise ImportError("psycopg2 is required to connect to the database")
    return psycopg2.connect(**PG_CONFIG)


def build_union_all(table_prefix, select_clause, where_clause='', shard_count=SHARD_COUNT):
    """Build one ``SELECT ... UNION ALL ...`` statement across the shard tables.

    Args:
        table_prefix: shard table name without the ``_<n>`` suffix.
        select_clause: column list placed after ``SELECT``.
        where_clause: optional full ``WHERE ...`` clause appended to every part.
        shard_count: number of shards; tables ``_0`` .. ``_{shard_count - 1}``.

    Returns:
        The combined SQL text (no trailing semicolon).
    """
    parts = [
        f"""
        SELECT {select_clause}
        FROM {table_prefix}_{i}
        {where_clause}
        """
        for i in range(shard_count)
    ]
    return ' UNION ALL '.join(parts)


def run_query(sql, conn, params=None):
    """Execute *sql* on *conn* and return all rows.

    Args:
        sql: SQL text, optionally containing psycopg2 placeholders.
        conn: an open DB-API connection.
        params: optional bound parameters passed to ``cursor.execute``.

    Returns:
        The list returned by ``cursor.fetchall()``.
    """
    cur = conn.cursor()
    try:
        cur.execute(sql, params)
        return cur.fetchall()
    finally:
        # Close the cursor even when execute/fetchall raises.
        cur.close()


def _sql_int_list(ids):
    """Render *ids* as a comma-separated integer list for ``IN (...)``.

    Every value is coerced through ``int`` so only digits reach the SQL text.
    An empty input yields ``-1`` because ``IN ()`` is a syntax error; the
    placeholder is the same one the retention query already relied on.
    """
    rendered = ','.join(str(int(i)) for i in ids)
    return rendered or '-1'


def analyze_retention(conn):
    """Report D+1/D+7/D+14/D+30 retention of L1 users by first-play cohort.

    A user's cohort is the date of their earliest play of any L1 chapter; the
    user counts as retained on D+N when they have a play record on exactly
    that date.

    Returns:
        A list with one dict per cohort of the last 30 days. Each dict holds
        ``cohort_date``, ``total`` and, per checkpoint N, ``d{N}_rate``
        (display string or 'N/A'), ``d{N}_num`` ("active/total" or 'N/A') and
        ``d{N}_pct`` (float or None). Returns None when the first query
        yields no rows.
    """
    print("\n" + "="*80)
    print("📊 一、L1 用户留存率分析")
    print("="*80)
    print(f"统计周期: {DAYS_30_AGO} ~ {TODAY}")
    print()

    # Step 1: first learning date of every L1 user. L1 users come from
    # bi_user_course_detail; the first date comes from the play records.
    earliest_cohort = DAYS_30_AGO - timedelta(days=30)
    union_sql = build_union_all(
        'bi_user_chapter_play_record',
        "user_id, chapter_id, to_char(created_at, 'YYYY-MM-DD') as play_date",
        f'WHERE chapter_id IN ({CHAPTER_IDS_STR})',
    )
    sql = f"""
    WITH l1_users AS (
        SELECT DISTINCT d.user_id, d.account_id
        FROM bi_user_course_detail d
        WHERE d.course_level = 'A1' AND d.deleted_at IS NULL
    ),
    all_plays AS (
        {union_sql}
    ),
    first_play AS (
        SELECT l.user_id, l.account_id, MIN(a.play_date) as first_date
        FROM l1_users l
        JOIN all_plays a ON l.user_id = a.user_id
        GROUP BY l.user_id, l.account_id
    )
    SELECT first_date, user_id, account_id
    FROM first_play
    WHERE first_date >= '{earliest_cohort}'  -- 往前多取30天给30日留存
    ORDER BY first_date;
    """
    print("查询 L1 用户首次学习日期...")
    rows = run_query(sql, conn)
    print(f"共 {len(rows)} 个 L1 用户有首次学习记录")
    if not rows:
        print("无数据,跳过留存分析")
        return None

    # cohort date -> set of user ids (ints, matching user_active_dates keys)
    cohort_users = defaultdict(set)
    all_user_ids = set()
    for first_date_str, user_id, _account_id in rows:
        # first_date_str may carry a time part; keep only the date.
        cohort_date = first_date_str[:10] if first_date_str else ''
        if cohort_date:
            cohort_users[cohort_date].add(int(user_id))
            all_user_ids.add(int(user_id))

    # Step 2: every active date of those users.
    activity_union = build_union_all(
        'bi_user_chapter_play_record',
        "user_id, to_char(created_at, 'YYYY-MM-DD') as play_date",
        f'WHERE chapter_id IN ({CHAPTER_IDS_STR}) AND user_id IN ({_sql_int_list(all_user_ids)})',
    )
    sql2 = f"""
    SELECT DISTINCT user_id, play_date
    FROM ( {activity_union} ) t
    ORDER BY user_id, play_date;
    """
    print("查询用户活跃日期...")
    activity_rows = run_query(sql2, conn)
    print(f"共 {len(activity_rows)} 条活跃记录")

    # user id -> set of active dates (ISO strings)
    user_active_dates = defaultdict(set)
    for user_id, play_date in activity_rows:
        date_str = play_date[:10] if play_date else ''
        if date_str:
            user_active_dates[int(user_id)].add(date_str)

    # Step 3: retention per cohort.
    results = []
    for cohort_date_str in sorted(cohort_users):
        cohort_date = date.fromisoformat(cohort_date_str)
        if cohort_date < earliest_cohort:
            continue
        users = cohort_users[cohort_date_str]
        total = len(users)
        row = {'cohort_date': cohort_date_str, 'total': total}
        for offset in RETENTION_OFFSETS:
            target = cohort_date + timedelta(days=offset)
            target_str = target.isoformat()
            active = sum(
                1 for uid in users
                if target_str in user_active_dates.get(uid, ())
            )
            # A checkpoint that lies in the future cannot be measured yet.
            valid = target <= TODAY
            pct = active / total * 100 if valid and total > 0 else None
            row[f'd{offset}_pct'] = pct
            row[f'd{offset}_rate'] = f"{pct:.1f}%" if pct is not None else 'N/A'
            row[f'd{offset}_num'] = f"{active}/{total}" if valid else 'N/A'
        results.append(row)

    # Only cohorts of the last 30 days are reported.
    window_start = DAYS_30_AGO.isoformat()
    results_recent = [r for r in results if r['cohort_date'] >= window_start]

    if results_recent:
        print(f"\n{'Cohort日期':<12} {'新用户数':>8} {'次日留存':>10} {'7日留存':>10} {'14日留存':>10} {'30日留存':>10}")
        print("-" * 65)
        for r in results_recent:
            print(f"{r['cohort_date']:<12} {r['total']:>8} {r['d1_rate']:>10} {r['d7_rate']:>10} {r['d14_rate']:>10} {r['d30_rate']:>10}")

        # Summary: unweighted mean over the cohorts that have a measurable
        # rate, computed from the exact values rather than the rounded text.
        total_new_users = sum(r['total'] for r in results_recent)
        print("\n--- 汇总 ---")
        print(f"总新用户数: {total_new_users}")
        for offset in RETENTION_OFFSETS:
            pcts = [
                r[f'd{offset}_pct'] for r in results_recent
                if r[f'd{offset}_pct'] is not None
            ]
            if pcts:
                avg = sum(pcts) / len(pcts)
                print(f"平均{RETENTION_LABELS[offset]}留存率: {avg:.1f}% ({len(pcts)}个cohort)")
    else:
        print("最近30天无新用户数据")

    return results_recent


def analyze_paid_user_activity(conn):
    """Report session frequency and duration of paying L1 users (last 30 days).

    A "session" is one (user, date, chapter_unique_id) group of play records;
    its duration is the sum of ``interval_time`` over the component play
    records with that chapter_unique_id, converted from milliseconds to
    minutes.

    Returns:
        A dict with ``total_users``, ``total_days``, ``total_sessions``,
        ``avg_daily_sessions``, ``avg_session_duration`` and
        ``median_session_duration``; None when there are no paying users or
        no sessions.
    """
    print("\n" + "="*80)
    print("📊 二、L1 付费用户活跃频次与单次时长")
    print("="*80)
    print(f"统计周期: {DAYS_30_AGO} ~ {TODAY}")

    # Step 1: paying L1 users.
    sql_paid = """
    WITH l1_users AS (
        SELECT DISTINCT d.user_id, d.account_id
        FROM bi_user_course_detail d
        WHERE d.course_level = 'A1' AND d.deleted_at IS NULL
    ),
    paid_users AS (
        SELECT DISTINCT o.account_id
        FROM bi_vala_order o
        JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
        WHERE o.order_status IN (2, 3, 4) AND o.pay_amount_int > 0
    ),
    l1_paid AS (
        SELECT DISTINCT l.user_id, l.account_id
        FROM l1_users l
        JOIN paid_users p ON l.account_id = p.account_id
    )
    SELECT user_id, account_id FROM l1_paid;
    """
    print("查询 L1 付费用户...")
    paid_rows = run_query(sql_paid, conn)
    print(f"L1 付费用户数: {len(paid_rows)}")
    if not paid_rows:
        print("无 L1 付费用户数据")
        return None

    paid_user_ids = {int(r[0]) for r in paid_rows}

    # Step 2: play records of those users in the window, one row per session.
    union_play = build_union_all(
        'bi_user_chapter_play_record',
        "user_id, chapter_id, chapter_unique_id, to_char(created_at, 'YYYY-MM-DD') as play_date, created_at",
        f"WHERE chapter_id IN ({CHAPTER_IDS_STR}) AND user_id IN ({_sql_int_list(paid_user_ids)}) AND created_at >= '{DAYS_30_AGO}'",
    )
    sql_sessions = f"""
    SELECT user_id, play_date, chapter_unique_id, MIN(created_at) as session_start
    FROM ( {union_play} ) t
    GROUP BY user_id, play_date, chapter_unique_id
    ORDER BY user_id, play_date, chapter_unique_id;
    """
    print("查询付费用户学习 session...")
    session_rows = run_query(sql_sessions, conn)
    print(f"共 {len(session_rows)} 个学习 session")
    if not session_rows:
        print("无学习记录")
        return None

    # Step 3: duration of every session, fetched in batches because there
    # can be many chapter_unique_ids.
    batch_size = 500
    ids_list = list({r[2] for r in session_rows})
    id_batches = [
        ids_list[start:start + batch_size]
        for start in range(0, len(ids_list), batch_size)
    ]

    # Per-shard GROUP BY, then a second aggregation across shards. The ids
    # are bound parameters (psycopg2 renders a tuple as an IN list), so a
    # quote inside an id cannot break or alter the statement.
    union_comp = ' UNION ALL '.join(
        f"""
        SELECT chapter_unique_id, SUM(interval_time) as total_ms
        FROM bi_user_component_play_record_{i}
        WHERE chapter_unique_id IN %(ids)s
        GROUP BY chapter_unique_id
        """
        for i in range(SHARD_COUNT)
    )
    sql_dur = f"""
    SELECT chapter_unique_id, SUM(total_ms) as session_ms
    FROM ({union_comp}) t
    GROUP BY chapter_unique_id;
    """

    print(f"查询 session 耗时({len(id_batches)}批)...")
    session_duration = {}
    for batch in id_batches:
        dur_rows = run_query(sql_dur, conn, {'ids': tuple(str(x) for x in batch)})
        for cu_id, ms in dur_rows:
            session_duration[cu_id] = float(ms or 0) / 60000.0  # ms -> minutes

    # Step 4: metrics.
    # frequency = sessions per active user-day; duration = minutes per session
    user_daily_sessions = defaultdict(lambda: defaultdict(int))  # user_id -> date -> sessions
    for user_id, play_date, _cu_id, _session_start in session_rows:
        date_str = play_date[:10] if play_date else ''
        if date_str:
            user_daily_sessions[int(user_id)][date_str] += 1

    # One entry per (user, active day).
    all_daily_freq = [
        cnt
        for dates in user_daily_sessions.values()
        for cnt in dates.values()
    ]
    # Sessions without any recorded time are left out of the duration stats.
    all_session_dur = [dur for dur in session_duration.values() if dur > 0]

    # Users with at least one session in the window (not all paying users).
    total_users = len(user_daily_sessions)
    total_days = len(all_daily_freq)
    total_sessions = sum(all_daily_freq)
    avg_daily_sessions = total_sessions / total_days if all_daily_freq else 0
    avg_session_duration = sum(all_session_dur) / len(all_session_dur) if all_session_dur else 0
    # statistics.median averages the two middle values for an even count.
    median_session_duration = median(all_session_dur) if all_session_dur else 0

    print("\n--- L1 付费用户活跃分析 ---")
    print(f"付费用户数: {total_users}")
    print(f"30天内活跃天数: {total_days}")
    print(f"30天内总 session 数: {total_sessions}")
    print(f"平均每天上线次数(活跃频次): {avg_daily_sessions:.2f} 次/天")
    print(f"平均单次时长: {avg_session_duration:.1f} 分钟")
    print(f"中位数单次时长: {median_session_duration:.1f} 分钟")

    # Distribution of sessions per active user-day.
    freq_dist = defaultdict(int)
    for freq in all_daily_freq:
        if freq <= 1:
            freq_dist['1次'] += 1
        elif freq <= 3:
            freq_dist['2-3次'] += 1
        elif freq <= 5:
            freq_dist['4-5次'] += 1
        else:
            freq_dist['6次以上'] += 1

    print("\n每日上线次数分布:")
    for k in ['1次', '2-3次', '4-5次', '6次以上']:
        cnt = freq_dist.get(k, 0)
        print(f" {k}: {cnt} 天 ({cnt/total_days*100:.1f}%)" if total_days > 0 else f" {k}: 0")

    return {
        'total_users': total_users,
        'total_days': total_days,
        'total_sessions': total_sessions,
        'avg_daily_sessions': avg_daily_sessions,
        'avg_session_duration': avg_session_duration,
        'median_session_duration': median_session_duration,
    }


def analyze_unit_churn(conn):
    """Report churn between L1 units and a per-unit completion summary.

    For unit N (U01..U11) the row compares the users who completed L05 of
    unit N-1 (``play_status == 1``) with those of them who have any play
    record for L01 of unit N+1.
    NOTE(review): the row is labelled with unit N but spans N-1 -> N+1, as in
    the printed definition; confirm this is the intended metric.
    """
    print("\n" + "="*80)
    print("📊 三、L1 各 Unit 流失率")
    print("="*80)
    print("流失率定义: 1 - (进入Unit(N+1)用户数 / 完成Unit(N-1)用户数)")
    print("进入 = 进入 Unit(N+1) 第一节课(L01)")
    print("完成 = 完成 Unit(N-1) 最后一节课(L05)")
    print()

    # All L1 user ids.
    sql_l1 = """
    SELECT DISTINCT d.user_id
    FROM bi_user_course_detail d
    WHERE d.course_level = 'A1' AND d.deleted_at IS NULL;
    """
    l1_rows = run_query(sql_l1, conn)
    l1_user_ids = {int(r[0]) for r in l1_rows}

    # Chapter play records of those users. _sql_int_list keeps the statement
    # valid when there are no L1 users at all.
    union_play = build_union_all(
        'bi_user_chapter_play_record',
        'user_id, chapter_id, play_status',
        f'WHERE chapter_id IN ({CHAPTER_IDS_STR}) AND user_id IN ({_sql_int_list(l1_user_ids)})',
    )
    sql_plays = f"""
    SELECT user_id, chapter_id, play_status
    FROM ({union_play}) t;
    """
    print("查询课时播放记录...")
    play_rows = run_query(sql_plays, conn)
    print(f"共 {len(play_rows)} 条记录")

    # chapter id -> users with any play record / with a completed record
    entered_by_chapter = defaultdict(set)
    completed_by_chapter = defaultdict(set)
    for user_id, chapter_id, play_status in play_rows:
        uid = int(user_id)
        if uid not in l1_user_ids:
            continue
        entered_by_chapter[int(chapter_id)].add(uid)
        if play_status == 1:
            completed_by_chapter[int(chapter_id)].add(uid)

    print(f"\n{'Unit':<8} {'完成前Unit':<15} {'完成后Unit L05':>10} {'进入后Unit L01':>10} {'留存率':>10} {'流失率':>10}")
    print("-" * 70)

    unit_order = ['U00', 'U01', 'U02', 'U03', 'U04', 'U05', 'U06',
                  'U07', 'U08', 'U09', 'U10', 'U11', 'U12']

    for i in range(1, len(unit_order) - 1):  # U01 to U11
        prev_unit = unit_order[i - 1]  # Unit(N-1)
        curr_unit = unit_order[i]      # Unit(N)
        next_unit = unit_order[i + 1]  # Unit(N+1)

        prev_l05 = L1_CHAPTERS[prev_unit]['L05']
        next_l01 = L1_CHAPTERS[next_unit]['L01']

        completed_prev = completed_by_chapter.get(prev_l05, set())
        entered_next = entered_by_chapter.get(next_l01, set())

        denom = len(completed_prev)
        # Users who completed the previous unit and also entered the next one.
        num = len(completed_prev & entered_next)

        if denom > 0:
            retention = num / denom * 100
            churn = 100 - retention
            print(f"{curr_unit:<8} {prev_unit}(L05={prev_l05}) {denom:>10} {num:>10} {retention:>9.1f}% {churn:>9.1f}%")
        else:
            print(f"{curr_unit:<8} {prev_unit}(L05={prev_l05}) {denom:>10} {num:>10} {'N/A':>10} {'N/A':>10}")

    # U12 has no following unit, so also show a completion rate per unit:
    # users who completed L05 / users who entered L01.
    print()
    print("--- Unit 完成情况补充 ---")
    for unit_name in unit_order:
        l01 = L1_CHAPTERS[unit_name]['L01']
        l05 = L1_CHAPTERS[unit_name]['L05']
        entered = len(entered_by_chapter.get(l01, ()))
        completed = len(completed_by_chapter.get(l05, ()))
        if entered > 0:
            print(f"{unit_name}: 进入 {entered} 人, 完成 {completed} 人, 完成率 {completed/entered*100:.1f}%")


def main():
    """Resolve the database password, then run the three reports."""
    if not PG_CONFIG['password']:
        # Fall back to secrets.env in the parent of this script's directory.
        secrets_file = os.path.join(
            os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
            'secrets.env',
        )
        if os.path.exists(secrets_file):
            with open(secrets_file) as f:
                for line in f:
                    # Skip lines that mention the key without assigning it
                    # (e.g. a comment) instead of failing on the split.
                    if 'PG_ONLINE_PASSWORD' not in line or '=' not in line:
                        continue
                    PG_CONFIG['password'] = line.strip().split('=', 1)[1].strip('"').strip("'")
                    break

    if not PG_CONFIG['password']:
        print("ERROR: PG_ONLINE_PASSWORD not found")
        sys.exit(1)

    conn = get_conn()
    try:
        analyze_retention(conn)
        analyze_paid_user_activity(conn)
        analyze_unit_churn(conn)
    finally:
        conn.close()


if __name__ == '__main__':
    main()