ai_member_xiaoxi/scripts/course_consumption_v2.py
2026-05-15 08:00:01 +08:00

396 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Course-consumption metrics v2: U0 prologue chapters excluded; 4 charts, split by L1/L2.
"""
import os
from collections import defaultdict
from datetime import datetime, timedelta, date

import openpyxl
import psycopg2
from openpyxl.chart import LineChart, BarChart, Reference
from openpyxl.chart.label import DataLabelList
from openpyxl.chart.series import DataPoint
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
# ===== Database connection =====
# Every connection setting can be overridden through an environment variable so
# credentials no longer have to live in the source tree. The fallbacks are the
# values that used to be hard-coded, which keeps existing invocations working.
# NOTE(review): the fallback password is a secret committed to the repository —
# rotate it and delete the fallback once VALA_BI_PASSWORD is configured.
conn = psycopg2.connect(
    host=os.environ.get("VALA_BI_HOST", "bj-postgres-16pob4sg.sql.tencentcdb.com"),
    port=int(os.environ.get("VALA_BI_PORT", "28591")),
    user=os.environ.get("VALA_BI_USER", "ai_member"),
    password=os.environ.get("VALA_BI_PASSWORD", "LdfjdjL83h3h3^$&**YGG*"),
    dbname=os.environ.get("VALA_BI_DBNAME", "vala_bi"),
)
# Single cursor shared by all queries in this script.
cur = conn.cursor()
# ===== U0 chapter_ids to exclude =====
# Play records for these chapter ids are dropped from every consumption count.
# NOTE(review): the overview-sheet note in this script describes the L1 range as
# 343-348, yet 347 is absent from this set — confirm whether the gap is intended.
u0_chapters = set(range(55, 60)) | {343, 344, 345, 346, 348}
print(f"剔除 U0 序章: {sorted(u0_chapters)}")
# ===== Time parameters =====
# The reporting window is [overall_start, overall_end): the end date is exclusive.
overall_start = date(2025, 9, 1)
overall_end = date(2026, 5, 11)
# weeks collects (first_day, last_day) pairs, both inclusive. Each week runs up
# to the following Sunday and is clipped to the last day inside the window.
weeks = []
d = overall_start
while d < overall_end:
    ws = d
    we = min(ws + timedelta(days=6 - ws.weekday()), overall_end - timedelta(days=1))
    weeks.append((ws, we))
    d = we + timedelta(days=1)
# ===== Step 1: classify users =====
print("\nStep 1: 分类付费用户...")
# One row per paid order belonging to an active, non-deleted account; the
# CASE expression maps goods_id onto a level label.
cur.execute("""
SELECT o.account_id, o.trade_no, o.order_status, o.pay_success_date,
CASE WHEN o.goods_id IN (57, 60, 63) THEN 'L1'
WHEN o.goods_id = 61 THEN 'L1+L2'
WHEN o.goods_id IN (31, 32, 33, 54) THEN 'L2'
ELSE '其他' END as level_type
FROM bi_vala_order o
INNER JOIN bi_vala_app_account a ON o.account_id = a.id
WHERE a.status = 1 AND a.deleted_at IS NULL AND o.pay_success_date IS NOT NULL
""")
orders = cur.fetchall()
# Trade numbers of refund orders with status 3 (presumably completed refunds —
# TODO confirm against the refund table's status enum).
cur.execute("SELECT trade_no FROM bi_refund_order WHERE status = 3")
refund_trades = {refund_row[0] for refund_row in cur.fetchall()}
# account_id -> {'levels': set of level labels, 'orders': [(pay_date, refunded, level)]}
user_data = defaultdict(lambda: {'levels': set(), 'orders': []})
for aid, trade_no, order_status, pay_date, lt in orders:
    account = user_data[aid]
    account['levels'].add(lt)
    # An order counts as refunded only when it has order_status 4 AND its
    # trade number appears among the status-3 refunds.
    refunded = order_status == 4 and trade_no in refund_trades
    account['orders'].append((pay_date.date(), refunded, lt))
def classify_user(levels):
    """Collapse a collection of level labels into one user category.

    Args:
        levels: container of level labels ('L1', 'L2', 'L1+L2', '其他').

    Returns:
        'L1+L2' when the bundle label is present or both 'L1' and 'L2' are,
        '仅L1' / '仅L2' when only one of them is, otherwise '其他'.
    """
    if 'L1+L2' in levels:
        return 'L1+L2'
    owns_l1 = 'L1' in levels
    owns_l2 = 'L2' in levels
    if owns_l1 and owns_l2:
        return 'L1+L2'
    if owns_l1:
        return '仅L1'
    if owns_l2:
        return '仅L2'
    return '其他'
# Attach the derived category to every account record.
for account in user_data.values():
    account['category'] = classify_user(account['levels'])
def is_paid_as_of(aid, as_of_date):
    """Tell whether an account counts as a paying user on a given date.

    Args:
        aid: account id, used as a key into the module-level ``user_data``.
        as_of_date: ``date`` up to which (inclusive) orders are considered.

    Returns:
        True if the account has at least one order paid on or before
        ``as_of_date`` that is not flagged as refunded, otherwise False.
    """
    # any() stops at the first qualifying order instead of counting them all.
    return any(
        paid_on <= as_of_date and not refunded
        for paid_on, refunded, _level in user_data[aid]['orders']
    )
# ===== Step 2: course consumption (U0 excluded) =====
print("\nStep 2: 查询课消剔除U0...")
# (user_id, chapter_id) -> earliest completion date found inside the window.
consumption_map = {}
u0_skipped = 0
# The query window is taken from overall_start / overall_end so the SQL filter
# cannot drift away from the week list. The bounds are sent as ISO strings, so
# the database compares them exactly as it did the former inline literals.
window_params = (overall_start.isoformat(), overall_end.isoformat())
# Play records are read from eight tables with suffixes _0 .. _7.
for table_idx in range(8):
    tbl = f"bi_user_chapter_play_record_{table_idx}"
    # Only the table name is interpolated (built from range(8) above, never
    # from external input); the date bounds are bound parameters.
    cur.execute(f"""
SELECT user_id, chapter_id, updated_at
FROM {tbl}
WHERE play_status = 1 AND updated_at >= %s AND updated_at < %s
""", window_params)
    for user_id, chapter_id, updated_at in cur.fetchall():
        if chapter_id in u0_chapters:
            u0_skipped += 1
            continue
        key = (user_id, chapter_id)
        # updated_at may arrive as a datetime or as something only printable
        # as 'YYYY-MM-DD...'; normalise both to a date.
        if hasattr(updated_at, 'date'):
            done_on = updated_at.date()
        else:
            done_on = datetime.strptime(str(updated_at)[:10], '%Y-%m-%d').date()
        # Keep only the earliest date per (user, chapter) pair.
        if key not in consumption_map or done_on < consumption_map[key]:
            consumption_map[key] = done_on
print(f" 剔除U0课消: {u0_skipped} 条, 去重后: {len(consumption_map)}")
# ===== Step 3: character -> account mapping =====
print("Step 3: 角色映射...")
# The user_id in consumption_map is looked up as a character id here.
all_uids = list({uid for uid, _chapter in consumption_map})
char2acct = {}
# Query in batches so the IN (...) list stays bounded.
bs = 500
for offset in range(0, len(all_uids), bs):
    batch = all_uids[offset:offset + bs]
    ph = ','.join('%s' for _ in batch)
    cur.execute(f"SELECT id, account_id FROM bi_vala_app_character WHERE id IN ({ph})", batch)
    # Each fetched row is an (id, account_id) pair.
    char2acct.update(cur.fetchall())
print(f" 映射: {len(char2acct)}")
# ===== Step 4: weekly aggregation =====
print("Step 4: 按周汇总...")
# One dict per week: 'ws' / 'we' plus six metrics for each category.
results = []
for ws, we in weeks:
    # Accounts that count as paying at the end of this week, by category.
    paid_by_cat = defaultdict(set)
    for aid, account in user_data.items():
        if is_paid_as_of(aid, we):
            paid_by_cat[account['category']].add(aid)

    # Consumption events dated inside this week, restricted to paying accounts.
    cons_by_cat = defaultdict(int)
    cons_users_by_cat = defaultdict(set)
    for (uid, ch_id), cons_date in consumption_map.items():
        if not (ws <= cons_date <= we):
            continue
        aid = char2acct.get(uid)
        if not aid:
            continue
        cat = user_data.get(aid, {}).get('category', '其他')
        if aid in paid_by_cat.get(cat, set()):
            cons_by_cat[cat] += 1
            cons_users_by_cat[cat].add(aid)

    row = {'ws': ws, 'we': we}
    for cat in ['仅L1', '仅L2', 'L1+L2', '其他', '合计']:
        if cat != '合计':
            n_paid = len(paid_by_cat.get(cat, set()))
            n_cons = cons_by_cat.get(cat, 0)
            n_cons_users = len(cons_users_by_cat.get(cat, set()))
        else:
            # The total row combines every category, '其他' included.
            n_paid = sum(len(members) for members in paid_by_cat.values())
            n_cons = sum(cons_by_cat.values())
            n_cons_users = len(set().union(*cons_users_by_cat.values()))
        row.update({
            f'{cat}_paid': n_paid,
            f'{cat}_cons': n_cons,
            f'{cat}_cons_users': n_cons_users,
            f'{cat}_no_cons': n_paid - n_cons_users,
            # Averages fall back to 0 when the denominator is empty.
            f'{cat}_avg_all': round(n_cons / n_paid, 2) if n_paid > 0 else 0,
            f'{cat}_avg_cons': round(n_cons / n_cons_users, 2) if n_cons_users > 0 else 0,
        })
    results.append(row)
# All queries are done: release the cursor and the connection.
cur.close()
conn.close()
# ===== Report, per category, the first week that has any paying user =====
# Nothing is removed from results here; this only prints where each series
# starts (index 0 is reported when no week qualifies).
for cat in ['仅L1', '仅L2', 'L1+L2']:
    paid_key = f'{cat}_paid'
    first_idx = 0
    for week_no, week_row in enumerate(results):
        if week_row[paid_key] > 0:
            first_idx = week_no
            break
    print(f"{cat} 数据起于第 {first_idx+1} 周 ({results[first_idx]['ws']})")
# ===== Build the Excel workbook =====
print("\n生成 Excel...")
wb = openpyxl.Workbook()
# Drop the sheet a new Workbook starts with; every sheet is created explicitly.
wb.remove(wb.active)

# Shared styles
header_font = Font(name='微软雅黑', bold=True, size=9, color='FFFFFF')
header_fill = PatternFill(start_color='2F5496', end_color='2F5496', fill_type='solid')
data_font = Font(name='微软雅黑', size=9)
title_font = Font(name='微软雅黑', bold=True, size=14, color='2F5496')
subtitle_font = Font(name='微软雅黑', bold=True, size=11, color='2F5496')
# Thin line on all four sides of a cell.
thin_side = Side(style='thin')
border = Border(left=thin_side, right=thin_side, top=thin_side, bottom=thin_side)
center = Alignment(horizontal='center', vertical='center')

# Series colours (hex RGB) for the per-category charts.
l1_color = '4A90D9'
l2_color = 'E85D47'
l1l2_color = '7B9E4B'
def apply_cell(ws, row, col, value, font=data_font, fill=None, align=center, border_style=border):
    """Write ``value`` into a worksheet cell and style it.

    Args:
        ws: worksheet to write into.
        row: 1-based row index.
        col: 1-based column index.
        value: cell content.
        font: font to apply; defaults to the module-level ``data_font``.
        fill: optional fill, applied only when truthy.
        align: alignment to apply; defaults to the module-level ``center``.
        border_style: border to assign; defaults to the module-level ``border``.

    Returns:
        The cell that was written.
    """
    cell = ws.cell(row=row, column=col, value=value)
    cell.font = font
    cell.border = border_style
    cell.alignment = align
    if fill:
        cell.fill = fill
    return cell
def apply_header(ws, row, col, value):
    """Write ``value`` into a worksheet cell using the shared header styling.

    Args:
        ws: worksheet to write into.
        row: 1-based row index.
        col: 1-based column index.
        value: header text.

    Returns:
        The cell that was written.
    """
    cell = ws.cell(row=row, column=col, value=value)
    cell.font = header_font
    cell.fill = header_fill
    cell.border = border
    cell.alignment = center
    return cell
# ===== Sheet 1: overview =====
ws1 = wb.create_sheet("概览")
# Title spans A1:H1.
ws1.merge_cells('A1:H1')
apply_cell(
    ws1, 1, 1, "付费用户 L1/L2 课消分析剔除U0序章",
    font=title_font, border_style=None, align=Alignment(horizontal='left'),
)
# Methodology notes, one merged row each, starting at row 3.
notes = [
    "口径剔除L1/L2的U0序章课时L1 U00: 343-348, L2 U00: 55-59仅统计U1及之后的课消",
    "课消用户首次完成某一课时付费用户status=1 + 未删除 + 有订单 + 未全部退款",
]
for note_row, note_text in enumerate(notes, start=3):
    ws1.merge_cells(f'A{note_row}:H{note_row}')
    apply_cell(
        ws1, note_row, 1, note_text,
        font=Font(name='微软雅黑', size=9, color='666666'),
        border_style=None,
        align=Alignment(horizontal='left'),
    )
# ===== Sheet 2: weekly detail =====
ws2 = wb.create_sheet("每周明细")
# Column groups, in sheet order.
detail_prefixes = ['合计', '仅L1', '仅L2', 'L1+L2']
# Each metric key is paired with its header suffix so the header row and the
# data columns are generated from one list and cannot fall out of step.
detail_metrics = [
    ('paid', '付费'),
    ('cons_users', '有消'),
    ('no_cons', '无消'),
    ('cons', '课消'),
    ('avg_all', '人均'),
    ('avg_cons', '有消人均'),
]
headers_main = ['', '周一起', '周日'] + [
    f'{prefix}{suffix}' for prefix in detail_prefixes for _metric, suffix in detail_metrics
]
for j, h in enumerate(headers_main, 1):
    apply_header(ws2, 1, j, h)
# One data row per week, starting right below the header.
for row, r in enumerate(results, start=2):
    wl = f"{r['ws'].strftime('%m/%d')}-{r['we'].strftime('%m/%d')}"
    apply_cell(ws2, row, 1, wl)
    apply_cell(ws2, row, 2, r['ws'].strftime('%Y-%m-%d'))
    apply_cell(ws2, row, 3, r['we'].strftime('%Y-%m-%d'))
    col = 4
    for prefix in detail_prefixes:
        for metric, _suffix in detail_metrics:
            apply_cell(ws2, row, col, r[f'{prefix}_{metric}'])
            col += 1
# The three label columns are slightly wider than the metric columns.
for ci in range(1, len(headers_main)+1):
    ws2.column_dimensions[get_column_letter(ci)].width = 11 if ci <= 3 else 10
# Keep the header row and the three label columns visible while scrolling.
ws2.freeze_panes = 'D2'
# ===== Sheets 3-5: one data + chart sheet per category (L1, L2, L1+L2) =====
def _add_distribution_chart(sheet, label, bar_color, cats_ref, n_rows):
    """Add the stacked column chart of users with / without consumption.

    Args:
        sheet: worksheet holding the data table (header in row 1).
        label: category label used in the chart title.
        bar_color: hex RGB fill for the "with consumption" series.
        cats_ref: Reference to the week labels used as categories.
        n_rows: number of data rows below the header.
    """
    chart = BarChart()
    chart.type = "col"
    chart.grouping = "stacked"
    # openpyxl requires overlap = 100 for stacked series to sit on top of each
    # other; without it the columns are drawn offset.
    chart.overlap = 100
    chart.title = f"{label} 付费用户课消分布剔除U0序章"
    chart.style = 10
    chart.width = 24
    chart.height = 13
    # Column 3: users with consumption.
    chart.add_data(Reference(sheet, min_col=3, min_row=1, max_row=n_rows + 1), titles_from_data=True)
    chart.set_categories(cats_ref)
    chart.series[0].graphicalProperties.solidFill = bar_color
    # Column 4: users without consumption, drawn in grey.
    chart.add_data(Reference(sheet, min_col=4, min_row=1, max_row=n_rows + 1), titles_from_data=True)
    chart.series[1].graphicalProperties.solidFill = 'D9D9D9'
    chart.y_axis.title = '用户数'
    chart.legend.position = 'b'
    sheet.add_chart(chart, "A9")


def _add_trend_chart(sheet, label, line_color, cats_ref, n_rows):
    """Add the line chart of the two weekly per-user consumption averages.

    Args:
        sheet: worksheet holding the data table (header in row 1).
        label: category label used in the chart title.
        line_color: hex RGB colour for the "active users" average line.
        cats_ref: Reference to the week labels used as categories.
        n_rows: number of data rows below the header.
    """
    chart = LineChart()
    chart.title = f"{label} 周人均课消趋势剔除U0序章"
    chart.style = 10
    chart.width = 24
    chart.height = 13
    chart.y_axis.title = '课消数(节/周)'
    # Column 6: average over all paying users — grey dashed line.
    chart.add_data(Reference(sheet, min_col=6, min_row=1, max_row=n_rows + 1), titles_from_data=True)
    chart.set_categories(cats_ref)
    overall_line = chart.series[0].graphicalProperties.line
    overall_line.solidFill = '999999'
    overall_line.width = 20000
    overall_line.dashStyle = 'dash'
    # Column 7: average over users with consumption — thicker coloured line.
    chart.add_data(Reference(sheet, min_col=7, min_row=1, max_row=n_rows + 1), titles_from_data=True)
    active_line = chart.series[1].graphicalProperties.line
    active_line.solidFill = line_color
    active_line.width = 28000
    chart.y_axis.scaling.min = 0
    chart.legend.position = 'b'
    sheet.add_chart(chart, "A27")


def _build_category_sheet(workbook, weekly_rows, cat, sheet_title, label, line_color, bar_color):
    """Create one sheet with the weekly table and both charts for a category.

    Args:
        workbook: workbook to add the sheet to.
        weekly_rows: list of weekly result dicts (the module-level ``results``).
        cat: category key prefix in the result dicts, e.g. '仅L1'.
        sheet_title: name of the sheet to create.
        label: short label used in chart titles.
        line_color: hex RGB colour for the trend line.
        bar_color: hex RGB fill for the stacked column chart.

    Returns:
        The worksheet that was created.
    """
    sheet = workbook.create_sheet(sheet_title)
    # Start at the first week with paying users in this category (week 0 when
    # none qualifies).
    first_idx = next((i for i, r in enumerate(weekly_rows) if r[f'{cat}_paid'] > 0), 0)
    cat_results = weekly_rows[first_idx:]

    headers = ['', '付费用户', '有课消用户', '无课消用户', '课消总数', '人均课消', '有消人均']
    for j, h in enumerate(headers, 1):
        apply_header(sheet, 1, j, h)
    metric_keys = ['paid', 'cons_users', 'no_cons', 'cons', 'avg_all', 'avg_cons']
    for row, r in enumerate(cat_results, start=2):
        apply_cell(sheet, row, 1, f"{r['ws'].strftime('%m/%d')}")
        for col, metric in enumerate(metric_keys, start=2):
            apply_cell(sheet, row, col, r[f'{cat}_{metric}'])

    n_rows = len(cat_results)
    cats_ref = Reference(sheet, min_col=1, min_row=2, max_row=n_rows + 1)
    # NOTE(review): both charts are anchored inside the data columns (A9 and
    # A27) and are 13 units tall, so they may cover table rows and each other —
    # confirm the layout is intended.
    _add_distribution_chart(sheet, label, bar_color, cats_ref, n_rows)
    _add_trend_chart(sheet, label, line_color, cats_ref, n_rows)

    for ci in range(1, 8):
        sheet.column_dimensions[get_column_letter(ci)].width = 12
    return sheet


# category key -> (sheet name, chart label, line colour, bar fill colour)
sheet_names = {
    '仅L1': ('L1图表', 'L1', l1_color, '4A90D9'),
    '仅L2': ('L2图表', 'L2', l2_color, 'E85D47'),
    'L1+L2': ('L1+L2图表', 'L1+L2', l1l2_color, 'A8C88E'),
}
for cat, (sname, label, color, light_color) in sheet_names.items():
    _build_category_sheet(wb, results, cat, sname, label, color, light_color)
# ===== Save =====
path = '/root/.openclaw/workspace/output/course_consumption_by_level_v2.xlsx'
# wb.save() does not create missing directories; make sure the target exists
# so the run cannot fail after all the querying is done.
os.makedirs(os.path.dirname(path), exist_ok=True)
wb.save(path)
print(f"\n✅ Excel v2 已保存: {path}")
# Short console summary of the last reporting week.
last = results[-1]
# The "as of" date in the header comes from the last week's end date, so it
# follows overall_end instead of being typed in by hand.
as_of = f"{last['we'].month}/{last['we'].day}"
print(f"""
=== 剔除U0后最终数据截至{as_of} ===
仅L1: 付费{last['仅L1_paid']} 有消{last['仅L1_cons_users']} 无消{last['仅L1_no_cons']} 人均{last['仅L1_avg_all']} 有消人均{last['仅L1_avg_cons']}
仅L2: 付费{last['仅L2_paid']} 有消{last['仅L2_cons_users']} 无消{last['仅L2_no_cons']} 人均{last['仅L2_avg_all']} 有消人均{last['仅L2_avg_cons']}
L1+L2: 付费{last['L1+L2_paid']} 有消{last['L1+L2_cons_users']} 无消{last['L1+L2_no_cons']} 人均{last['L1+L2_avg_all']} 有消人均{last['L1+L2_avg_cons']}
合计: 付费{last['合计_paid']} 有消{last['合计_cons_users']} 无消{last['合计_no_cons']} 人均{last['合计_avg_all']} 有消人均{last['合计_avg_cons']}
""")