#!/usr/bin/env python3
"""Generate four lesson-consumption charts (U0 prologue chapters excluded).

Charts written to ``OUTPUT_DIR``:
    1. L1 paying users - weekly consumption distribution (stacked bars)
    2. L2 paying users - weekly consumption distribution (stacked bars)
    3. L1 weekly per-user consumption trend (line chart)
    4. L2 weekly per-user consumption trend (line chart)

Run this file as a script. psycopg2 and matplotlib are imported lazily inside
the functions that need them so the pure helpers (``build_weeks``,
``classify``, ``is_paid``, ``merge_first_plays``, ``summarize_weeks``) can be
imported and tested without a database driver or plotting stack.
"""
import os
from collections import defaultdict
from contextlib import closing
from datetime import date, datetime, timedelta

import numpy as np

# ===== Configuration =====
FONT_PATH = '/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc'
OUTPUT_DIR = '/root/.openclaw/workspace/output'

# Chapter ids excluded from every consumption count (the "U0" prologue).
U0_CHAPTERS = frozenset({55, 56, 57, 58, 59, 343, 344, 345, 346, 348})

# Reporting window: OVERALL_START inclusive, OVERALL_END exclusive.
OVERALL_START = date(2025, 9, 1)
OVERALL_END = date(2026, 5, 11)

PLAY_RECORD_SHARDS = 8     # tables bi_user_chapter_play_record_0 .. _7
CHARACTER_BATCH_SIZE = 500  # ids per IN (...) lookup

# Categories that get per-week columns in the summary rows.
REPORT_CATEGORIES = ('仅L1', '仅L2', 'L1+L2')

CHART_CONFIGS = {
    'L1': {'cat': '仅L1', 'color': '#4A90D9', 'light': '#A8CFF1', 'label': 'L1'},
    'L2': {'cat': '仅L2', 'color': '#E85D47', 'light': '#F4A9A0', 'label': 'L2'},
}

ORDERS_SQL = """
    SELECT o.account_id, o.trade_no, o.order_status, o.pay_success_date,
           CASE WHEN o.goods_id IN (57, 60, 63) THEN 'L1'
                WHEN o.goods_id = 61 THEN 'L1+L2'
                WHEN o.goods_id IN (31, 32, 33, 54) THEN 'L2'
                ELSE '其他' END as level_type
    FROM bi_vala_order o
    INNER JOIN bi_vala_app_account a ON o.account_id = a.id
    WHERE a.status = 1 AND a.deleted_at IS NULL
      AND o.pay_success_date IS NOT NULL
"""


def _new_user():
    """Return an empty per-account record."""
    return {'levels': set(), 'orders': []}


# account_id -> {'levels': set, 'orders': [(pay_date, is_refunded, level)],
#                'category': str}; filled by load_paid_users().
user_data = defaultdict(_new_user)


def _db_settings():
    """Return psycopg2 connection kwargs, preferring VALA_DB_* env variables."""
    # NOTE(security): the literal fallbacks keep the script runnable exactly as
    # before, but a password committed to source control should be rotated and
    # this fallback removed once VALA_DB_PASSWORD is provisioned.
    env = os.environ
    return {
        'host': env.get('VALA_DB_HOST', 'bj-postgres-16pob4sg.sql.tencentcdb.com'),
        'port': int(env.get('VALA_DB_PORT', '28591')),
        'user': env.get('VALA_DB_USER', 'ai_member'),
        'password': env.get('VALA_DB_PASSWORD', 'LdfjdjL83h3h3^$&**YGG*'),
        'dbname': env.get('VALA_DB_NAME', 'vala_bi'),
    }


def _to_date(value):
    """Coerce a datetime, or anything whose str() starts with YYYY-MM-DD, to a date."""
    if hasattr(value, 'date'):
        return value.date()
    return datetime.strptime(str(value)[:10], '%Y-%m-%d').date()


def build_weeks(start, end):
    """Split ``[start, end)`` into ``(first_day, last_day)`` spans ending on Sunday.

    The first span starts on ``start`` even when that is mid-week, and the
    last span is clipped to ``end - 1 day``. Both bounds of a span are
    inclusive.
    """
    last_day = end - timedelta(days=1)
    spans = []
    day = start
    while day < end:
        sunday = day + timedelta(days=6 - day.weekday())
        week_end = min(sunday, last_day)
        spans.append((day, week_end))
        day = week_end + timedelta(days=1)
    return spans


def classify(levels):
    """Map the set of level tags an account has ordered to a report category.

    Returns 'L1+L2' when the bundle tag is present or both 'L1' and 'L2'
    are, otherwise '仅L1', '仅L2', or '其他'.
    """
    has_l1 = 'L1' in levels
    has_l2 = 'L2' in levels
    if 'L1+L2' in levels or (has_l1 and has_l2):
        return 'L1+L2'
    if has_l1:
        return '仅L1'
    if has_l2:
        return '仅L2'
    return '其他'


def is_paid(aid, as_of, users=None):
    """Return True if account ``aid`` has a non-refunded order paid on or before ``as_of``.

    ``users`` defaults to the module-level ``user_data`` mapping.
    """
    users = user_data if users is None else users
    return any(
        paid_on <= as_of and not refunded
        for paid_on, refunded, _level in users[aid]['orders']
    )


def merge_first_plays(cons_map, rows, skip_chapters=U0_CHAPTERS):
    """Fold ``(user_id, chapter_id, updated_at)`` rows into ``cons_map``.

    ``cons_map`` maps ``(user_id, chapter_id)`` to the earliest date seen for
    that pair; rows whose chapter is in ``skip_chapters`` are ignored.
    The mapping is updated in place and also returned.
    """
    for uid, chapter_id, updated_at in rows:
        if chapter_id in skip_chapters:
            continue
        day = _to_date(updated_at)
        key = (uid, chapter_id)
        if key not in cons_map or day < cons_map[key]:
            cons_map[key] = day
    return cons_map


def summarize_weeks(weeks, users, cons_map, char2acct):
    """Build one summary row per week.

    For each category in ``REPORT_CATEGORIES`` a row carries the number of
    paying accounts as of the week's last day (``_paid``), consumption
    records dated within the week that belong to those accounts (``_cons``),
    how many of the accounts had any (``_cons_users``) or none
    (``_no_cons``), and the two per-account averages (``_avg_all``,
    ``_avg_cons``), each rounded to 2 decimals or 0 when the divisor is 0.
    """
    results = []
    for ws, we in weeks:
        paid_by_cat = defaultdict(set)
        for aid, info in users.items():
            if is_paid(aid, we, users):
                paid_by_cat[info['category']].add(aid)

        cons_by_cat = defaultdict(int)
        cons_users_by_cat = defaultdict(set)
        for (uid, _chapter_id), cons_date in cons_map.items():
            if not ws <= cons_date <= we:
                continue
            aid = char2acct.get(uid)
            # Accounts without any order can never be in paid_by_cat.
            if not aid or aid not in users:
                continue
            cat = users[aid]['category']
            if aid in paid_by_cat.get(cat, ()):
                cons_by_cat[cat] += 1
                cons_users_by_cat[cat].add(aid)

        row = {'ws': ws, 'we': we}
        for cat in REPORT_CATEGORIES:
            n_paid = len(paid_by_cat.get(cat, ()))
            n_cons = cons_by_cat.get(cat, 0)
            n_cons_users = len(cons_users_by_cat.get(cat, ()))
            row[f'{cat}_paid'] = n_paid
            row[f'{cat}_cons'] = n_cons
            row[f'{cat}_cons_users'] = n_cons_users
            row[f'{cat}_no_cons'] = n_paid - n_cons_users
            row[f'{cat}_avg_all'] = round(n_cons / n_paid, 2) if n_paid > 0 else 0
            row[f'{cat}_avg_cons'] = (
                round(n_cons / n_cons_users, 2) if n_cons_users > 0 else 0
            )
        results.append(row)
    return results


def load_paid_users(cur):
    """Populate and return ``user_data`` from the order and refund tables."""
    cur.execute(ORDERS_SQL)
    orders = cur.fetchall()

    cur.execute("SELECT trade_no FROM bi_refund_order WHERE status = 3")
    refund_trades = {row[0] for row in cur.fetchall()}

    user_data.clear()
    for aid, trade_no, order_status, pay_date, level in orders:
        # Refunded only when the order has status 4 AND a matching refund row.
        is_refunded = order_status == 4 and trade_no in refund_trades
        record = user_data[aid]
        # Levels are collected from every order, refunded ones included.
        record['levels'].add(level)
        record['orders'].append((_to_date(pay_date), is_refunded, level))

    for record in user_data.values():
        record['category'] = classify(record['levels'])
    return user_data


def load_first_plays(cur, start=OVERALL_START, end=OVERALL_END):
    """Return ``{(user_id, chapter_id): earliest date}`` across all shard tables."""
    cons_map = {}
    for shard in range(PLAY_RECORD_SHARDS):
        # Only the integer shard index is interpolated; the window bounds are
        # bound as parameters so they always follow OVERALL_START/OVERALL_END.
        cur.execute(
            f"""
            SELECT user_id, chapter_id, updated_at
            FROM bi_user_chapter_play_record_{shard}
            WHERE play_status = 1 AND updated_at >= %s AND updated_at < %s
            """,
            (start, end),
        )
        merge_first_plays(cons_map, cur.fetchall())
    return cons_map


def load_character_accounts(cur, character_ids):
    """Return ``{character_id: account_id}`` for the given ids, queried in batches."""
    ids = list(character_ids)
    char2acct = {}
    for offset in range(0, len(ids), CHARACTER_BATCH_SIZE):
        batch = ids[offset:offset + CHARACTER_BATCH_SIZE]
        placeholders = ','.join(['%s'] * len(batch))
        cur.execute(
            "SELECT id, account_id FROM bi_vala_app_character "
            f"WHERE id IN ({placeholders})",
            batch,
        )
        char2acct.update(cur.fetchall())
    return char2acct


def _init_matplotlib():
    """Select the Agg backend, register the CJK font, and return ``(plt, mdates)``."""
    import matplotlib
    matplotlib.use('Agg')  # must be selected before pyplot is imported
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates
    import matplotlib.ticker as ticker  # noqa: F401  (kept from the original imports; unused)
    import matplotlib.font_manager as fm

    fm.fontManager.addfont(FONT_PATH)
    font_name = fm.FontProperties(fname=FONT_PATH).get_name()
    plt.rcParams['font.family'] = font_name
    plt.rcParams['axes.unicode_minus'] = False
    print(f'使用字体: {font_name}')
    return plt, mdates


def _plot_user_stack(plt, key, cfg, data):
    """Save the stacked bar chart (accounts with / without consumption) and return its path."""
    cat = cfg['cat']
    labels = [r['ws'].strftime('%m/%d') for r in data]
    paid = [r[f'{cat}_paid'] for r in data]
    cons_users = [r[f'{cat}_cons_users'] for r in data]
    no_cons = [r[f'{cat}_no_cons'] for r in data]

    fig, ax = plt.subplots(figsize=(18, 8))
    x_idx = np.arange(len(data))
    bar_w = 0.65
    ax.bar(x_idx, cons_users, bar_w, color=cfg['light'], label='有课消用户', zorder=3)
    ax.bar(x_idx, no_cons, bar_w, bottom=cons_users, color='#D0D0D0',
           label='无课消用户', zorder=3)

    # Label and tick roughly a dozen evenly spaced bars with the paid total.
    step = max(1, len(data) // 12)
    for i in range(0, len(data), step):
        ax.annotate(str(paid[i]), (i, paid[i]), textcoords='offset points',
                    xytext=(0, 6), fontsize=8, ha='center', color='#333333',
                    fontweight='bold')
    ax.set_xticks(x_idx[::step])
    ax.set_xticklabels(labels[::step], fontsize=9, rotation=45)
    ax.set_ylabel('用户数', fontsize=13)
    ax.set_title(f"{cfg['label']} 付费用户周课消分布(剔除U0序章)",
                 fontsize=16, fontweight='bold')
    ax.legend(fontsize=12, loc='upper left')
    ax.grid(axis='y', alpha=0.3, zorder=0)
    ax.set_xlim(-0.5, len(x_idx) - 0.5)

    # Share of paying accounts without consumption in the most recent week.
    no_rate = no_cons[-1] / paid[-1] * 100 if paid[-1] else 0
    ax.text(0.97, 0.95, f'无课消率: {no_rate:.0f}%', transform=ax.transAxes,
            fontsize=11, ha='right', va='top', color='#999999', fontstyle='italic')

    fig.tight_layout()
    path = f'{OUTPUT_DIR}/{key}_users_stack.png'
    fig.savefig(path, dpi=150, bbox_inches='tight', facecolor='white')
    plt.close(fig)
    return path


def _plot_avg_trend(plt, mdates, key, cfg, data):
    """Save the weekly per-account average line chart and return its path."""
    cat = cfg['cat']
    color = cfg['color']
    xs = [r['ws'] + timedelta(days=3) for r in data]  # plot at mid-week
    avg_all = [r[f'{cat}_avg_all'] for r in data]
    avg_cons = [r[f'{cat}_avg_cons'] for r in data]

    fig, ax = plt.subplots(figsize=(18, 8))
    # Marker and line style are given as keywords only: combining an 'o-' fmt
    # string with linestyle='--' makes matplotlib warn about a redundant style.
    ax.plot(xs, avg_all, marker='o', linestyle='--', color='#999999',
            linewidth=2.2, markersize=5, label='周人均课消(全部付费用户)',
            markerfacecolor='white')
    ax.plot(xs, avg_cons, 's-', color=color, linewidth=2.8, markersize=5,
            label='周有消人均课消', markerfacecolor='white')
    ax.fill_between(xs, avg_all, avg_cons, alpha=0.08, color=color)

    # Annotate roughly eight evenly spaced points on each line.
    step = max(1, len(data) // 8)
    for i in range(0, len(xs), step):
        ax.annotate(f'{avg_all[i]:.1f}', (xs[i], avg_all[i]),
                    textcoords='offset points', xytext=(0, -16), fontsize=7.5,
                    color='#999999', ha='center')
        ax.annotate(f'{avg_cons[i]:.1f}', (xs[i], avg_cons[i]),
                    textcoords='offset points', xytext=(0, 8), fontsize=7.5,
                    color=color, ha='center', fontweight='bold')

    ax.xaxis.set_major_formatter(mdates.DateFormatter('%m/%d'))
    ax.xaxis.set_major_locator(mdates.MonthLocator())
    plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, fontsize=9)
    ax.set_ylabel('课消数(节/周)', fontsize=13)
    ax.set_title(f"{cfg['label']} 周人均课消趋势(剔除U0序章)",
                 fontsize=16, fontweight='bold')
    ax.legend(fontsize=12, loc='upper left')
    ax.grid(True, alpha=0.3)
    # Small margin around the reporting window (2025-08-30 .. 2026-05-12).
    ax.set_xlim(OVERALL_START - timedelta(days=2), OVERALL_END + timedelta(days=1))

    fig.tight_layout()
    path = f'{OUTPUT_DIR}/{key}_avg_trend.png'
    fig.savefig(path, dpi=150, bbox_inches='tight', facecolor='white')
    plt.close(fig)
    return path


def render_charts(results, plt, mdates):
    """Render both charts for every entry in ``CHART_CONFIGS``; return the saved paths."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    saved = []
    for key, cfg in CHART_CONFIGS.items():
        cat = cfg['cat']
        # Drop leading weeks that have no paying accounts in this category.
        first = next(
            (i for i, r in enumerate(results) if r[f'{cat}_paid'] > 0), None
        )
        if first is None:
            print(f' ⚠️ {cat} 无付费用户数据,跳过')
            continue
        data = results[first:]
        for path in (_plot_user_stack(plt, key, cfg, data),
                     _plot_avg_trend(plt, mdates, key, cfg, data)):
            saved.append(path)
            print(f' ✅ {path}')
    return saved


def main():
    """Query the database, aggregate by week, and write the charts."""
    import psycopg2

    plt, mdates = _init_matplotlib()
    weeks = build_weeks(OVERALL_START, OVERALL_END)

    # closing() guarantees the cursor and connection are released even when a
    # query fails; psycopg2's own `with conn:` only ends the transaction.
    with closing(psycopg2.connect(**_db_settings())) as conn, \
            closing(conn.cursor()) as cur:
        print("分类付费用户...")
        users = load_paid_users(cur)

        print("查询课消...")
        cons_map = load_first_plays(cur)

        print("角色映射...")
        char2acct = load_character_accounts(cur, {uid for uid, _ in cons_map})

    print("按周汇总...")
    results = summarize_weeks(weeks, users, cons_map, char2acct)

    print("\n生成图表...")
    saved = render_charts(results, plt, mdates)
    if len(saved) == 2 * len(CHART_CONFIGS):
        print('\n全部 4 张图表已生成!')
    else:
        print(f'\n已生成 {len(saved)} 张图表(部分分类无付费用户,已跳过)')


if __name__ == '__main__':
    main()