ai_member_xiaoxi/scripts/channel_lead_refund_analysis.py
2026-05-20 08:00:01 +08:00

346 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
渠道4-5月线索 - 退款相关性分析
分析 Sheet1 退款状态与各维度的相关性:
- 渠道、课包、电话接通
- 用户注册时间、角色创建时间
- U0行课行为L1/L2各课完成情况
"""
import pandas as pd
import numpy as np
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils.dataframe import dataframe_to_rows
import glob, sys
from datetime import datetime
def load_data():
files = glob.glob('/root/.openclaw/media/inbound/*d58af024*')
if not files:
print("未找到输入文件")
sys.exit(1)
input_file = files[0]
print(f"读取: {input_file}")
df1 = pd.read_excel(input_file, sheet_name='Sheet1', dtype=str)
df2 = pd.read_excel(input_file, sheet_name='Sheet2', dtype=str)
return df1, df2
def prepare_data(df1, df2):
"""准备匹配后的分析数据集"""
# Sheet1 清洗
df1['账号ID_clean'] = pd.to_numeric(df1['账号ID'], errors='coerce')
df1['is_refund'] = df1['订单状态'].apply(lambda x: '' if x == '已退款' else '')
# Sheet2 清洗 - 聚合到用户级别
df2['用户ID_num'] = pd.to_numeric(df2['用户ID'], errors='coerce')
# U0 10课的列
u0_cols = ['L1_U00_L01_完成时间', 'L1_U00_L02_完成时间', 'L1_U00_L03_完成时间',
'L1_U00_L04_完成时间', 'L1_U00_L05_完成时间',
'L2_U00_L01_完成时间', 'L2_U00_L02_完成时间', 'L2_U00_L03_完成时间',
'L2_U00_L04_完成时间', 'L2_U00_L05_完成时间']
l1_cols = u0_cols[:5]
l2_cols = u0_cols[5:]
# 按用户聚合 Sheet2
def agg_user(group):
row = {}
row['注册时间'] = group['用户注册时间'].iloc[0] # 同一用户注册时间相同
# 最早角色创建时间
create_times = pd.to_datetime(group['角色创建时间'], errors='coerce').dropna()
row['最早角色创建时间'] = create_times.min() if len(create_times) > 0 else pd.NaT
# U0行课只要任一角色完成就算完成
for c in u0_cols:
if c in group.columns:
vals = group[c].dropna()
vals = vals[vals != '']
vals = vals[vals != 'nan']
row[c] = vals.iloc[0] if len(vals) > 0 else ''
else:
row[c] = ''
# L1完成课数
l1_done = 0
for c in l1_cols:
if c in group.columns:
vals = group[c].dropna()
vals = vals[vals != '']
vals = vals[vals != 'nan']
if len(vals) > 0:
l1_done += 1
row['L1完成课数'] = l1_done
# L2完成课数
l2_done = 0
for c in l2_cols:
if c in group.columns:
vals = group[c].dropna()
vals = vals[vals != '']
vals = vals[vals != 'nan']
if len(vals) > 0:
l2_done += 1
row['L2完成课数'] = l2_done
row['U0总完成课数'] = l1_done + l2_done
# 行课类型
if l1_done > 0 and l2_done > 0:
row['行课类型'] = 'L1+L2都有'
elif l1_done > 0:
row['行课类型'] = '仅L1'
elif l2_done > 0:
row['行课类型'] = '仅L2'
else:
row['行课类型'] = '无行课'
# 是否有任何行课
row['是否有行课'] = '' if (l1_done + l2_done > 0) else ''
return pd.Series(row)
df2_agg = df2.groupby('用户ID_num', as_index=False).apply(agg_user).reset_index()
# 匹配
merged = df1.merge(df2_agg, left_on='账号ID_clean', right_on='用户ID_num', how='left')
merged['匹配状态'] = merged['用户ID_num'].notna().map({True: '已匹配', False: '未匹配'})
return merged
def calc_stats(df, col, refund_col='is_refund'):
"""计算某维度的退款统计"""
total = df.groupby(col).size().reset_index(name='总数')
refund = df[df[refund_col] == ''].groupby(col).size().reset_index(name='退款数')
result = total.merge(refund, on=col, how='left')
result['退款数'] = result['退款数'].fillna(0).astype(int)
result['退费率'] = (result['退款数'] / result['总数'] * 100).round(1)
result['退费率'] = result['退费率'].apply(lambda x: f'{x}%')
result = result.sort_values('总数', ascending=False)
return result
def analyze(df):
"""全面分析"""
results = {}
# 1. 渠道 vs 退款
results['1-渠道'] = calc_stats(df, '渠道')
# 2. 课包 vs 退款
results['2-课包'] = calc_stats(df, '课包')
# 3. 电话接通 vs 退款
results['3-电话接通'] = calc_stats(df, '电话已接通')
# 4. 渠道+课包 交叉 vs 退款
cross = df.groupby(['渠道', '课包']).agg(
总数=('is_refund', 'count'),
退款数=('is_refund', lambda x: (x == '').sum())
).reset_index()
cross['退费率'] = (cross['退款数'] / cross['总数'] * 100).round(1)
cross['退费率'] = cross['退费率'].apply(lambda x: f'{x}%')
results['4-渠道×课包交叉'] = cross.sort_values('总数', ascending=False)
# 5. 电话接通+课包 交叉
cross2 = df.groupby(['电话已接通', '课包']).agg(
总数=('is_refund', 'count'),
退款数=('is_refund', lambda x: (x == '').sum())
).reset_index()
cross2['退费率'] = (cross2['退款数'] / cross2['总数'] * 100).round(1)
cross2['退费率'] = cross2['退费率'].apply(lambda x: f'{x}%')
results['5-电话接通×课包交叉'] = cross2.sort_values('总数', ascending=False)
# 6. 用户注册时间(按月) vs 退款
df['注册月份'] = pd.to_datetime(df['注册时间'], errors='coerce').dt.to_period('M').astype(str)
df['注册月份'] = df['注册月份'].fillna('未知')
results['6-注册月份'] = calc_stats(df, '注册月份')
# 7. 角色创建时间(按月)
df['角色创建月份'] = pd.to_datetime(df['最早角色创建时间'], errors='coerce').dt.to_period('M').astype(str)
df['角色创建月份'] = df['角色创建月份'].fillna('未知')
results['7-角色创建月份'] = calc_stats(df, '角色创建月份')
# 8. 是否有行课 vs 退款
results['8-是否有行课'] = calc_stats(df, '是否有行课')
# 9. 行课类型 vs 退款
results['9-行课类型'] = calc_stats(df, '行课类型')
# 10. U0完成课数 vs 退款
results['10-U0完成课数'] = calc_stats(df, 'U0总完成课数')
# 11. L1完成课数 vs 退款
results['11-L1完成课数'] = calc_stats(df, 'L1完成课数')
# 12. L2完成课数 vs 退款
results['12-L2完成课数'] = calc_stats(df, 'L2完成课数')
# 13. 注册到创建时间差 vs 退款(天)
df['注册时间_dt'] = pd.to_datetime(df['注册时间'], errors='coerce')
df['角色创建时间_dt'] = pd.to_datetime(df['最早角色创建时间'], errors='coerce')
df['注册到创建天数'] = (df['角色创建时间_dt'] - df['注册时间_dt']).dt.total_seconds() / 86400
df['注册到创建天数'] = df['注册到创建天数'].fillna(-1)
def day_bucket(d):
if d < 0:
return '未知'
elif d <= 1:
return '≤1天'
elif d <= 7:
return '2-7天'
elif d <= 30:
return '8-30天'
elif d <= 90:
return '31-90天'
else:
return '>90天'
df['注册到创建间隔'] = df['注册到创建天数'].apply(day_bucket)
results['13-注册到角色创建间隔'] = calc_stats(df, '注册到创建间隔')
# 14. 匹配状态 vs 退款未匹配的在Sheet2中找不到数据
results['14-匹配状态'] = calc_stats(df, '匹配状态')
# 整体
total_count = len(df)
refund_count = (df['is_refund'] == '').sum()
results['_整体'] = {
'总订单数': total_count,
'退款订单数': refund_count,
'退费率': f'{refund_count/total_count*100:.1f}%',
'已匹配数': df['匹配状态'].value_counts().get('已匹配', 0),
'未匹配数': df['匹配状态'].value_counts().get('未匹配', 0),
}
return results, df
def write_excel(results, df, output_path):
"""写入多Sheet Excel"""
wb = Workbook()
# 删除默认Sheet
wb.remove(wb.active)
header_font = Font(bold=True, size=11)
header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
header_font_white = Font(bold=True, size=11, color='FFFFFF')
thin_border = Border(
left=Side(style='thin'), right=Side(style='thin'),
top=Side(style='thin'), bottom=Side(style='thin')
)
red_fill = PatternFill(start_color='FFC7CE', end_color='FFC7CE', fill_type='solid')
# 汇总页
ws = wb.create_sheet('汇总')
ws.append(['指标', ''])
for k, v in results['_整体'].items():
ws.append([k, v])
ws.append([])
ws.append(['分析维度摘要'])
# 各个维度的摘要
for sheet_name, stat_df in results.items():
if sheet_name.startswith('_'):
continue
if isinstance(stat_df, pd.DataFrame) and len(stat_df) > 0:
# 找到退费率最高的
if '退费率' in stat_df.columns:
max_rate_row = stat_df.iloc[0]
first_col = stat_df.columns[0]
ws.append([sheet_name, f'{first_col}={max_rate_row[first_col]}: 退费率{max_rate_row["退费率"]} (退款{max_rate_row["退款数"]}/{max_rate_row["总数"]})'])
for col in ws.columns:
col[0].font = header_font
ws.column_dimensions['A'].width = 30
ws.column_dimensions['B'].width = 60
# 各维度Sheet
for sheet_name, stat_df in results.items():
if sheet_name.startswith('_') or not isinstance(stat_df, pd.DataFrame):
continue
ws = wb.create_sheet(sheet_name[:31]) # Excel sheet name max 31 chars
# 写入表头
for col_idx, col_name in enumerate(stat_df.columns, 1):
cell = ws.cell(row=1, column=col_idx, value=col_name)
cell.font = header_font_white
cell.fill = header_fill
cell.border = thin_border
cell.alignment = Alignment(horizontal='center')
# 写入数据
for row_idx, row in enumerate(stat_df.itertuples(index=False), 2):
for col_idx, val in enumerate(row, 1):
cell = ws.cell(row=row_idx, column=col_idx, value=val)
cell.border = thin_border
cell.alignment = Alignment(horizontal='center')
# 高退费率标红 (>整体退费率)
if '退费率' in stat_df.columns and col_idx == len(row):
try:
rate = float(str(val).replace('%', ''))
if rate > 44.0: # 整体退费率约44%
cell.fill = red_fill
cell.font = Font(color='9C0006', bold=True)
except:
pass
# 自动列宽
for col_idx in range(1, len(stat_df.columns) + 1):
max_len = max(
len(str(stat_df.columns[col_idx-1])),
stat_df.iloc[:, col_idx-1].astype(str).str.len().max()
)
ws.column_dimensions[chr(64+col_idx) if col_idx <= 26 else 'A'].width = min(max_len + 4, 30)
# 明细页:所有匹配后的数据
ws_detail = wb.create_sheet('匹配明细')
detail_cols = ['下单日期', '手机号', '渠道', '课包', '订单状态', '电话已接通', '账号ID',
'匹配状态', '注册时间', '最早角色创建时间', '行课类型', 'U0总完成课数',
'L1完成课数', 'L2完成课数', '注册月份', '角色创建月份', '注册到创建间隔']
detail_df = df[detail_cols].copy()
for col_idx, col_name in enumerate(detail_df.columns, 1):
cell = ws_detail.cell(row=1, column=col_idx, value=col_name)
cell.font = header_font_white
cell.fill = header_fill
cell.border = thin_border
cell.alignment = Alignment(horizontal='center')
for row_idx, row in enumerate(detail_df.itertuples(index=False), 2):
for col_idx, val in enumerate(row, 1):
cell = ws_detail.cell(row=row_idx, column=col_idx, value=val if pd.notna(val) else '')
cell.border = thin_border
# 退款行标红背景
if row[4] == '已退款':
cell.fill = PatternFill(start_color='FFF2CC', end_color='FFF2CC', fill_type='solid')
wb.save(output_path)
print(f"报表已保存: {output_path}")
def main():
df1, df2 = load_data()
merged = prepare_data(df1, df2)
print(f"\n整体: {len(merged)}条, 已匹配{(merged['匹配状态']=='已匹配').sum()}, 未匹配{(merged['匹配状态']=='未匹配').sum()}")
print(f"退款: {(merged['is_refund']=='').sum()}, 未退款: {(merged['is_refund']=='').sum()}")
results, merged = analyze(merged)
print(f"\n=== 整体 ===")
for k, v in results['_整体'].items():
print(f" {k}: {v}")
print(f"\n=== 各维度退费率 ===")
for name, stat in results.items():
if name.startswith('_') or not isinstance(stat, pd.DataFrame):
continue
print(f"\n--- {name} ---")
print(stat.to_string(index=False))
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
output_path = f'/root/.openclaw/workspace/output/渠道4-5月线索_退款相关性分析_{ts}.xlsx'
write_excel(results, merged, output_path)
print(f"\n输出文件: {output_path}")
return output_path
if __name__ == '__main__':
main()