ai_member_xiaoxi/scripts/endor_purchase_analysis.py
2026-05-26 08:00:01 +08:00

438 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
端内购买用户行为分析:注册日期 → 序章完课日期 → 购课日期 三者关系
"""
import psycopg2
from datetime import datetime, date
from collections import defaultdict, Counter
import os
PG_PASS = os.environ.get('PG_ONLINE_PASSWORD', "LdfjdjL83h3h3^$&**YGG*")
conn = psycopg2.connect(
host="bj-postgres-16pob4sg.sql.tencentcdb.com",
port=28591,
user="ai_member",
password=PG_PASS,
database="vala_bi"
)
cur = conn.cursor()
# U0 序章 chapter_id
L1_U0 = [343, 344, 345, 346, 348]
L2_U0 = [55, 56, 57, 58, 59]
print("=" * 80)
print("端内购买用户行为分析")
print("=" * 80)
# ========== Step 1: 端内付费用户 ==========
print("\n[Step 1] 查询端内付费用户...")
cur.execute("""
SELECT DISTINCT ON (o.account_id)
o.account_id,
o.goods_id,
o.pay_success_date as purchase_date,
CASE
WHEN o.goods_id IN (57, 60, 63) THEN 'L1'
WHEN o.goods_id IN (31, 32, 33, 54) THEN 'L2'
ELSE 'other'
END as buy_level
FROM bi_vala_order o
JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
WHERE o.key_from = 'app-active-h5-0-0'
AND o.order_status IN (3, 4)
AND o.pay_success_date IS NOT NULL
ORDER BY o.account_id, o.pay_success_date
""")
endo_users = {}
for row in cur.fetchall():
account_id, goods_id, purchase_date, buy_level = row
if buy_level == 'other':
continue
endo_users[account_id] = {
'goods_id': goods_id,
'purchase_date': purchase_date,
'buy_level': buy_level
}
print(f" 端内付费用户数: {len(endo_users)}")
l1_users = {k: v for k, v in endo_users.items() if v['buy_level'] == 'L1'}
l2_users = {k: v for k, v in endo_users.items() if v['buy_level'] == 'L2'}
print(f" - L1 购买用户: {len(l1_users)}")
print(f" - L2 购买用户: {len(l2_users)}")
# ========== Step 2: 注册时间 ==========
print("\n[Step 2] 查询用户注册时间...")
all_ids = list(endo_users.keys())
batch_size = 500
register_map = {}
for i in range(0, len(all_ids), batch_size):
batch = all_ids[i:i+batch_size]
cur.execute(
"SELECT id, created_at FROM bi_vala_app_account WHERE id IN %s AND status = 1",
(tuple(batch),)
)
for row in cur.fetchall():
register_map[row[0]] = row[1]
print(f" 找到注册时间的用户: {len(register_map)}")
# ========== Step 3: account_id -> user_id (character) 映射 ==========
print("\n[Step 3] 查询用户角色映射...")
char_map = {} # account_id -> [user_id, ...]
for i in range(0, len(all_ids), batch_size):
batch = all_ids[i:i+batch_size]
cur.execute(
"SELECT account_id, id FROM bi_vala_app_character WHERE account_id IN %s",
(tuple(batch),)
)
for row in cur.fetchall():
acct = row[0]
uid = row[1]
if acct not in char_map:
char_map[acct] = []
char_map[acct].append(uid)
print(f" 有角色的用户数: {len(char_map)}")
# ========== Step 4: 序章完课时间 ==========
print("\n[Step 4] 查询序章完课时间8张分表...")
# 收集所有 user_id按等级分
l1_user_ids = set()
l2_user_ids = set()
for acct in l1_users:
if acct in char_map:
for uid in char_map[acct]:
l1_user_ids.add(uid)
for acct in l2_users:
if acct in char_map:
for uid in char_map[acct]:
l2_user_ids.add(uid)
print(f" L1 相关 user_id 数: {len(l1_user_ids)}")
print(f" L2 相关 user_id 数: {len(l2_user_ids)}")
def query_prologue_completion(cur, user_ids, chapter_ids, table_prefix="bi_user_chapter_play_record"):
"""查询序章完课最早时间,返回 {user_id: earliest_date}"""
result = {}
if not user_ids:
return result
# 分表查询
union_parts = []
for t in range(8):
union_parts.append(f"""
SELECT user_id, MIN(created_at) as first_done
FROM {table_prefix}_{t}
WHERE chapter_id IN ({','.join(map(str, chapter_ids))})
AND play_status = 1
AND user_id IN %(user_ids)s
GROUP BY user_id
""")
sql = " UNION ALL ".join(union_parts)
sql = f"""
SELECT user_id, MIN(first_done) as earliest
FROM ({sql}) sub
GROUP BY user_id
"""
batch_size_uid = 2000
uid_list = list(user_ids)
for i in range(0, len(uid_list), batch_size_uid):
batch = tuple(uid_list[i:i+batch_size_uid])
cur.execute(sql, {'user_ids': batch})
for row in cur.fetchall():
result[row[0]] = row[1]
return result
# L1 序章
l1_prologue = query_prologue_completion(cur, l1_user_ids, L1_U0)
print(f" L1 序章有完课记录的 user_id: {len(l1_prologue)}")
# L2 序章
l2_prologue = query_prologue_completion(cur, l2_user_ids, L2_U0)
print(f" L2 序章有完课记录的 user_id: {len(l2_prologue)}")
# ========== Step 5: 关联分析 ==========
print("\n[Step 5] 关联分析...")
# 对每个端内购买用户,找到他的角色中对应等级序章最早完课时间
def get_earliest_prologue(account_id, buy_level):
"""获取用户对应等级序章的最早完课时间"""
if account_id not in char_map:
return None
user_ids = char_map[account_id]
earliest = None
prologue_map = l1_prologue if buy_level == 'L1' else l2_prologue
for uid in user_ids:
if uid in prologue_map:
d = prologue_map[uid]
if earliest is None or d < earliest:
earliest = d
return earliest
# 构建分析数据
records = []
for account_id, info in endo_users.items():
register_date = register_map.get(account_id)
purchase_date = info['purchase_date']
buy_level = info['buy_level']
if register_date is None:
continue
prologue_date = get_earliest_prologue(account_id, buy_level)
# 计算天数差
if isinstance(register_date, datetime):
reg_d = register_date.date()
else:
reg_d = register_date
if isinstance(purchase_date, datetime):
pur_d = purchase_date.date()
elif isinstance(purchase_date, date):
pur_d = purchase_date
else:
pur_d = purchase_date
if prologue_date:
if isinstance(prologue_date, datetime):
pro_d = prologue_date.date()
else:
pro_d = prologue_date
reg_to_pro = (pro_d - reg_d).days
pro_to_pur = (pur_d - pro_d).days
reg_to_pur = (pur_d - reg_d).days
has_prologue = True
else:
reg_to_pro = None
pro_to_pur = None
reg_to_pur = (pur_d - reg_d).days
has_prologue = False
records.append({
'account_id': account_id,
'level': buy_level,
'register_date': reg_d,
'prologue_date': prologue_date.date() if has_prologue and isinstance(prologue_date, datetime) else (prologue_date if has_prologue else None),
'purchase_date': pur_d,
'reg_to_pro': reg_to_pro,
'pro_to_pur': pro_to_pur,
'reg_to_pur': reg_to_pur,
'has_prologue': has_prologue
})
# ========== Step 6: 统计输出 ==========
print(f"\n总分析用户数: {len(records)}")
# --- 总体统计 ---
with_prologue = [r for r in records if r['has_prologue']]
without_prologue = [r for r in records if not r['has_prologue']]
print(f"\n{'='*80}")
print("总体概览")
print(f"{'='*80}")
print(f"端内付费用户: {len(records)}")
print(f" 有L1序章完课记录: {len([r for r in with_prologue if r['level']=='L1'])}")
print(f" 有L2序章完课记录: {len([r for r in with_prologue if r['level']=='L2'])}")
print(f" 无对应序章完课记录: {len(without_prologue)}")
# --- 天数分布 ---
def percentile(sorted_vals, p):
if not sorted_vals:
return None
idx = int(len(sorted_vals) * p / 100)
return sorted_vals[min(idx, len(sorted_vals)-1)]
def print_stats(name, vals, unit=''):
if not vals:
print(f" {name}: 无数据")
return
s = sorted(vals)
print(f" {name}:")
print(f" 样本数: {len(s)}")
print(f" 中位数: {percentile(s, 50):.1f}{unit}")
print(f" 平均值: {sum(s)/len(s):.1f}{unit}")
print(f" P25: {percentile(s, 25):.1f}{unit}")
print(f" P75: {percentile(s, 75):.1f}{unit}")
print(f" P90: {percentile(s, 90):.1f}{unit}")
print(f"\n{'='*80}")
print("全量统计:注册→购课 天数")
print(f"{'='*80}")
print_stats("全量 注册→购课", [r['reg_to_pur'] for r in records])
print(f"\n{'='*80}")
print("有序章完课用户统计")
print(f"{'='*80}")
print_stats("序章完课 注册→完课 天数", [r['reg_to_pro'] for r in with_prologue])
print_stats("序章完课 完课→购课 天数", [r['pro_to_pur'] for r in with_prologue])
print_stats("序章完课 注册→购课 天数", [r['reg_to_pur'] for r in with_prologue])
# --- 按等级拆分 ---
for level in ['L1', 'L2']:
lvl = [r for r in records if r['level'] == level]
lvl_pro = [r for r in lvl if r['has_prologue']]
lvl_no = [r for r in lvl if not r['has_prologue']]
print(f"\n{'='*80}")
print(f"{level} 购买用户统计")
print(f"{'='*80}")
print(f" {level} 总用户: {len(lvl)}")
print(f"{level}序章完课: {len(lvl_pro)}")
print(f"{level}序章完课: {len(lvl_no)}")
print_stats(f"\n {level} 全量 注册→购课", [r['reg_to_pur'] for r in lvl])
if lvl_pro:
print_stats(f" {level} 序章完课 注册→完课", [r['reg_to_pro'] for r in lvl_pro])
print_stats(f" {level} 序章完课 完课→购课", [r['pro_to_pur'] for r in lvl_pro])
print_stats(f" {level} 序章完课 注册→购课", [r['reg_to_pur'] for r in lvl_pro])
# --- 完课→购课的时间段分布 ---
print(f"\n{'='*80}")
print("有完课用户:完课→购课 时间段分布")
print(f"{'='*80}")
buckets = {
'0天当天购课': (0, 0),
'1-3天': (1, 3),
'4-7天': (4, 7),
'8-14天': (8, 14),
'15-30天': (15, 30),
'31-60天': (31, 60),
'61-90天': (61, 90),
'90天以上': (91, 99999),
'负数(购课在完课前)': (-99999, -1),
}
for level in ['L1', 'L2', 'ALL']:
if level == 'ALL':
data = with_prologue
else:
data = [r for r in with_prologue if r['level'] == level]
if not data:
continue
vals = [r['pro_to_pur'] for r in data]
print(f"\n[{level}] 完课→购课 天数分布:")
for label, (lo, hi) in buckets.items():
cnt = sum(1 for v in vals if lo <= v <= hi)
pct = cnt / len(vals) * 100
bar = '' * int(pct / 2)
print(f" {label:20s}: {cnt:4d} ({pct:5.1f}%) {bar}")
# --- 注册→完课时间段分布 ---
print(f"\n{'='*80}")
print("有完课用户:注册→完课 时间段分布")
print(f"{'='*80}")
for level in ['L1', 'L2', 'ALL']:
if level == 'ALL':
data = with_prologue
else:
data = [r for r in with_prologue if r['level'] == level]
if not data:
continue
vals = [r['reg_to_pro'] for r in data]
print(f"\n[{level}] 注册→完课 天数分布:")
for label, (lo, hi) in buckets.items():
if hi < 0:
continue
cnt = sum(1 for v in vals if lo <= v <= hi)
pct = cnt / len(vals) * 100
bar = '' * int(pct / 2)
print(f" {label:20s}: {cnt:4d} ({pct:5.1f}%) {bar}")
# --- 注册→购课 时间段分布 ---
print(f"\n{'='*80}")
print("注册→购课 时间段分布")
print(f"{'='*80}")
for level in ['L1', 'L2', 'ALL']:
if level == 'ALL':
data = records
else:
data = [r for r in records if r['level'] == level]
if not data:
continue
vals = [r['reg_to_pur'] for r in data]
print(f"\n[{level}] 注册→购课 天数分布:")
extended_buckets = {
'0天当天购课': (0, 0),
'1天': (1, 1),
'2天': (2, 2),
'3天': (3, 3),
'4-7天': (4, 7),
'8-14天': (8, 14),
'15-30天': (15, 30),
'31-60天': (31, 60),
'61-90天': (61, 90),
'90-180天': (91, 180),
'180天以上': (181, 99999),
}
for label, (lo, hi) in extended_buckets.items():
cnt = sum(1 for v in vals if lo <= v <= hi)
pct = cnt / len(vals) * 100
bar = '' * int(pct / 2)
print(f" {label:20s}: {cnt:4d} ({pct:5.1f}%) {bar}")
# --- 完课在购课之前 vs 之后 ---
print(f"\n{'='*80}")
print("完课 vs 购课 时间关系")
print(f"{'='*80}")
for level in ['L1', 'L2', 'ALL']:
if level == 'ALL':
data = with_prologue
else:
data = [r for r in with_prologue if r['level'] == level]
if not data:
continue
pro_before_pur = [r for r in data if r['pro_to_pur'] >= 0]
pro_after_pur = [r for r in data if r['pro_to_pur'] < 0]
pro_same_day = [r for r in data if r['pro_to_pur'] == 0]
print(f"\n[{level}]:")
print(f" 完课在购课之前或同一天: {len(pro_before_pur)} ({len(pro_before_pur)/len(data)*100:.1f}%)")
print(f" 其中同一天: {len(pro_same_day)} ({len(pro_same_day)/len(data)*100:.1f}%)")
print(f" 完课在购课之后: {len(pro_after_pur)} ({len(pro_after_pur)/len(data)*100:.1f}%)")
# ========== 导出详细CSV ==========
print(f"\n{'='*80}")
print("导出详细数据...")
print(f"{'='*80}")
import csv
output_path = '/root/.openclaw/workspace/output/endor_purchase_analysis.csv'
with open(output_path, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.writer(f)
writer.writerow(['account_id', '等级', '注册日期', '序章完课日期', '购课日期',
'注册→完课(天)', '完课→购课(天)', '注册→购课(天)', '有序章完课'])
for r in sorted(records, key=lambda x: x['purchase_date']):
writer.writerow([
r['account_id'], r['level'], r['register_date'], r['prologue_date'], r['purchase_date'],
r['reg_to_pro'] if r['reg_to_pro'] is not None else '',
r['pro_to_pur'] if r['pro_to_pur'] is not None else '',
r['reg_to_pur'], '' if r['has_prologue'] else ''
])
print(f" 已导出到: {output_path}")
cur.close()
conn.close()
print("\n分析完成!")