ai_member_xiaoxi/scripts/l1_retention_analysis.py
2026-05-12 08:00:01 +08:00

539 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
L1 留存数据分析
- 最近30天次日/7日/14日/30日留存率
- 最近30天付费用户活跃频次与单次时长
- L1各Unit流失率
"""
import os
import sys
import psycopg2
from datetime import date, timedelta, datetime
from collections import defaultdict
# Connection parameters for the vala_bi PostgreSQL database. The password is
# taken from the PG_ONLINE_PASSWORD environment variable (empty if unset).
PG_CONFIG = {
    'host': 'bj-postgres-16pob4sg.sql.tencentcdb.com',
    'port': 28591,
    'user': 'ai_member',
    'password': os.environ.get('PG_ONLINE_PASSWORD', ''),
    'dbname': 'vala_bi'
}

# Chapter IDs of the L1 course, keyed by unit and then by lesson.
# U00 belongs to S0 and uses 343-346 plus 348; U01-U12 belong to S1.
L1_CHAPTERS = {
    'U00': {'L01': 343, 'L02': 344, 'L03': 345, 'L04': 346, 'L05': 348},
    'U01': {'L01': 333, 'L02': 334, 'L03': 335, 'L04': 336, 'L05': 337},
    'U02': {'L01': 338, 'L02': 339, 'L03': 340, 'L04': 341, 'L05': 342},
    'U03': {'L01': 349, 'L02': 350, 'L03': 351, 'L04': 352, 'L05': 353},
    'U04': {'L01': 354, 'L02': 355, 'L03': 356, 'L04': 357, 'L05': 358},
    'U05': {'L01': 359, 'L02': 360, 'L03': 361, 'L04': 362, 'L05': 363},
    'U06': {'L01': 366, 'L02': 367, 'L03': 368, 'L04': 369, 'L05': 370},
    'U07': {'L01': 371, 'L02': 372, 'L03': 373, 'L04': 374, 'L05': 375},
    'U08': {'L01': 376, 'L02': 377, 'L03': 378, 'L04': 379, 'L05': 380},
    'U09': {'L01': 381, 'L02': 382, 'L03': 383, 'L04': 384, 'L05': 385},
    'U10': {'L01': 386, 'L02': 387, 'L03': 388, 'L04': 389, 'L05': 390},
    'U11': {'L01': 391, 'L02': 392, 'L03': 393, 'L04': 394, 'L05': 395},
    'U12': {'L01': 396, 'L02': 397, 'L03': 398, 'L04': 399, 'L05': 400},
}

# Flat list of every L1 chapter ID, in unit order and then lesson order.
ALL_L1_CHAPTER_IDS = [
    chapter_id
    for unit_lessons in L1_CHAPTERS.values()
    for chapter_id in unit_lessons.values()
]

# Comma-separated form, ready to be dropped into a SQL "IN (...)" list.
CHAPTER_IDS_STR = ','.join(map(str, ALL_L1_CHAPTER_IDS))

# Number of physical shard tables (<prefix>_0 .. <prefix>_7) per logical table.
SHARD_COUNT = 8

# Reporting window: the 30 days ending today (both evaluated once, at import).
TODAY = date.today()
DAYS_30_AGO = TODAY - timedelta(days=30)
def get_conn():
    """Open and return a new psycopg2 connection configured from PG_CONFIG.

    The caller owns the returned connection and must close it.
    """
    connect_kwargs = dict(PG_CONFIG)
    return psycopg2.connect(**connect_kwargs)
def build_union_all(table_prefix, select_clause, where_clause='', shard_count=SHARD_COUNT):
    """Return a SQL fragment that UNION ALLs the same SELECT over every shard table.

    One part is generated per shard index i in ``range(shard_count)``:
    ``SELECT {select_clause} FROM {table_prefix}_{i} {where_clause}``.
    All arguments are interpolated verbatim with no escaping, so callers must
    pass trusted SQL text only.
    """
    shard_selects = [
        f"""
        SELECT {select_clause}
        FROM {table_prefix}_{shard_idx}
        {where_clause}
        """
        for shard_idx in range(shard_count)
    ]
    return ' UNION ALL '.join(shard_selects)
def run_query(sql, conn, params=None):
    """Execute ``sql`` on ``conn`` and return all result rows.

    Args:
        sql: SQL text to execute.
        conn: An open DB-API connection (psycopg2 in this script).
        params: Optional bind parameters for ``cursor.execute``. When omitted,
            ``execute`` is called with the SQL alone, exactly as before.

    Returns:
        The list returned by ``cursor.fetchall()``.

    The cursor is always closed, even if execution or fetching raises.
    """
    cur = conn.cursor()
    try:
        if params is None:
            cur.execute(sql)
        else:
            cur.execute(sql, params)
        return cur.fetchall()
    finally:
        # Previously the cursor leaked whenever execute()/fetchall() raised.
        cur.close()
def analyze_retention(conn):
    """Print day-1/7/14/30 retention for L1 user cohorts of the last 30 days.

    L1 users are rows of ``bi_user_course_detail`` with ``course_level = 'A1'``
    and no ``deleted_at``. A user's cohort date is the earliest
    ``to_char(created_at, 'YYYY-MM-DD')`` among their play records on L1
    chapters. A user is retained on day N when they have an L1 play record
    exactly N days after that cohort date. Day N is scored only when it is
    <= TODAY; otherwise the cell is 'N/A'.

    The summary lines report the unweighted mean of per-cohort rates, computed
    from unrounded ratios.

    Args:
        conn: Open database connection passed to ``run_query``.

    Returns:
        One dict per cohort dated on/after DAYS_30_AGO, in date order, with keys
        ``cohort_date``, ``total``, ``d{1,7,14,30}_rate`` and
        ``d{1,7,14,30}_num``. Returns an empty list when there is no data.
    """
    print("\n" + "="*80)
    print("📊 一、L1 用户留存率分析")
    print("="*80)
    print(f"统计周期: {DAYS_30_AGO} ~ {TODAY}")
    print()

    # (offset in days, result-key prefix, label used in the summary lines)
    offsets = (
        (1, 'd1', '次日'),
        (7, 'd7', '7日'),
        (14, 'd14', '14日'),
        (30, 'd30', '30日'),
    )

    # Step 1: first L1 play date of every L1 user.
    union_sql = build_union_all(
        'bi_user_chapter_play_record',
        'user_id, chapter_id, to_char(created_at, \'YYYY-MM-DD\') as play_date',
        f'WHERE chapter_id IN ({CHAPTER_IDS_STR})'
    )
    # Only cohorts starting on/after DAYS_30_AGO are reported, so only those are
    # fetched. The later activity needed for D+N comes from Step 2, which has no
    # date filter. Older cohorts were previously fetched and then discarded.
    sql = f"""
    WITH l1_users AS (
        SELECT DISTINCT d.user_id, d.account_id
        FROM bi_user_course_detail d
        WHERE d.course_level = 'A1' AND d.deleted_at IS NULL
    ),
    all_plays AS (
        {union_sql}
    ),
    first_play AS (
        SELECT l.user_id, l.account_id, MIN(a.play_date) as first_date
        FROM l1_users l
        JOIN all_plays a ON l.user_id = a.user_id
        GROUP BY l.user_id, l.account_id
    )
    SELECT first_date, user_id, account_id
    FROM first_play
    WHERE first_date >= '{DAYS_30_AGO}'
    ORDER BY first_date;
    """
    print("查询 L1 用户首次学习日期...")
    rows = run_query(sql, conn)
    print(f"{len(rows)} 个 L1 用户有首次学习记录")
    if not rows:
        print("无数据,跳过留存分析")
        return []

    # cohort date string -> ids of users whose first L1 play was on that day
    cohort_users = defaultdict(set)
    all_user_ids = set()
    for first_date_str, user_id, _account_id in rows:
        cohort_date = first_date_str[:10] if first_date_str else ''
        if cohort_date:
            cohort_users[cohort_date].add(int(user_id))
        all_user_ids.add(int(user_id))

    # Step 2: every day on which these users played any L1 chapter.
    user_ids_str = ','.join(str(uid) for uid in all_user_ids) if all_user_ids else '-1'
    activity_union = build_union_all(
        'bi_user_chapter_play_record',
        'user_id, to_char(created_at, \'YYYY-MM-DD\') as play_date',
        f'WHERE chapter_id IN ({CHAPTER_IDS_STR}) AND user_id IN ({user_ids_str})'
    )
    sql2 = f"""
    SELECT DISTINCT user_id, play_date
    FROM (
        {activity_union}
    ) t
    ORDER BY user_id, play_date;
    """
    print("查询用户活跃日期...")
    activity_rows = run_query(sql2, conn)
    print(f"{len(activity_rows)} 条活跃记录")

    # user id -> set of 'YYYY-MM-DD' days with L1 activity
    user_active_dates = defaultdict(set)
    for user_id, play_date in activity_rows:
        date_str = play_date[:10] if play_date else ''
        if date_str:
            user_active_dates[int(user_id)].add(date_str)

    # Step 3: per-cohort retention. The unrounded ratios are collected separately
    # so that the summary is not computed from the rounded "x.x%" display strings.
    ratio_lists = {key: [] for _days, key, _label in offsets}
    results = []
    for cohort_date_str in sorted(cohort_users):
        cohort_date = date.fromisoformat(cohort_date_str)
        users = cohort_users[cohort_date_str]
        total = len(users)
        rates = {}
        nums = {}
        for days, key, _label in offsets:
            target_day = cohort_date + timedelta(days=days)
            # NOTE(review): a target day equal to TODAY is scored even though
            # today's data may be incomplete; confirm this is intended.
            valid = target_day <= TODAY
            target = target_day.isoformat()
            active = sum(1 for uid in users if target in user_active_dates.get(uid, set()))
            if valid and total > 0:
                ratio = active / total
                ratio_lists[key].append(ratio)
                rates[f'{key}_rate'] = f"{ratio*100:.1f}%"
            else:
                rates[f'{key}_rate'] = 'N/A'
            nums[f'{key}_num'] = f"{active}/{total}" if valid else 'N/A'
        results.append({'cohort_date': cohort_date_str, 'total': total, **rates, **nums})

    if not results:
        print("最近30天无新用户数据")
        return results

    print(f"\n{'Cohort日期':<12} {'新用户数':>8} {'次日留存':>10} {'7日留存':>10} {'14日留存':>10} {'30日留存':>10}")
    print("-" * 65)
    for r in results:
        print(f"{r['cohort_date']:<12} {r['total']:>8} {r['d1_rate']:>10} {r['d7_rate']:>10} {r['d14_rate']:>10} {r['d30_rate']:>10}")

    # Summary: unweighted mean of per-cohort rates over the scorable cohorts.
    print(f"\n--- 汇总 ---")
    print(f"总新用户数: {sum(r['total'] for r in results)}")
    for _days, key, label in offsets:
        ratios = ratio_lists[key]
        if ratios:
            avg = sum(ratios) / len(ratios) * 100
            print(f"平均{label}留存率: {avg:.1f}% ({len(ratios)}个cohort)")
    return results
def _median(values):
    """Return the median of a non-empty sequence of numbers.

    For an even count this is the mean of the two middle values.
    """
    ordered = sorted(values)
    mid = len(ordered) // 2
    if len(ordered) % 2 == 1:
        return ordered[mid]
    return (ordered[mid - 1] + ordered[mid]) / 2


def analyze_paid_user_activity(conn):
    """Print activity frequency and session length of L1 paid users (last 30 days).

    Paid L1 users are L1 users ('A1' rows of ``bi_user_course_detail``) whose
    account has an order with ``order_status`` in (2, 3, 4) and
    ``pay_amount_int > 0``, joined to ``bi_vala_app_account`` with
    ``status = 1``. A "session" is one distinct (user_id, play_date,
    chapter_unique_id) among L1 chapter play records created on/after
    DAYS_30_AGO. A session's duration is ``SUM(interval_time)`` over the matching
    component play records, divided by 60000 (treated as ms -> minutes).

    Args:
        conn: Open database connection passed to ``run_query``.

    Returns:
        A dict of summary metrics, or None when there are no paid users or no
        sessions.
    """
    print("\n" + "="*80)
    print("📊 二、L1 付费用户活跃频次与单次时长")
    print("="*80)
    print(f"统计周期: {DAYS_30_AGO} ~ {TODAY}")

    # Step 1: L1 users whose account has a qualifying paid order.
    sql_paid = f"""
    WITH l1_users AS (
        SELECT DISTINCT d.user_id, d.account_id
        FROM bi_user_course_detail d
        WHERE d.course_level = 'A1' AND d.deleted_at IS NULL
    ),
    paid_users AS (
        SELECT DISTINCT o.account_id
        FROM bi_vala_order o
        JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
        WHERE o.order_status IN (2, 3, 4)
        AND o.pay_amount_int > 0
    ),
    l1_paid AS (
        SELECT DISTINCT l.user_id, l.account_id
        FROM l1_users l
        JOIN paid_users p ON l.account_id = p.account_id
    )
    SELECT user_id, account_id FROM l1_paid;
    """
    print("查询 L1 付费用户...")
    paid_rows = run_query(sql_paid, conn)
    print(f"L1 付费用户数: {len(paid_rows)}")
    if not paid_rows:
        print("无 L1 付费用户数据")
        return
    paid_user_ids = {int(r[0]) for r in paid_rows}
    paid_user_ids_str = ','.join(str(uid) for uid in paid_user_ids)

    # Step 2: one session per (user, day, chapter_unique_id) in the last 30 days.
    union_play = build_union_all(
        'bi_user_chapter_play_record',
        'user_id, chapter_id, chapter_unique_id, to_char(created_at, \'YYYY-MM-DD\') as play_date, created_at',
        f'WHERE chapter_id IN ({CHAPTER_IDS_STR}) AND user_id IN ({paid_user_ids_str}) AND created_at >= \'{DAYS_30_AGO}\''
    )
    sql_sessions = f"""
    SELECT user_id, play_date, chapter_unique_id, MIN(created_at) as session_start
    FROM (
        {union_play}
    ) t
    GROUP BY user_id, play_date, chapter_unique_id
    ORDER BY user_id, play_date, chapter_unique_id;
    """
    print("查询付费用户学习 session...")
    session_rows = run_query(sql_sessions, conn)
    print(f"{len(session_rows)} 个学习 session")
    if not session_rows:
        print("无学习记录")
        return

    # Step 3: per-session duration from component play records. This is queried
    # in batches so that each IN (...) list stays bounded. NULL ids cannot match
    # anything; previously they were rendered as the literal 'None'.
    ids_list = list({r[2] for r in session_rows if r[2] is not None})
    batch_size = 500
    id_batches = [ids_list[i:i + batch_size] for i in range(0, len(ids_list), batch_size)]
    print(f"查询 session 耗时({len(id_batches)}批)...")
    session_duration = {}  # chapter_unique_id -> duration in minutes
    for batch in id_batches:
        # Quote each id as a SQL string literal, doubling any embedded quote.
        ids_str = "','".join(str(x).replace("'", "''") for x in batch)
        # Aggregate per shard first, then sum across shards.
        union_comp = ' UNION ALL '.join(
            f"""
            SELECT chapter_unique_id, SUM(interval_time) as total_ms
            FROM bi_user_component_play_record_{i}
            WHERE chapter_unique_id IN ('{ids_str}')
            GROUP BY chapter_unique_id
            """
            for i in range(SHARD_COUNT)
        )
        sql_dur = f"""
        SELECT chapter_unique_id, SUM(total_ms) as session_ms
        FROM ({union_comp}) t
        GROUP BY chapter_unique_id;
        """
        for cu_id, ms in run_query(sql_dur, conn):
            session_duration[cu_id] = float(ms or 0) / 60000.0  # ms -> minutes

    # Step 4: metrics.
    # Activity frequency = average number of sessions per active user-day.
    # Session length = average/median over sessions with a positive duration.
    user_daily_sessions = defaultdict(lambda: defaultdict(int))  # user_id -> day -> sessions
    for user_id, play_date, _cu_id, _session_start in session_rows:
        date_str = play_date[:10] if play_date else ''
        if date_str:
            user_daily_sessions[int(user_id)][date_str] += 1

    # Session count of each (user, active day) pair.
    all_daily_freq = [cnt for days in user_daily_sessions.values() for cnt in days.values()]
    all_session_dur = [dur for dur in session_duration.values() if dur > 0]

    # total_users counts paid users with at least one session in the window;
    # total_days counts (user, day) pairs.
    total_users = len(user_daily_sessions)
    total_days = len(all_daily_freq)
    avg_daily_sessions = sum(all_daily_freq) / len(all_daily_freq) if all_daily_freq else 0
    avg_session_duration = sum(all_session_dur) / len(all_session_dur) if all_session_dur else 0
    # Previously sorted(x)[n // 2], which is the upper middle value for even n.
    median_session_duration = _median(all_session_dur) if all_session_dur else 0
    total_sessions = sum(all_daily_freq)

    print(f"\n--- L1 付费用户活跃分析 ---")
    print(f"付费用户数: {total_users}")
    print(f"30天内活跃天数: {total_days}")
    print(f"30天内总 session 数: {total_sessions}")
    print(f"平均每天上线次数(活跃频次): {avg_daily_sessions:.2f} 次/天")
    print(f"平均单次时长: {avg_session_duration:.1f} 分钟")
    print(f"中位数单次时长: {median_session_duration:.1f} 分钟")

    # Distribution of sessions per active user-day.
    freq_dist = defaultdict(int)
    for f in all_daily_freq:
        if f <= 1:
            freq_dist['1次'] += 1
        elif f <= 3:
            freq_dist['2-3次'] += 1
        elif f <= 5:
            freq_dist['4-5次'] += 1
        else:
            freq_dist['6次以上'] += 1
    print(f"\n每日上线次数分布:")
    for k in ['1次', '2-3次', '4-5次', '6次以上']:
        cnt = freq_dist.get(k, 0)
        print(f" {k}: {cnt} 天 ({cnt/total_days*100:.1f}%)" if total_days > 0 else f" {k}: 0")

    return {
        'total_users': total_users,
        'total_days': total_days,
        'total_sessions': total_sessions,
        'avg_daily_sessions': avg_daily_sessions,
        'avg_session_duration': avg_session_duration,
        'median_session_duration': median_session_duration,
    }
def analyze_unit_churn(conn):
    """Print per-unit churn for L1 users, plus a per-unit completion summary.

    For each middle unit N (U01..U11), take the L1 users who completed lesson
    L05 of unit N-1 (a play record with ``play_status == 1``). Retention is the
    share of them who also have any play record on lesson L01 of unit N+1;
    churn is 100% minus retention. The summary lists, for every unit, how many
    users entered L01 and how many completed L05.

    Args:
        conn: Open database connection passed to ``run_query``.
    """
    print("\n" + "="*80)
    print("📊 三、L1 各 Unit 流失率")
    print("="*80)
    print("流失率定义: 1 - (进入Unit(N+1)用户数 / 完成Unit(N-1)用户数)")
    print("进入 = 进入 Unit(N+1) 第一节课(L01)")
    print("完成 = 完成 Unit(N-1) 最后一节课(L05)")
    print()

    # All L1 user ids.
    sql_l1 = """
    SELECT DISTINCT d.user_id
    FROM bi_user_course_detail d
    WHERE d.course_level = 'A1' AND d.deleted_at IS NULL;
    """
    l1_user_ids = {int(r[0]) for r in run_query(sql_l1, conn)}
    if not l1_user_ids:
        # An empty id list would render as "IN ()", which is invalid SQL.
        print("无 L1 用户数据")
        return
    l1_user_ids_str = ','.join(str(uid) for uid in l1_user_ids)
    chapter_ids_str = ','.join(
        str(ch_id)
        for ch_id in {ch for lessons in L1_CHAPTERS.values() for ch in lessons.values()}
    )

    # Play records of L1 users on L1 chapters.
    union_play = build_union_all(
        'bi_user_chapter_play_record',
        'user_id, chapter_id, play_status',
        f'WHERE chapter_id IN ({chapter_ids_str}) AND user_id IN ({l1_user_ids_str})'
    )
    sql_plays = f"""
    SELECT user_id, chapter_id, play_status
    FROM ({union_play}) t;
    """
    print("查询课时播放记录...")
    play_rows = run_query(sql_plays, conn)
    print(f"{len(play_rows)} 条记录")

    # chapter_id -> users with any play record on it / users who completed it.
    # The query already restricts rows to L1 users. Indexing by chapter avoids
    # rescanning every user for every unit.
    entered_by_chapter = defaultdict(set)
    completed_by_chapter = defaultdict(set)
    for user_id, chapter_id, play_status in play_rows:
        uid, ch_id = int(user_id), int(chapter_id)
        entered_by_chapter[ch_id].add(uid)
        if play_status == 1:
            completed_by_chapter[ch_id].add(uid)

    # The third column counts completions of the *previous* unit's L05.
    print(f"\n{'Unit':<8} {'完成前Unit':<15} {'完成前Unit L05':>10} {'进入后Unit L01':>10} {'留存率':>10} {'流失率':>10}")
    print("-" * 70)

    unit_order = ['U00', 'U01', 'U02', 'U03', 'U04', 'U05', 'U06', 'U07', 'U08', 'U09', 'U10', 'U11', 'U12']
    for i in range(1, len(unit_order) - 1):  # U01 .. U11
        prev_unit, curr_unit, next_unit = unit_order[i - 1], unit_order[i], unit_order[i + 1]
        prev_l05 = L1_CHAPTERS[prev_unit]['L05']
        next_l01 = L1_CHAPTERS[next_unit]['L01']

        completed_prev = completed_by_chapter.get(prev_l05, set())
        both = completed_prev & entered_by_chapter.get(next_l01, set())
        denom = len(completed_prev)
        num = len(both)
        label = f"{prev_unit}(L05={prev_l05})"
        if denom > 0:
            retention = num / denom * 100
            churn = 100 - retention
            print(f"{curr_unit:<8} {label:<15} {denom:>10} {num:>10} {retention:>9.1f}% {churn:>9.1f}%")
        else:
            print(f"{curr_unit:<8} {label:<15} {denom:>10} {num:>10} {'N/A':>10} {'N/A':>10}")

    # U12 has no following unit, so completion per unit is listed for all units.
    print()
    print("--- Unit 完成情况补充 ---")
    for unit_name in unit_order:
        l01 = L1_CHAPTERS[unit_name]['L01']
        l05 = L1_CHAPTERS[unit_name]['L05']
        entered = len(entered_by_chapter.get(l01, set()))
        completed = len(completed_by_chapter.get(l05, set()))
        if entered > 0:
            print(f"{unit_name}: 进入 {entered} 人, 完成 {completed} 人, 完成率 {completed/entered*100:.1f}%")
def main():
    """Resolve the database password, then run the three L1 analyses.

    The password comes from PG_CONFIG (that is, the PG_ONLINE_PASSWORD
    environment variable). If it is empty, the password is read from a
    ``PG_ONLINE_PASSWORD=...`` line in ``secrets.env``, located in the parent
    directory of this script's directory. Exits with status 1 if neither source
    provides it.
    """
    if not PG_CONFIG['password']:
        secrets_file = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'secrets.env')
        if os.path.exists(secrets_file):
            with open(secrets_file, encoding='utf-8') as f:
                for raw_line in f:
                    line = raw_line.strip()
                    # Skip blank lines and comments.
                    if not line or line.startswith('#'):
                        continue
                    # Accept shell-style "export KEY=VALUE".
                    if line.startswith('export '):
                        line = line[len('export '):].lstrip()
                    key, sep, value = line.partition('=')
                    # Require an exact key match. A substring test would also
                    # match e.g. PG_ONLINE_PASSWORD_OLD, and a line without '='
                    # would raise IndexError.
                    if sep and key.strip() == 'PG_ONLINE_PASSWORD':
                        PG_CONFIG['password'] = value.strip().strip('"').strip("'")
                        break
    if not PG_CONFIG['password']:
        print("ERROR: PG_ONLINE_PASSWORD not found")
        sys.exit(1)

    conn = get_conn()
    try:
        analyze_retention(conn)
        analyze_paid_user_activity(conn)
        analyze_unit_churn(conn)
    finally:
        conn.close()


if __name__ == '__main__':
    main()