ai_member_xiaoxi/scripts/april_refund_analysis.py
2026-04-15 08:00:01 +08:00

243 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
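Learning-progress analysis for users who were refunded in April - generates an Excel sheet.
Columns: 账号ID (account ID), 购课渠道 (purchase channel), 购课时间 (purchase time), 退费时间 (refund time), 用户类型 (user type),
plus, for every lesson L1U0L1~L2U0L5: 是否进入 (entered), 进入时间 (enter time), 是否完成 (completed), 完成时间 (completion time).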
"""
import psycopg2
import pandas as pd
from datetime import datetime
import os
# Database connection settings passed as keyword arguments to psycopg2.connect().
#
# SECURITY: the password is taken from the PG_ONLINE_PASSWORD environment
# variable only. A plaintext fallback used to be hard-coded here; any secret
# that has been committed to source control should be treated as leaked and
# rotated.
PG_ONLINE = {
    'host': 'bj-postgres-16pob4sg.sql.tencentcdb.com',
    'port': 28591,
    'user': 'ai_member',
    # None when the variable is unset. NOTE(review): psycopg2 is documented to
    # ignore None-valued connection keywords, in which case libpq falls back to
    # PGPASSWORD / ~/.pgpass - confirm for the deployed psycopg2 version.
    'password': os.environ.get('PG_ONLINE_PASSWORD'),
    'dbname': 'vala_bi',
}
# Lesson codes in report column order: L1U0L1~L1U0L5 (A1) followed by
# L2U0L1~L2U0L5 (A2).
LESSON_ORDER = [
    f'L{level}U0L{lesson}'
    for level in (1, 2)
    for lesson in range(1, 6)
]
# chapter_id values, positionally aligned with LESSON_ORDER.
# NOTE(review): the L1 ids jump from 346 to 348 (no 347) - confirm this is intentional.
CHAPTER_IDS = [
    343, 344, 345, 346, 348,
    55, 56, 57, 58, 59,
]
# Lesson mapping: chapter_id -> lesson code.
CHAPTER_MAP = dict(zip(CHAPTER_IDS, LESSON_ORDER))
def get_connection():
    """Open and return a new psycopg2 connection built from the PG_ONLINE settings."""
    connection = psycopg2.connect(**PG_ONLINE)
    return connection
def fetch_refund_users(conn):
    """Load the April refund accounts together with their order summary.

    The query selects accounts (joined to account rows with status = 1) that
    have an order with order_status = 4 both created and updated within
    2026-04-01 <= t < 2026-05-01. For each such account it returns the most
    recently created order of that kind plus counts of its April orders.

    Args:
        conn: an open DB-API connection whose cursor() is a context manager.

    Returns:
        pandas.DataFrame with one row per account and the columns
        account_id, key_from, sale_channel, purchase_time, refund_time,
        total_orders, refunded_orders, active_orders (ordered by account_id).
    """
    query = """
WITH april_refund_accounts AS (
SELECT DISTINCT o.account_id
FROM bi_vala_order o
JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
WHERE o.order_status = 4
AND o.updated_at >= '2026-04-01' AND o.updated_at < '2026-05-01'
AND o.created_at >= '2026-04-01' AND o.created_at < '2026-05-01'
),
-- 每个用户所有四月订单
user_all_orders AS (
SELECT o.account_id,
COUNT(DISTINCT o.id) AS total_orders,
COUNT(DISTINCT CASE WHEN o.order_status = 4 THEN o.id END) AS refunded_orders,
COUNT(DISTINCT CASE WHEN o.order_status = 3 THEN o.id END) AS active_orders
FROM bi_vala_order o
JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
WHERE o.account_id IN (SELECT account_id FROM april_refund_accounts)
AND o.order_status IN (3, 4)
AND o.created_at >= '2026-04-01' AND o.created_at < '2026-05-01'
GROUP BY o.account_id
),
-- 取最晚购课时间的退费订单
latest_order AS (
SELECT DISTINCT ON (o.account_id)
o.account_id,
o.key_from,
o.sale_channel,
o.created_at AS purchase_time,
o.updated_at AS refund_time
FROM bi_vala_order o
JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
WHERE o.account_id IN (SELECT account_id FROM april_refund_accounts)
AND o.order_status = 4
AND o.created_at >= '2026-04-01' AND o.created_at < '2026-05-01'
ORDER BY o.account_id, o.created_at DESC
)
SELECT lo.account_id, lo.key_from, lo.sale_channel, lo.purchase_time, lo.refund_time,
uo.total_orders, uo.refunded_orders, uo.active_orders
FROM latest_order lo
JOIN user_all_orders uo ON lo.account_id = uo.account_id
ORDER BY lo.account_id;
"""
    with conn.cursor() as cursor:
        cursor.execute(query)
        # Column names come from the cursor metadata, i.e. the final SELECT list.
        header = [column[0] for column in cursor.description]
        fetched = cursor.fetchall()
    frame = pd.DataFrame(fetched, columns=header)
    return frame
def classify_user(row):
    """Return the user-type label for one account row.

    Args:
        row: mapping-like object exposing 'total_orders' and 'active_orders'.

    Returns:
        '单订单退费' when the account has exactly one order; otherwise
        '多订单未全退' if at least one order is still active, else '多订单全退'.
    """
    if row['total_orders'] == 1:
        return '单订单退费'
    has_active_order = row['active_orders'] > 0
    return '多订单未全退' if has_active_order else '多订单全退'
def get_channel_name(key_from, sale_channel):
    """Return the raw key_from value as the channel name, or '' when it is falsy.

    sale_channel is accepted for call compatibility but is not used.
    """
    return key_from if key_from else ''
def fetch_chapter_records(conn, account_ids):
    """Collect lesson progress for the tracked chapters, grouped by account.

    Characters belonging to the given accounts are looked up first; their play
    records are then read from the sharded tables
    bi_user_chapter_play_record_<character_id % 8>, restricted to CHAPTER_IDS.

    Args:
        conn: an open psycopg2 connection.
        account_ids: iterable of numeric account ids (may be empty or None).

    Returns:
        dict: account_id -> chapter_id -> {
            'entered': True,
            'enter_time': earliest created_at of any record,
            'completed': True once a record with play_status == 1 is seen,
            'complete_time': earliest created_at among play_status == 1
                records, or None,
        }
        Accounts without characters or without records are absent. Records of
        several characters under one account are merged.
    """
    # Plain ints so psycopg2 can adapt the list (numpy integers are not
    # adaptable by default).
    acct_ids = [] if account_ids is None else [int(a) for a in account_ids]
    if not acct_ids:
        # Nothing to look up; an empty id list would also be invalid SQL.
        return {}

    # Character id -> account id mapping. Ids are bound as a parameter
    # (list -> PostgreSQL array) instead of being pasted into the SQL text.
    sql_char = """
        SELECT id AS character_id, account_id FROM bi_vala_app_character
        WHERE account_id = ANY(%s)
    """
    with conn.cursor() as cur:
        cur.execute(sql_char, (acct_ids,))
        char_rows = cur.fetchall()
    if not char_rows:
        return {}

    char_to_account = {}
    char_ids_by_table = {}
    for cid, aid in char_rows:
        char_to_account[cid] = aid
        # Play records are sharded over 8 tables keyed by character id modulo 8.
        char_ids_by_table.setdefault(cid % 8, []).append(cid)

    # Read the chapter records from each shard that holds at least one character.
    chapter_ids = list(CHAPTER_IDS)
    all_records = []
    for table_idx, cids in char_ids_by_table.items():
        # A table name cannot be a bound parameter; int() guarantees that only
        # a number is interpolated into the identifier.
        sql_rec = f"""
            SELECT user_id, chapter_id, play_status, created_at
            FROM bi_user_chapter_play_record_{int(table_idx)}
            WHERE user_id = ANY(%s)
              AND chapter_id = ANY(%s)
            ORDER BY user_id, chapter_id, created_at
        """
        with conn.cursor() as cur:
            cur.execute(sql_rec, (cids, chapter_ids))
            all_records.extend(cur.fetchall())

    # Fold the records into account_id -> chapter_id -> progress entry.
    result = {}
    for user_id, chapter_id, play_status, created_at in all_records:
        account_id = char_to_account.get(user_id)
        if account_id is None:  # explicit None check: an id of 0 is still valid
            continue
        entry = result.setdefault(account_id, {}).setdefault(chapter_id, {
            'entered': True,
            'enter_time': created_at,
            'completed': False,
            'complete_time': None,
        })
        # Keep the earliest entry time.
        if created_at < entry['enter_time']:
            entry['enter_time'] = created_at
        # play_status == 1 is treated as "completed"; keep the earliest such time.
        if play_status == 1:
            if not entry['completed'] or created_at < entry['complete_time']:
                entry['completed'] = True
                entry['complete_time'] = created_at
    return result
def format_time(t):
    """Render a timestamp for the report.

    Returns '' for None, 'YYYY-MM-DD HH:MM:SS' for datetime instances, and
    str(t) for anything else.
    """
    if isinstance(t, datetime):
        return t.strftime('%Y-%m-%d %H:%M:%S')
    return '' if t is None else str(t)
def main():
    """Build the April refund learning-progress report and export it to Excel.

    Steps: load the refund accounts, classify each account, load their chapter
    records, assemble one row per account (four columns per lesson in
    LESSON_ORDER) and write /tmp/openclaw/四月退费用户学习进度分析.xlsx.
    Progress and a user-type distribution are printed to stdout. When no
    refund account is found, nothing is exported.
    """
    conn = get_connection()
    try:
        print("1. 获取退费用户订单信息...")
        df_users = fetch_refund_users(conn)
        print(f"{len(df_users)}个退费用户")
        if df_users.empty:
            # Row-wise apply on an empty frame does not yield a column, and an
            # empty id list cannot be queried, so stop here.
            print("无退费用户，未生成Excel")
            return
        # Classify the accounts and derive the channel column.
        df_users['用户类型'] = df_users.apply(classify_user, axis=1)
        df_users['购课渠道'] = df_users.apply(
            lambda r: get_channel_name(r['key_from'], r['sale_channel']), axis=1)
        print("2. 获取课时学习记录...")
        account_ids = df_users['account_id'].tolist()
        chapter_records = fetch_chapter_records(conn, account_ids)
        print(f"{len(chapter_records)}个用户有学习记录")
    finally:
        # Release the connection even if a query fails; it is not needed later.
        conn.close()

    print("3. 组装表格...")
    # Reverse lookup (lesson code -> chapter_id) built once instead of scanning
    # CHAPTER_MAP for every user and lesson; lesson codes are unique.
    lesson_to_chapter = {lesson: ch_id for ch_id, lesson in CHAPTER_MAP.items()}
    rows = []
    for _, user in df_users.iterrows():
        aid = user['account_id']
        row = {
            '账号ID': aid,
            '购课渠道': user['购课渠道'],
            '购课时间': format_time(user['purchase_time']),
            '退费时间': format_time(user['refund_time']),
            '用户类型': user['用户类型'],
        }
        user_chapters = chapter_records.get(aid, {})
        for lesson in LESSON_ORDER:
            ch_data = user_chapters.get(lesson_to_chapter[lesson], {})
            # Both branches used to be '', which left these columns blank for
            # everyone; emit an explicit yes/no marker instead.
            row[f'{lesson}-是否进入'] = '是' if ch_data.get('entered', False) else '否'
            row[f'{lesson}-进入时间'] = format_time(ch_data.get('enter_time'))
            row[f'{lesson}-是否完成'] = '是' if ch_data.get('completed', False) else '否'
            row[f'{lesson}-完成时间'] = format_time(ch_data.get('complete_time'))
        rows.append(row)
    df_result = pd.DataFrame(rows)

    # Export to Excel.
    output_path = '/tmp/openclaw/四月退费用户学习进度分析.xlsx'
    os.makedirs('/tmp/openclaw', exist_ok=True)
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        df_result.to_excel(writer, index=False, sheet_name='退费用户学习进度')
        # Fit each column to its longest cell, capped at a width of 25.
        ws = writer.sheets['退费用户学习进度']
        for col in ws.columns:
            max_length = max(len(str(cell.value or '')) for cell in col)
            col_letter = col[0].column_letter
            ws.column_dimensions[col_letter].width = min(max_length + 2, 25)
    print(f"4. 导出完成: {output_path}")
    print(f" 总行数: {len(df_result)}")
    print(" 用户类型分布:")
    print(df_result['用户类型'].value_counts().to_string())
# Script entry point: run the report only when executed directly, not on import.
if __name__ == '__main__':
    main()