#!/usr/bin/env python3 """ 渠道4-5月线索 - 退款相关性分析 分析 Sheet1 退款状态与各维度的相关性: - 渠道、课包、电话接通 - 用户注册时间、角色创建时间 - U0行课行为(L1/L2各课完成情况) """ import pandas as pd import numpy as np from openpyxl import Workbook from openpyxl.styles import Font, Alignment, PatternFill, Border, Side from openpyxl.utils.dataframe import dataframe_to_rows import glob, sys from datetime import datetime def load_data(): files = glob.glob('/root/.openclaw/media/inbound/*d58af024*') if not files: print("未找到输入文件") sys.exit(1) input_file = files[0] print(f"读取: {input_file}") df1 = pd.read_excel(input_file, sheet_name='Sheet1', dtype=str) df2 = pd.read_excel(input_file, sheet_name='Sheet2', dtype=str) return df1, df2 def prepare_data(df1, df2): """准备匹配后的分析数据集""" # Sheet1 清洗 df1['账号ID_clean'] = pd.to_numeric(df1['账号ID'], errors='coerce') df1['is_refund'] = df1['订单状态'].apply(lambda x: '是' if x == '已退款' else '否') # Sheet2 清洗 - 聚合到用户级别 df2['用户ID_num'] = pd.to_numeric(df2['用户ID'], errors='coerce') # U0 10课的列 u0_cols = ['L1_U00_L01_完成时间', 'L1_U00_L02_完成时间', 'L1_U00_L03_完成时间', 'L1_U00_L04_完成时间', 'L1_U00_L05_完成时间', 'L2_U00_L01_完成时间', 'L2_U00_L02_完成时间', 'L2_U00_L03_完成时间', 'L2_U00_L04_完成时间', 'L2_U00_L05_完成时间'] l1_cols = u0_cols[:5] l2_cols = u0_cols[5:] # 按用户聚合 Sheet2 def agg_user(group): row = {} row['注册时间'] = group['用户注册时间'].iloc[0] # 同一用户注册时间相同 # 最早角色创建时间 create_times = pd.to_datetime(group['角色创建时间'], errors='coerce').dropna() row['最早角色创建时间'] = create_times.min() if len(create_times) > 0 else pd.NaT # U0行课:只要任一角色完成就算完成 for c in u0_cols: if c in group.columns: vals = group[c].dropna() vals = vals[vals != ''] vals = vals[vals != 'nan'] row[c] = vals.iloc[0] if len(vals) > 0 else '' else: row[c] = '' # L1完成课数 l1_done = 0 for c in l1_cols: if c in group.columns: vals = group[c].dropna() vals = vals[vals != ''] vals = vals[vals != 'nan'] if len(vals) > 0: l1_done += 1 row['L1完成课数'] = l1_done # L2完成课数 l2_done = 0 for c in l2_cols: if c in group.columns: vals = group[c].dropna() vals = vals[vals != ''] vals = vals[vals != 'nan'] if len(vals) > 0: l2_done += 1 row['L2完成课数'] = l2_done row['U0总完成课数'] = l1_done + l2_done # 行课类型 if l1_done > 0 and l2_done > 0: row['行课类型'] = 'L1+L2都有' elif l1_done > 0: row['行课类型'] = '仅L1' elif l2_done > 0: row['行课类型'] = '仅L2' else: row['行课类型'] = '无行课' # 是否有任何行课 row['是否有行课'] = '有' if (l1_done + l2_done > 0) else '无' return pd.Series(row) df2_agg = df2.groupby('用户ID_num', as_index=False).apply(agg_user).reset_index() # 匹配 merged = df1.merge(df2_agg, left_on='账号ID_clean', right_on='用户ID_num', how='left') merged['匹配状态'] = merged['用户ID_num'].notna().map({True: '已匹配', False: '未匹配'}) return merged def calc_stats(df, col, refund_col='is_refund'): """计算某维度的退款统计""" total = df.groupby(col).size().reset_index(name='总数') refund = df[df[refund_col] == '是'].groupby(col).size().reset_index(name='退款数') result = total.merge(refund, on=col, how='left') result['退款数'] = result['退款数'].fillna(0).astype(int) result['退费率'] = (result['退款数'] / result['总数'] * 100).round(1) result['退费率'] = result['退费率'].apply(lambda x: f'{x}%') result = result.sort_values('总数', ascending=False) return result def analyze(df): """全面分析""" results = {} # 1. 渠道 vs 退款 results['1-渠道'] = calc_stats(df, '渠道') # 2. 课包 vs 退款 results['2-课包'] = calc_stats(df, '课包') # 3. 电话接通 vs 退款 results['3-电话接通'] = calc_stats(df, '电话已接通') # 4. 渠道+课包 交叉 vs 退款 cross = df.groupby(['渠道', '课包']).agg( 总数=('is_refund', 'count'), 退款数=('is_refund', lambda x: (x == '是').sum()) ).reset_index() cross['退费率'] = (cross['退款数'] / cross['总数'] * 100).round(1) cross['退费率'] = cross['退费率'].apply(lambda x: f'{x}%') results['4-渠道×课包交叉'] = cross.sort_values('总数', ascending=False) # 5. 电话接通+课包 交叉 cross2 = df.groupby(['电话已接通', '课包']).agg( 总数=('is_refund', 'count'), 退款数=('is_refund', lambda x: (x == '是').sum()) ).reset_index() cross2['退费率'] = (cross2['退款数'] / cross2['总数'] * 100).round(1) cross2['退费率'] = cross2['退费率'].apply(lambda x: f'{x}%') results['5-电话接通×课包交叉'] = cross2.sort_values('总数', ascending=False) # 6. 用户注册时间(按月) vs 退款 df['注册月份'] = pd.to_datetime(df['注册时间'], errors='coerce').dt.to_period('M').astype(str) df['注册月份'] = df['注册月份'].fillna('未知') results['6-注册月份'] = calc_stats(df, '注册月份') # 7. 角色创建时间(按月) df['角色创建月份'] = pd.to_datetime(df['最早角色创建时间'], errors='coerce').dt.to_period('M').astype(str) df['角色创建月份'] = df['角色创建月份'].fillna('未知') results['7-角色创建月份'] = calc_stats(df, '角色创建月份') # 8. 是否有行课 vs 退款 results['8-是否有行课'] = calc_stats(df, '是否有行课') # 9. 行课类型 vs 退款 results['9-行课类型'] = calc_stats(df, '行课类型') # 10. U0完成课数 vs 退款 results['10-U0完成课数'] = calc_stats(df, 'U0总完成课数') # 11. L1完成课数 vs 退款 results['11-L1完成课数'] = calc_stats(df, 'L1完成课数') # 12. L2完成课数 vs 退款 results['12-L2完成课数'] = calc_stats(df, 'L2完成课数') # 13. 注册到创建时间差 vs 退款(天) df['注册时间_dt'] = pd.to_datetime(df['注册时间'], errors='coerce') df['角色创建时间_dt'] = pd.to_datetime(df['最早角色创建时间'], errors='coerce') df['注册到创建天数'] = (df['角色创建时间_dt'] - df['注册时间_dt']).dt.total_seconds() / 86400 df['注册到创建天数'] = df['注册到创建天数'].fillna(-1) def day_bucket(d): if d < 0: return '未知' elif d <= 1: return '≤1天' elif d <= 7: return '2-7天' elif d <= 30: return '8-30天' elif d <= 90: return '31-90天' else: return '>90天' df['注册到创建间隔'] = df['注册到创建天数'].apply(day_bucket) results['13-注册到角色创建间隔'] = calc_stats(df, '注册到创建间隔') # 14. 匹配状态 vs 退款(未匹配的在Sheet2中找不到数据) results['14-匹配状态'] = calc_stats(df, '匹配状态') # 整体 total_count = len(df) refund_count = (df['is_refund'] == '是').sum() results['_整体'] = { '总订单数': total_count, '退款订单数': refund_count, '退费率': f'{refund_count/total_count*100:.1f}%', '已匹配数': df['匹配状态'].value_counts().get('已匹配', 0), '未匹配数': df['匹配状态'].value_counts().get('未匹配', 0), } return results, df def write_excel(results, df, output_path): """写入多Sheet Excel""" wb = Workbook() # 删除默认Sheet wb.remove(wb.active) header_font = Font(bold=True, size=11) header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid') header_font_white = Font(bold=True, size=11, color='FFFFFF') thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) red_fill = PatternFill(start_color='FFC7CE', end_color='FFC7CE', fill_type='solid') # 汇总页 ws = wb.create_sheet('汇总') ws.append(['指标', '值']) for k, v in results['_整体'].items(): ws.append([k, v]) ws.append([]) ws.append(['分析维度摘要']) # 各个维度的摘要 for sheet_name, stat_df in results.items(): if sheet_name.startswith('_'): continue if isinstance(stat_df, pd.DataFrame) and len(stat_df) > 0: # 找到退费率最高的 if '退费率' in stat_df.columns: max_rate_row = stat_df.iloc[0] first_col = stat_df.columns[0] ws.append([sheet_name, f'{first_col}={max_rate_row[first_col]}: 退费率{max_rate_row["退费率"]} (退款{max_rate_row["退款数"]}/{max_rate_row["总数"]})']) for col in ws.columns: col[0].font = header_font ws.column_dimensions['A'].width = 30 ws.column_dimensions['B'].width = 60 # 各维度Sheet for sheet_name, stat_df in results.items(): if sheet_name.startswith('_') or not isinstance(stat_df, pd.DataFrame): continue ws = wb.create_sheet(sheet_name[:31]) # Excel sheet name max 31 chars # 写入表头 for col_idx, col_name in enumerate(stat_df.columns, 1): cell = ws.cell(row=1, column=col_idx, value=col_name) cell.font = header_font_white cell.fill = header_fill cell.border = thin_border cell.alignment = Alignment(horizontal='center') # 写入数据 for row_idx, row in enumerate(stat_df.itertuples(index=False), 2): for col_idx, val in enumerate(row, 1): cell = ws.cell(row=row_idx, column=col_idx, value=val) cell.border = thin_border cell.alignment = Alignment(horizontal='center') # 高退费率标红 (>整体退费率) if '退费率' in stat_df.columns and col_idx == len(row): try: rate = float(str(val).replace('%', '')) if rate > 44.0: # 整体退费率约44% cell.fill = red_fill cell.font = Font(color='9C0006', bold=True) except: pass # 自动列宽 for col_idx in range(1, len(stat_df.columns) + 1): max_len = max( len(str(stat_df.columns[col_idx-1])), stat_df.iloc[:, col_idx-1].astype(str).str.len().max() ) ws.column_dimensions[chr(64+col_idx) if col_idx <= 26 else 'A'].width = min(max_len + 4, 30) # 明细页:所有匹配后的数据 ws_detail = wb.create_sheet('匹配明细') detail_cols = ['下单日期', '手机号', '渠道', '课包', '订单状态', '电话已接通', '账号ID', '匹配状态', '注册时间', '最早角色创建时间', '行课类型', 'U0总完成课数', 'L1完成课数', 'L2完成课数', '注册月份', '角色创建月份', '注册到创建间隔'] detail_df = df[detail_cols].copy() for col_idx, col_name in enumerate(detail_df.columns, 1): cell = ws_detail.cell(row=1, column=col_idx, value=col_name) cell.font = header_font_white cell.fill = header_fill cell.border = thin_border cell.alignment = Alignment(horizontal='center') for row_idx, row in enumerate(detail_df.itertuples(index=False), 2): for col_idx, val in enumerate(row, 1): cell = ws_detail.cell(row=row_idx, column=col_idx, value=val if pd.notna(val) else '') cell.border = thin_border # 退款行标红背景 if row[4] == '已退款': cell.fill = PatternFill(start_color='FFF2CC', end_color='FFF2CC', fill_type='solid') wb.save(output_path) print(f"报表已保存: {output_path}") def main(): df1, df2 = load_data() merged = prepare_data(df1, df2) print(f"\n整体: {len(merged)}条, 已匹配{(merged['匹配状态']=='已匹配').sum()}, 未匹配{(merged['匹配状态']=='未匹配').sum()}") print(f"退款: {(merged['is_refund']=='是').sum()}, 未退款: {(merged['is_refund']=='否').sum()}") results, merged = analyze(merged) print(f"\n=== 整体 ===") for k, v in results['_整体'].items(): print(f" {k}: {v}") print(f"\n=== 各维度退费率 ===") for name, stat in results.items(): if name.startswith('_') or not isinstance(stat, pd.DataFrame): continue print(f"\n--- {name} ---") print(stat.to_string(index=False)) ts = datetime.now().strftime('%Y%m%d_%H%M%S') output_path = f'/root/.openclaw/workspace/output/渠道4-5月线索_退款相关性分析_{ts}.xlsx' write_excel(results, merged, output_path) print(f"\n输出文件: {output_path}") return output_path if __name__ == '__main__': main()