ai_member_xiaoxi/scripts/course_analysis_v4.py
2026-05-15 08:00:01 +08:00

166 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
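v4: weekly course-consumption (课消) analysis per paid cohort.
The L1 paid cohort is measured on L1 lessons only; the L2 paid cohort is measured on L2 lessons only.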
"""
import json
import os
from collections import defaultdict
from datetime import datetime, timedelta, date

import psycopg2
# Connection settings are taken from the environment so that no credential
# lives in the source tree. Host, port, user and database keep their previous
# values as defaults; the password has no default on purpose.
# NOTE(review): the password that used to be committed here should be treated
# as compromised and rotated.
_db_password = os.environ.get("VALA_BI_PASSWORD")
if not _db_password:
    raise SystemExit(
        "VALA_BI_PASSWORD is not set; refusing to connect without credentials"
    )
conn = psycopg2.connect(
    host=os.environ.get("VALA_BI_HOST", "bj-postgres-16pob4sg.sql.tencentcdb.com"),
    port=int(os.environ.get("VALA_BI_PORT", "28591")),
    user=os.environ.get("VALA_BI_USER", "ai_member"),
    password=_db_password,
    dbname=os.environ.get("VALA_BI_DBNAME", "vala_bi"),
)
cur = conn.cursor()
# Load the lesson ids of each course level, leaving out the ids listed in u0
# (the "U0" lessons named by the original comment).
u0 = {55, 56, 57, 58, 59, 343, 344, 345, 346, 348}

cur.execute("SELECT id FROM bi_level_unit_lesson WHERE course_level='L1'")
l1_chapters = {row[0] for row in cur.fetchall()} - u0

cur.execute("SELECT id FROM bi_level_unit_lesson WHERE course_level='L2'")
l2_chapters = {row[0] for row in cur.fetchall()} - u0

print(f"L1章节: {len(l1_chapters)} | L2章节: {len(l2_chapters)}")
def _split_into_weeks(start, end):
    """Split the half-open date range [start, end) into calendar-week slices.

    Args:
        start: first day covered (a ``datetime.date``).
        end: first day NOT covered (a ``datetime.date``).

    Returns:
        A list of inclusive ``(first_day, last_day)`` pairs. Every slice ends
        on the Sunday of the week containing its first day, except that the
        final slice is cut at ``end - 1 day``. Empty when ``start >= end``.
    """
    one_day = timedelta(days=1)
    final_day = end - one_day
    slices = []
    cursor = start
    while cursor < end:
        # weekday() is 0 for Monday, so this lands on the Sunday of the week.
        sunday = cursor + timedelta(days=6 - cursor.weekday())
        slice_end = min(sunday, final_day)
        slices.append((cursor, slice_end))
        cursor = slice_end + one_day
    return slices


# Analysis window: overall_start is inclusive, overall_end is exclusive.
overall_start = date(2025, 9, 1)
overall_end = date(2026, 5, 11)
weeks = _split_into_weeks(overall_start, overall_end)
print("分类付费用户...")
# One row per order that has a pay_success_date and belongs to an account with
# status = 1 and no deleted_at; the CASE expression tags the order with the
# course level its goods_id maps to.
cur.execute("""
SELECT o.account_id, o.trade_no, o.order_status, o.pay_success_date,
CASE WHEN o.goods_id IN (57, 60, 63) THEN 'L1'
WHEN o.goods_id = 61 THEN 'L1+L2'
WHEN o.goods_id IN (31, 32, 33, 54) THEN 'L2'
ELSE '其他' END as level_type
FROM bi_vala_order o
INNER JOIN bi_vala_app_account a ON o.account_id = a.id
WHERE a.status = 1 AND a.deleted_at IS NULL AND o.pay_success_date IS NOT NULL
""")
orders = cur.fetchall()

# trade_no values of refund rows whose status is 3.
cur.execute("SELECT trade_no FROM bi_refund_order WHERE status = 3")
refund_trades = {row[0] for row in cur.fetchall()}

user_levels = defaultdict(set)   # account id -> set of level tags it ordered
user_orders = defaultdict(list)  # account id -> [(pay day, is_refunded), ...]
for aid, trade_no, order_status, pay_date, lt in orders:
    # An order is treated as refunded only when both hold: its own
    # order_status is 4 and its trade_no has a status-3 refund row.
    is_refunded = order_status == 4 and trade_no in refund_trades

    # Normalise the pay date to a datetime.date whatever the driver returned,
    # mirroring how updated_at is handled for the play records; calling
    # .date() unconditionally fails on a plain date value.
    if isinstance(pay_date, datetime):
        pay_day = pay_date.date()
    elif isinstance(pay_date, date):
        pay_day = pay_date
    else:
        pay_day = datetime.strptime(str(pay_date)[:10], '%Y-%m-%d').date()

    # The level tag is recorded even for refunded orders; refunds are only
    # filtered later through the is_refunded flag stored in user_orders.
    user_levels[aid].add(lt)
    user_orders[aid].append((pay_day, is_refunded))
def is_paid(aid, as_of):
    """Return True if account *aid* holds a non-refunded order paid by *as_of*.

    Reads the module-level ``user_orders`` mapping, whose values are lists of
    ``(pay_date, is_refunded)`` pairs.

    Args:
        aid: account id to look up.
        as_of: cut-off ``datetime.date`` (inclusive).

    Returns:
        True when at least one order has ``pay_date <= as_of`` and is not
        refunded; False otherwise, including for unknown accounts.

    NOTE(review): the stored pairs carry no level tag, so an order of any
    level satisfies this check — confirm that is intended where the result is
    combined with the L1/L2 pools.
    """
    # .get() instead of indexing: user_orders is a defaultdict, and indexing
    # an unknown account would silently insert an empty entry into it.
    return any(
        paid_on <= as_of and not refunded
        for paid_on, refunded in user_orders.get(aid, ())
    )
# An account joins a level's pool when any of its orders carries that level's
# tag or the combined 'L1+L2' tag; all_pool is the union of both pools.
l1_pool = {acct for acct, tags in user_levels.items() if tags & {'L1', 'L1+L2'}}
l2_pool = {acct for acct, tags in user_levels.items() if tags & {'L2', 'L1+L2'}}
all_pool = l1_pool.union(l2_pool)
print(f"L1池: {len(l1_pool)}, L2池: {len(l2_pool)}, 合计: {len(all_pool)}")
print("查询课消...")
# (user_id, chapter_id) -> earliest updated_at day among the matching play
# records, so each pair is counted once, on the first day it shows up.
cons_map = {}

# Only L1/L2 chapters are kept, and never the ids listed in u0.
tracked_chapters = (l1_chapters | l2_chapters) - u0

# Bind the window from overall_start/overall_end instead of repeating the
# dates as SQL literals, so the query cannot drift from the week slicing.
# ISO strings are passed so the server sees the same quoted literals as before.
window = (overall_start.isoformat(), overall_end.isoformat())

# The play records are spread over tables with suffixes _0 .. _7.
for ti in range(8):
    tbl = f"bi_user_chapter_play_record_{ti}"
    cur.execute(f"""SELECT user_id, chapter_id, updated_at FROM {tbl}
WHERE play_status = 1 AND updated_at >= %s AND updated_at < %s""", window)
    for uid, cid, ua in cur.fetchall():
        if cid not in tracked_chapters:
            continue
        # updated_at may arrive as a datetime or as something that only
        # stringifies to 'YYYY-MM-DD...'; reduce both to a date.
        if hasattr(ua, 'date'):
            played_on = ua.date()
        else:
            played_on = datetime.strptime(str(ua)[:10], '%Y-%m-%d').date()
        key = (uid, cid)
        if key not in cons_map or played_on < cons_map[key]:
            cons_map[key] = played_on
print("角色映射...")
# Resolve every user_id seen in cons_map (looked up as bi_vala_app_character.id)
# to its account_id, querying in batches of 500 ids per IN (...) list.
char2acct = {}
all_uids = list({uid for uid, _ in cons_map})
for offset in range(0, len(all_uids), 500):
    chunk = all_uids[offset:offset + 500]
    placeholders = ','.join('%s' for _ in chunk)
    cur.execute(
        f"SELECT id, account_id FROM bi_vala_app_character WHERE id IN ({placeholders})",
        chunk,
    )
    # Each fetched row is an (id, account_id) pair, which dict.update accepts.
    char2acct.update(cur.fetchall())
def _avg(total, population):
    """Return total / population rounded to 2 decimals, or 0 if population is 0."""
    return round(total / population, 2) if population else 0


def _cohort_stats(prefix, paid, cons, active):
    """Build the six result columns of one cohort, keyed ``<prefix>_<metric>``.

    Args:
        prefix: column prefix ('L1', 'L2' or 'total').
        paid: set of paid account ids in the cohort for the week.
        cons: number of consumption records attributed to the cohort.
        active: set of account ids with at least one attributed record.
    """
    return {
        f'{prefix}_paid': len(paid),
        f'{prefix}_cons': cons,
        f'{prefix}_cons_users': len(active),
        f'{prefix}_no_cons': len(paid) - len(active),
        f'{prefix}_avg_all': _avg(cons, len(paid)),
        f'{prefix}_avg_cons': _avg(cons, len(active)),
    }


print("按周汇总...")

# Resolve each consumption record once (owning account, chapter level) rather
# than repeating the same lookups for every week. Records whose user_id has no
# (truthy) account mapping are dropped, as before.
events = []
for (uid, ch_id), cons_date in cons_map.items():
    aid = char2acct.get(uid)
    if not aid:
        continue
    events.append((cons_date, aid, ch_id in l1_chapters, ch_id in l2_chapters))

results = []
for ws, we in weeks:
    # Paid cohorts as of the last day of the week.
    l1_paid = {aid for aid in l1_pool if is_paid(aid, we)}
    l2_paid = {aid for aid in l2_pool if is_paid(aid, we)}
    t_paid = {aid for aid in all_pool if is_paid(aid, we)}

    l1_cons, l1_cu = 0, set()
    l2_cons, l2_cu = 0, set()
    t_cons, t_cu = 0, set()
    for cons_date, aid, is_l1, is_l2 in events:
        if not (ws <= cons_date <= we):
            continue
        # L1 cohort only gets credit for L1 chapters, L2 cohort for L2 chapters.
        hit_l1 = is_l1 and aid in l1_paid
        hit_l2 = is_l2 and aid in l2_paid
        if hit_l1:
            l1_cons += 1
            l1_cu.add(aid)
        if hit_l2:
            l2_cons += 1
            l2_cu.add(aid)
        # Total: a record counts once if it matched either cohort rule.
        if aid in t_paid and (hit_l1 or hit_l2):
            t_cons += 1
            t_cu.add(aid)

    row = {'ws': ws, 'we': we}
    row.update(_cohort_stats('L1', l1_paid, l1_cons, l1_cu))
    row.update(_cohort_stats('L2', l2_paid, l2_cons, l2_cu))
    row.update(_cohort_stats('total', t_paid, t_cons, t_cu))
    results.append(row)

    # Progress line for every 8th week (1st, 9th, ...) and for the last week.
    r = results[-1]
    if (len(results) - 1) % 8 == 0 or len(results) == len(weeks):
        print(f" W{len(results):2d} {ws}~{we} | L1:{r['L1_paid']}有消{r['L1_cons_users']} | L2:{r['L2_paid']}有消{r['L2_cons_users']}")
cur.close()
conn.close()


def _share(part, whole):
    """Format part / whole as a whole-number percentage string ('0' if whole is 0)."""
    return f"{part / whole * 100:.0f}" if whole else "0"


# Print the summary of the last week. Guarded so that an empty result list or
# an empty cohort no longer aborts the script before the data is saved.
last = results[-1] if results else None
if last is not None:
    print(f"\n=== 最终数据v4L1只看L1课程, L2只看L2课程===")
    print(f"L1付费群: {last['L1_paid']}人 | 有消{last['L1_cons_users']} | 无消{last['L1_no_cons']}({_share(last['L1_no_cons'], last['L1_paid'])}%) | 人均{last['L1_avg_all']} | 有消人均{last['L1_avg_cons']}")
    print(f"L2付费群: {last['L2_paid']}人 | 有消{last['L2_cons_users']} | 无消{last['L2_no_cons']}({_share(last['L2_no_cons'], last['L2_paid'])}%) | 人均{last['L2_avg_all']} | 有消人均{last['L2_avg_cons']}")
    print(f"合计(去重): {last['total_paid']}人 | 有消{last['total_cons_users']} | 无消{last['total_no_cons']}({_share(last['total_no_cons'], last['total_paid'])}%)")
# Save the weekly results as JSON for the follow-up chart script.
out = '/root/.openclaw/workspace/output/course_data_v4.json'

# date values (the 'ws'/'we' columns) are not JSON-serialisable; store them as
# ISO strings and pass every other value through unchanged.
serializable = [
    {k: (v.isoformat() if isinstance(v, date) else v) for k, v in r.items()}
    for r in results
]

payload = {
    'results': serializable,
    # Sorted so the file content does not depend on set iteration order.
    'L1_chapters': sorted(l1_chapters),
    'L2_chapters': sorted(l2_chapters),
}

# Create the output directory if needed, and write UTF-8 explicitly because
# ensure_ascii=False may emit non-ASCII characters.
os.makedirs(os.path.dirname(out), exist_ok=True)
with open(out, 'w', encoding='utf-8') as f:
    json.dump(payload, f, ensure_ascii=False)
print(f"\n数据已保存: {out}")