438 lines
14 KiB
Python
438 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
"""
In-app purchase user behaviour analysis: the relationship between
registration date, prologue completion date and course purchase date.
"""
|
||
import psycopg2
|
||
from datetime import datetime, date
|
||
from collections import defaultdict, Counter
|
||
import os
|
||
|
||
# The database password must be supplied through the environment. A literal
# fallback committed to source ends up in version control and shared copies of
# this script, so none is kept here.
# NOTE(review): the password that used to be hard-coded on this line should be
# treated as leaked and rotated.
PG_PASS = os.environ.get('PG_ONLINE_PASSWORD')
if not PG_PASS:
    raise SystemExit(
        "PG_ONLINE_PASSWORD is not set; export it before running this script."
    )

# One connection and one cursor are shared by every query below.
conn = psycopg2.connect(
    host="bj-postgres-16pob4sg.sql.tencentcdb.com",
    port=28591,
    user="ai_member",
    password=PG_PASS,
    database="vala_bi",
)
cur = conn.cursor()
|
||
|
||
# chapter_id values of the U0 prologue unit, one list per course level.
L1_U0 = [343, 344, 345, 346, 348]
L2_U0 = [55, 56, 57, 58, 59]

# Report banner: rule, title, rule.
print("\n".join(["=" * 80, "端内购买用户行为分析", "=" * 80]))
|
||
|
||
# ========== Step 1: in-app paying users ==========
print("\n[Step 1] 查询端内付费用户...")

# DISTINCT ON + ORDER BY keeps exactly one row per account: the order with the
# smallest pay_success_date among those matching the filters.
# NOTE(review): because only that earliest order is kept, an account whose
# earliest matching order is for a goods_id outside the L1/L2 lists is dropped
# below even if it bought an L1/L2 course later - confirm this is intended.
paid_orders_sql = """
    SELECT DISTINCT ON (o.account_id)
        o.account_id,
        o.goods_id,
        o.pay_success_date as purchase_date,
        CASE
            WHEN o.goods_id IN (57, 60, 63) THEN 'L1'
            WHEN o.goods_id IN (31, 32, 33, 54) THEN 'L2'
            ELSE 'other'
        END as buy_level
    FROM bi_vala_order o
    JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
    WHERE o.key_from = 'app-active-h5-0-0'
      AND o.order_status IN (3, 4)
      AND o.pay_success_date IS NOT NULL
    ORDER BY o.account_id, o.pay_success_date
"""
cur.execute(paid_orders_sql)

# account_id -> purchase info, skipping rows classified as 'other'.
endo_users = {
    account_id: {
        'goods_id': goods_id,
        'purchase_date': purchase_date,
        'buy_level': buy_level,
    }
    for account_id, goods_id, purchase_date, buy_level in cur.fetchall()
    if buy_level != 'other'
}

print(f" 端内付费用户数: {len(endo_users)}")

# Split the same entries by purchased level.
l1_users = {}
l2_users = {}
for acct_id, purchase_info in endo_users.items():
    if purchase_info['buy_level'] == 'L1':
        l1_users[acct_id] = purchase_info
    elif purchase_info['buy_level'] == 'L2':
        l2_users[acct_id] = purchase_info

print(f" - L1 购买用户: {len(l1_users)}")
print(f" - L2 购买用户: {len(l2_users)}")
|
||
|
||
# ========== Step 2: registration time ==========
print("\n[Step 2] 查询用户注册时间...")

all_ids = list(endo_users.keys())
batch_size = 500  # ids per IN (...) list

# account_id -> created_at of the account row.
register_map = {}
for start in range(0, len(all_ids), batch_size):
    id_chunk = tuple(all_ids[start:start + batch_size])
    cur.execute(
        "SELECT id, created_at FROM bi_vala_app_account WHERE id IN %s AND status = 1",
        (id_chunk,),
    )
    register_map.update(
        (acct_id, created_at) for acct_id, created_at in cur.fetchall()
    )

print(f" 找到注册时间的用户: {len(register_map)}")
|
||
|
||
# ========== Step 3: account_id -> user_id (character) mapping ==========
print("\n[Step 3] 查询用户角色映射...")

# account_id -> [user_id, ...]; an account may own several character rows.
char_map = {}
for start in range(0, len(all_ids), batch_size):
    id_chunk = tuple(all_ids[start:start + batch_size])
    cur.execute(
        "SELECT account_id, id FROM bi_vala_app_character WHERE account_id IN %s",
        (id_chunk,),
    )
    for owner_id, character_id in cur.fetchall():
        char_map.setdefault(owner_id, []).append(character_id)

print(f" 有角色的用户数: {len(char_map)}")
|
||
|
||
# ========== Step 4: prologue completion time ==========
print("\n[Step 4] 查询序章完课时间(8张分表)...")

# Collect every character user_id owned by the buyers of each level.
# Accounts without any character row contribute nothing.
l1_user_ids = {
    character_id
    for owner_id in l1_users
    for character_id in char_map.get(owner_id, ())
}
l2_user_ids = {
    character_id
    for owner_id in l2_users
    for character_id in char_map.get(owner_id, ())
}

print(f" L1 相关 user_id 数: {len(l1_user_ids)}")
print(f" L2 相关 user_id 数: {len(l2_user_ids)}")
|
||
|
||
def query_prologue_completion(cur, user_ids, chapter_ids,
                              table_prefix="bi_user_chapter_play_record",
                              shard_count=8, batch_size=2000):
    """Return the earliest prologue-completion timestamp for each user.

    The play records are spread over ``shard_count`` tables named
    ``{table_prefix}_0`` .. ``{table_prefix}_{shard_count - 1}``. Every shard
    is queried for rows with ``play_status = 1`` in the given chapters, and
    the minimum ``created_at`` across all shards is kept per user.

    Args:
        cur: DB-API cursor using psycopg2-style ``%(name)s`` placeholders.
        user_ids: Iterable of user ids to look up.
        chapter_ids: Iterable of chapter ids that make up the prologue.
        table_prefix: Table name prefix of the shards. It is interpolated
            into the SQL text, so it must come from trusted code, never from
            user input.
        shard_count: Number of shard tables to union (default 8).
        batch_size: Maximum number of user ids sent per query (default 2000).

    Returns:
        dict mapping user_id -> earliest ``created_at``. Users with no
        matching row are absent. Empty ``user_ids`` or ``chapter_ids`` yields
        an empty dict without touching the database.
    """
    uid_list = list(user_ids)
    chapter_tuple = tuple(chapter_ids)
    # "IN ()" is a SQL syntax error, so return before building a query when
    # either id list is empty.
    if not uid_list or not chapter_tuple:
        return {}

    # One aggregate per shard; both id lists are bound as parameters rather
    # than spliced into the SQL text.
    per_shard = " UNION ALL ".join(
        f"""
        SELECT user_id, MIN(created_at) as first_done
        FROM {table_prefix}_{shard}
        WHERE chapter_id IN %(chapter_ids)s
          AND play_status = 1
          AND user_id IN %(user_ids)s
        GROUP BY user_id
        """
        for shard in range(shard_count)
    )
    sql = f"""
        SELECT user_id, MIN(first_done) as earliest
        FROM ({per_shard}) sub
        GROUP BY user_id
    """

    result = {}
    # Each user id lands in exactly one batch, so batches never overwrite
    # each other's entries.
    for start in range(0, len(uid_list), batch_size):
        params = {
            'user_ids': tuple(uid_list[start:start + batch_size]),
            'chapter_ids': chapter_tuple,
        }
        cur.execute(sql, params)
        for user_id, earliest in cur.fetchall():
            result[user_id] = earliest

    return result
|
||
|
||
# Earliest prologue completion per character, looked up separately for the
# L1 and the L2 prologue chapters.
l1_prologue = query_prologue_completion(cur, l1_user_ids, chapter_ids=L1_U0)
print(" L1 序章有完课记录的 user_id: {}".format(len(l1_prologue)))

l2_prologue = query_prologue_completion(cur, l2_user_ids, chapter_ids=L2_U0)
print(" L2 序章有完课记录的 user_id: {}".format(len(l2_prologue)))

# ========== Step 5: join the three dates ==========
print("\n[Step 5] 关联分析...")
|
||
|
||
# For each in-app buyer, find the earliest prologue completion (for the level
# that was bought) among the characters of that account.
def get_earliest_prologue(account_id, buy_level):
    """Return the earliest prologue completion for the account's bought level.

    Reads the module-level ``char_map``, ``l1_prologue`` and ``l2_prologue``.
    ``'L1'`` selects the L1 map; any other ``buy_level`` selects the L2 map.
    Returns ``None`` when the account has no characters or none of them has a
    completion record.
    """
    character_ids = char_map.get(account_id)
    if character_ids is None:
        return None

    completions = l1_prologue if buy_level == 'L1' else l2_prologue
    found = (completions[cid] for cid in character_ids if cid in completions)

    earliest = None
    for done_at in found:
        if earliest is None or done_at < earliest:
            earliest = done_at
    return earliest
|
||
|
||
# Build one analysis record per paying account.
def _as_date(value):
    """Reduce a datetime to its calendar date; return any other value unchanged."""
    return value.date() if isinstance(value, datetime) else value


records = []
for account_id, info in endo_users.items():
    register_date = register_map.get(account_id)
    if register_date is None:
        # No registration row was found for this account in Step 2.
        continue

    buy_level = info['buy_level']
    reg_d = _as_date(register_date)
    pur_d = _as_date(info['purchase_date'])

    prologue_date = get_earliest_prologue(account_id, buy_level)
    has_prologue = bool(prologue_date)
    pro_d = _as_date(prologue_date) if has_prologue else None

    # Day differences are computed on calendar dates, so time of day is ignored.
    records.append({
        'account_id': account_id,
        'level': buy_level,
        'register_date': reg_d,
        'prologue_date': pro_d,
        'purchase_date': pur_d,
        'reg_to_pro': (pro_d - reg_d).days if has_prologue else None,
        'pro_to_pur': (pur_d - pro_d).days if has_prologue else None,
        'reg_to_pur': (pur_d - reg_d).days,
        'has_prologue': has_prologue,
    })
|
||
|
||
# ========== Step 6: statistics output ==========
print(f"\n总分析用户数: {len(records)}")

# --- overall split: records with / without a prologue completion ---
with_prologue = []
without_prologue = []
for rec in records:
    (with_prologue if rec['has_prologue'] else without_prologue).append(rec)

divider = '=' * 80
print("\n" + divider)
print("总体概览")
print(divider)
print(f"端内付费用户: {len(records)}")
print(f" 有L1序章完课记录: {sum(1 for rec in with_prologue if rec['level'] == 'L1')}")
print(f" 有L2序章完课记录: {sum(1 for rec in with_prologue if rec['level'] == 'L2')}")
print(f" 无对应序章完课记录: {len(without_prologue)}")
|
||
|
||
# --- day-count distribution helpers ---
def percentile(sorted_vals, p):
    """Pick the element at position ``int(len * p / 100)`` of a sorted sequence.

    The position is clamped to the last element. No interpolation is done, so
    for an even-length input ``p=50`` yields the upper of the two middle
    values. Returns ``None`` for an empty input.
    """
    if not sorted_vals:
        return None
    size = len(sorted_vals)
    rank = int(size * p / 100)
    return sorted_vals[rank] if rank < size else sorted_vals[size - 1]
|
||
|
||
def print_stats(name, vals, unit='天'):
    """Print sample size, median, mean and P25/P75/P90 of ``vals`` under ``name``.

    Prints a single "no data" line and returns when ``vals`` is empty. Each
    statistic is formatted with one decimal place followed by ``unit``.
    """
    if not vals:
        print(f" {name}: 无数据")
        return

    ordered = sorted(vals)
    metrics = [
        ("中位数", percentile(ordered, 50)),
        ("平均值", sum(ordered) / len(ordered)),
        ("P25", percentile(ordered, 25)),
        ("P75", percentile(ordered, 75)),
        ("P90", percentile(ordered, 90)),
    ]

    print(f" {name}:")
    print(f" 样本数: {len(ordered)}")
    for label, value in metrics:
        print(f" {label}: {value:.1f}{unit}")
|
||
|
||
divider = '=' * 80
# (title fragment, record key) for the three intervals reported for users who
# completed the prologue.
stage_metrics = (
    ("注册→完课", 'reg_to_pro'),
    ("完课→购课", 'pro_to_pur'),
    ("注册→购课", 'reg_to_pur'),
)

# All analysed users: registration -> purchase.
print("\n" + divider)
print("全量统计:注册→购课 天数")
print(divider)
print_stats("全量 注册→购课", [rec['reg_to_pur'] for rec in records])

# Users with a prologue completion: all three intervals.
print("\n" + divider)
print("有序章完课用户统计")
print(divider)
for title, key in stage_metrics:
    print_stats(f"序章完课 {title} 天数", [rec[key] for rec in with_prologue])

# --- the same figures broken down by purchased level ---
for level in ('L1', 'L2'):
    level_records = [rec for rec in records if rec['level'] == level]
    level_done = [rec for rec in level_records if rec['has_prologue']]
    level_not_done = [rec for rec in level_records if not rec['has_prologue']]

    print("\n" + divider)
    print(f"{level} 购买用户统计")
    print(divider)
    print(f" {level} 总用户: {len(level_records)}")
    print(f" 有{level}序章完课: {len(level_done)}")
    print(f" 无{level}序章完课: {len(level_not_done)}")

    print_stats(f"\n {level} 全量 注册→购课", [rec['reg_to_pur'] for rec in level_records])

    # Skip the completion-based intervals entirely when nobody at this level
    # completed the prologue.
    if level_done:
        for title, key in stage_metrics:
            print_stats(f" {level} 序章完课 {title}", [rec[key] for rec in level_done])
|
||
|
||
# --- completion -> purchase: distribution over day ranges ---
print("\n" + '=' * 80)
print("有完课用户:完课→购课 时间段分布")
print('=' * 80)

# label -> inclusive (low, high) day range. The last entry collects negative
# gaps, i.e. purchases dated before the prologue completion.
buckets = {
    '0天(当天购课)': (0, 0),
    '1-3天': (1, 3),
    '4-7天': (4, 7),
    '8-14天': (8, 14),
    '15-30天': (15, 30),
    '31-60天': (31, 60),
    '61-90天': (61, 90),
    '90天以上': (91, 99999),
    '负数(购课在完课前)': (-99999, -1),
}

for level in ['L1', 'L2', 'ALL']:
    data = with_prologue if level == 'ALL' else [
        rec for rec in with_prologue if rec['level'] == level
    ]
    if not data:
        continue

    vals = [rec['pro_to_pur'] for rec in data]
    print(f"\n[{level}] 完课→购课 天数分布:")
    for label, (lo, hi) in buckets.items():
        cnt = len([v for v in vals if lo <= v <= hi])
        pct = cnt / len(vals) * 100
        # One bar character per two percentage points.
        print(f" {label:20s}: {cnt:4d} ({pct:5.1f}%) {'█' * int(pct / 2)}")
|
||
|
||
# --- registration -> completion: distribution over day ranges ---
print("\n" + '=' * 80)
print("有完课用户:注册→完课 时间段分布")
print('=' * 80)

# Reuse the shared bucket table but leave out the negative range, so negative
# gaps are not listed in this section.
non_negative_buckets = [
    (label, lo, hi) for label, (lo, hi) in buckets.items() if hi >= 0
]

for level in ['L1', 'L2', 'ALL']:
    data = with_prologue if level == 'ALL' else [
        rec for rec in with_prologue if rec['level'] == level
    ]
    if not data:
        continue

    vals = [rec['reg_to_pro'] for rec in data]
    print(f"\n[{level}] 注册→完课 天数分布:")
    for label, lo, hi in non_negative_buckets:
        cnt = len([v for v in vals if lo <= v <= hi])
        pct = cnt / len(vals) * 100
        print(f" {label:20s}: {cnt:4d} ({pct:5.1f}%) {'█' * int(pct / 2)}")
|
||
|
||
# --- registration -> purchase: distribution over day ranges ---
print("\n" + '=' * 80)
print("注册→购课 时间段分布")
print('=' * 80)

# Finer-grained table for this section: the first three days are listed
# individually and the long tail is split at 180 days.
extended_buckets = {
    '0天(当天购课)': (0, 0),
    '1天': (1, 1),
    '2天': (2, 2),
    '3天': (3, 3),
    '4-7天': (4, 7),
    '8-14天': (8, 14),
    '15-30天': (15, 30),
    '31-60天': (31, 60),
    '61-90天': (61, 90),
    '90-180天': (91, 180),
    '180天以上': (181, 99999),
}

for level in ['L1', 'L2', 'ALL']:
    data = records if level == 'ALL' else [
        rec for rec in records if rec['level'] == level
    ]
    if not data:
        continue

    vals = [rec['reg_to_pur'] for rec in data]
    print(f"\n[{level}] 注册→购课 天数分布:")
    for label, (lo, hi) in extended_buckets.items():
        cnt = len([v for v in vals if lo <= v <= hi])
        pct = cnt / len(vals) * 100
        print(f" {label:20s}: {cnt:4d} ({pct:5.1f}%) {'█' * int(pct / 2)}")
|
||
|
||
# --- did the prologue completion come before or after the purchase? ---
print("\n" + '=' * 80)
print("完课 vs 购课 时间关系")
print('=' * 80)

for level in ['L1', 'L2', 'ALL']:
    data = with_prologue if level == 'ALL' else [
        rec for rec in with_prologue if rec['level'] == level
    ]
    if not data:
        continue

    total = len(data)
    # pro_to_pur >= 0 means completion on or before the purchase date.
    n_on_or_before = sum(1 for rec in data if rec['pro_to_pur'] >= 0)
    n_after = sum(1 for rec in data if rec['pro_to_pur'] < 0)
    n_same_day = sum(1 for rec in data if rec['pro_to_pur'] == 0)

    print(f"\n[{level}]:")
    print(f" 完课在购课之前或同一天: {n_on_or_before} ({n_on_or_before/total*100:.1f}%)")
    print(f" 其中同一天: {n_same_day} ({n_same_day/total*100:.1f}%)")
    print(f" 完课在购课之后: {n_after} ({n_after/total*100:.1f}%)")
|
||
|
||
# ========== Export the detailed records as CSV ==========
print(f"\n{'='*80}")
print("导出详细数据...")
print(f"{'='*80}")

# Imported here because this section is the only user of the csv module.
import csv

output_path = '/root/.openclaw/workspace/output/endor_purchase_analysis.csv'

try:
    # open() does not create missing parent directories. Create them up front
    # so the run does not fail at the very last step, after every query has
    # already been executed.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # utf-8-sig writes a BOM at the start of the file.
    with open(output_path, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.writer(f)
        writer.writerow(['account_id', '等级', '注册日期', '序章完课日期', '购课日期',
                         '注册→完课(天)', '完课→购课(天)', '注册→购课(天)', '有序章完课'])
        # Rows are written in ascending purchase-date order.
        for r in sorted(records, key=lambda x: x['purchase_date']):
            writer.writerow([
                r['account_id'], r['level'], r['register_date'], r['prologue_date'], r['purchase_date'],
                r['reg_to_pro'] if r['reg_to_pro'] is not None else '',
                r['pro_to_pur'] if r['pro_to_pur'] is not None else '',
                r['reg_to_pur'], '是' if r['has_prologue'] else '否'
            ])

    print(f" 已导出到: {output_path}")
finally:
    # Release the database handles even when the export fails.
    cur.close()
    conn.close()

print("\n分析完成!")
|