ai_member_xiaoxi/scripts/course_consumption_weekly.py
2026-05-14 08:00:01 +08:00

242 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
课消指标:按周统计 2025-09-01 ~ 2026-05-10
指标:
1. 人均课消数 = 课消总次数 / 付费用户数
2. 有课消用户的人均课消数 = 课消总次数 / 有课消的付费用户数
"""
import psycopg2
from collections import defaultdict
from datetime import datetime, timedelta, date
# 连接线上 PostgreSQL
conn = psycopg2.connect(
host="bj-postgres-16pob4sg.sql.tencentcdb.com",
port=28591,
user="ai_member",
password="LdfjdjL83h3h3^$&**YGG*",
dbname="vala_bi"
)
cur = conn.cursor()
# ===== 时间参数 =====
overall_start = date(2025, 9, 1)
overall_end = date(2026, 5, 11) # exclusive即5/10是最后一天
print(f"统计区间: {overall_start} ~ {overall_end - timedelta(days=1)}")
# ===== 生成周列表(周一~周日) =====
weeks = []
d = overall_start
while d < overall_end:
week_start = d
# 找周日
days_to_sunday = 6 - d.weekday()
week_end = d + timedelta(days=days_to_sunday)
if week_end >= overall_end:
week_end = overall_end - timedelta(days=1)
weeks.append((week_start, week_end))
d = week_end + timedelta(days=1)
print(f"{len(weeks)}")
# ===== Step 1获取所有订单确定每用户的付费有效期 =====
print("\nStep 1: 查询订单...")
cur.execute("""
SELECT o.account_id, o.trade_no, o.out_trade_no, o.pay_success_date,
o.order_status, o.pay_amount_int
FROM bi_vala_order o
INNER JOIN bi_vala_app_account a ON o.account_id = a.id
WHERE a.status = 1
AND a.deleted_at IS NULL
AND o.pay_success_date IS NOT NULL
AND o.pay_success_date >= '2025-01-01'
""")
orders = cur.fetchall()
print(f" 订单数: {len(orders)}")
# 获取退费信息
cur.execute("""
SELECT trade_no, out_trade_no, status
FROM bi_refund_order
WHERE status = 3
""")
refunds = set()
refunds_by_out = set()
for trade_no, out_trade_no, st in cur.fetchall():
if trade_no:
refunds.add(trade_no)
if out_trade_no:
refunds_by_out.add(out_trade_no)
print(f" 退费trade_no数: {len(refunds)}, out_trade_no数: {len(refunds_by_out)}")
# 组织订单数据:{account_id: [(pay_date, is_refunded), ...]}
account_orders = defaultdict(list)
for aid, trade_no, out_trade_no, pay_date, order_status, amount in orders:
is_refunded = (
order_status == 4 and (
(trade_no and trade_no in refunds) or
(out_trade_no and out_trade_no in refunds_by_out)
)
)
account_orders[aid].append((pay_date, is_refunded))
# 判断每个用户是否是付费用户(到某日期为止)
def is_paid_user(aid, as_of_date):
"""截至 as_of_date用户是否有未退费订单"""
has_paid = False
for pay_date, is_refunded in account_orders.get(aid, []):
if pay_date.date() <= as_of_date:
if not is_refunded:
has_paid = True
# 如果退了但还有其他未退订单,也算
# 重新计数:截至该日期,是否有任何未退费订单
unpaid_orders = sum(
1 for pd, ref in account_orders.get(aid, [])
if pd.date() <= as_of_date and not ref
)
return unpaid_orders > 0
# 序章 chapter_idL1 U00 + L2 U00需剔除
u0_chapters = {343, 344, 345, 346, 348, 55, 56, 57, 58, 59}
# ===== Step 2获取所有课消记录 =====
print("\nStep 2: 查询课消记录剔除序章U0...")
# (user_id, chapter_id) -> earliest_updated_at
consumption_map = {} # key=(user_id, chapter_id) -> earliest updated_at (date)
for table_idx in range(8):
tbl = f"bi_user_chapter_play_record_{table_idx}"
cur.execute(f"""
SELECT user_id, chapter_id, updated_at
FROM {tbl}
WHERE play_status = 1
AND updated_at >= '2025-09-01'
AND updated_at < '2026-05-11'
""")
cnt = 0
for user_id, chapter_id, updated_at in cur.fetchall():
if chapter_id in u0_chapters:
continue
key = (user_id, chapter_id)
d = updated_at.date() if hasattr(updated_at, 'date') else updated_at
if isinstance(updated_at, datetime):
d = updated_at.date()
elif isinstance(updated_at, str):
d = datetime.strptime(updated_at[:10], '%Y-%m-%d').date()
else:
d = updated_at
if key not in consumption_map or d < consumption_map[key]:
consumption_map[key] = d
cnt += 1
print(f" {tbl}: {cnt} 条记录")
print(f" 去重后课消: {len(consumption_map)}")
# ===== Step 3关联 character -> account =====
print("\nStep 3: 关联角色...")
all_user_ids = set(k[0] for k in consumption_map)
all_user_id_list = list(all_user_ids)
char_to_account = {}
batch_size = 500
for i in range(0, len(all_user_id_list), batch_size):
batch = all_user_id_list[i:i+batch_size]
ph = ','.join(['%s'] * len(batch))
cur.execute(f"""
SELECT id, account_id FROM bi_vala_app_character
WHERE id IN ({ph})
""", batch)
for cid, aid in cur.fetchall():
char_to_account[cid] = aid
print(f" 角色-账号映射: {len(char_to_account)}")
# ===== Step 4按周汇总 =====
print("\nStep 4: 按周汇总...")
results = []
for ws, we in weeks:
# 分母:截至该周末的付费用户
paid_users = set()
for aid in account_orders:
if is_paid_user(aid, we):
paid_users.add(aid)
# 分子该周内付费用户的课消次数按最早updated_at所在周
weekly_consumption = 0
consuming_accounts = set()
for (uid, ch_id), cons_date in consumption_map.items():
if ws <= cons_date <= we:
aid = char_to_account.get(uid)
if aid and aid in paid_users:
weekly_consumption += 1
consuming_accounts.add(aid)
n_paid = len(paid_users)
n_consuming = len(consuming_accounts)
avg_all = weekly_consumption / n_paid if n_paid > 0 else 0
avg_consuming = weekly_consumption / n_consuming if n_consuming > 0 else 0
results.append({
'week': f"{ws.strftime('%m/%d')}-{we.strftime('%m/%d')}",
'ws': ws,
'we': we,
'paid_users': n_paid,
'consumption': weekly_consumption,
'consuming_users': n_consuming,
'avg_all': avg_all,
'avg_consuming': avg_consuming,
})
print(f" {ws}~{we}: 付费{n_paid} | 课消{weekly_consumption} | "
f"有课消{n_consuming} | 人均{avg_all:.2f} | 有消人均{avg_consuming:.2f}")
# ===== 输出结果 =====
print("\n" + "="*80)
print(f"{'':<20} {'付费用户':>8} {'课消次数':>8} {'有消用户':>8} {'人均课消':>8} {'有消人均':>8}")
print("-"*80)
for r in results:
print(f"{r['week']:<20} {r['paid_users']:>8} {r['consumption']:>8} "
f"{r['consuming_users']:>8} {r['avg_all']:>8.2f} {r['avg_consuming']:>8.2f}")
# 月度汇总
print("\n" + "="*80)
print("月度汇总")
print("-"*80)
months_data = defaultdict(lambda: {'paid': 0, 'cons': 0, 'cons_users': set(), 'weeks': 0})
for r in results:
m = r['ws'].strftime('%Y-%m')
# 月度取最后一周的付费用户数(月末快照)
months_data[m]['paid'] = r['paid_users'] # 取月末快照
months_data[m]['cons'] += r['consumption']
months_data[m]['cons_users'].update() # 需要按周累加课消用户
months_data[m]['weeks'] += 1
# 重新按月汇总(用月末周的付费用户,累加课消)
monthly = defaultdict(lambda: {'paid': 0, 'cons': 0, 'cons_user_set': set(), 'last_paid': 0})
for r in results:
m = r['ws'].strftime('%Y-%m')
monthly[m]['cons'] += r['consumption']
monthly[m]['last_paid'] = r['paid_users']
# 有课消用户去重
# 需要在循环外处理(这里简化:直接取有课消用户快照)
monthly[m]['cons_user_count'] = r['consuming_users'] # 这不对
# 简化月度:取月内所有周的最大付费用户数(月末),累加课消
print(f"{'月份':<10} {'月末付费':>8} {'月课消':>8} {'月人均':>8}")
print("-"*50)
for m in sorted(monthly):
d = monthly[m]
avg = d['cons'] / d['last_paid'] if d['last_paid'] > 0 else 0
print(f"{m:<10} {d['last_paid']:>8} {d['cons']:>8} {avg:>8.2f}")
cur.close()
conn.close()
print("\n完成!")