#!/usr/bin/env python3
"""Monthly registration-to-paid conversion rates, September 2025 onwards.

Three variants of the registration denominator are reported for every month:

* 原始版 (raw):    every registered account.
* 纯净版 (clean):  accounts whose only paid orders are external are removed.
* 拟合版 (fitted): registrations on campaign days (and their after-effect
  days) are replaced by a LOESS baseline, scaled by a weekday factor, that
  is fitted on non-campaign days only.

The numerator is always the number of accounts holding at least one
completed internal order that has not been refunded.

Dimensions: overall / per ``download_channel`` / per order ``key_from``.
Results are grouped by registration month and written to an Excel workbook.

The database password is NOT stored in this file.  libpq (used by psycopg2)
resolves it from the ``PGPASSWORD`` environment variable or ``~/.pgpass``.
"""
import warnings
from contextlib import closing
from datetime import datetime, timedelta  # noqa: F401 - kept from the original import block
from pathlib import Path

import numpy as np
import pandas as pd

# NOTE: psycopg2 and statsmodels are imported inside the functions that use
# them, so the pure-pandas helpers below can be imported (and unit-tested)
# on a machine that has neither a database driver nor statsmodels.

# pandas warns on every query made through a raw DBAPI connection.
warnings.filterwarnings('ignore')

# ========== Configuration ==========
DB_HOST = 'bj-postgres-16pob4sg.sql.tencentcdb.com'
DB_PORT = 28591
DB_USER = 'ai_member'
DB_NAME = 'vala_bi'

START_DATE = '2025-09-01'  # inclusive lower bound of the registration window
END_DATE = '2026-05-13'    # exclusive upper bound (the query uses `created_at < END_DATE`)

BATCH_SIZE = 50000         # number of ids sent per order / refund query
MIN_CLEAN_DAYS = 5         # below this many non-campaign days no LOESS fit is attempted

# key_from values that identify internal (in-app) orders; everything else is external.
INTERNAL_KF = ['app-active-h5-0-0', 'app-sales-bj-qhm-0']

OUTPUT_PATH = '/root/.openclaw/workspace/output/conversion_rate_202509_202605.xlsx'

RESULT_COLUMNS = ['月份', '维度', '渠道', '版本', '注册用户数', '付费用户数', '转化率']

# Campaign calendar as inclusive (first_day, last_day) ranges.  Ranges that
# are longer than the campaign itself include its after-effect days.
ACTIVITY_RANGES = [
    ('2025-09-09', '2025-09-10'),
    ('2025-09-19', '2025-09-23'),
    ('2025-10-13', '2025-10-14'),
    ('2025-10-16', '2025-10-17'),
    ('2025-11-02', '2025-11-02'),
    ('2025-11-07', '2025-11-07'),
    ('2025-11-10', '2025-11-10'),
    ('2025-11-12', '2025-11-12'),
    ('2025-11-19', '2025-11-19'),
    ('2025-12-03', '2025-12-03'),
    # 2026
    ('2026-01-28', '2026-01-29'),  # 1 after-effect day
    ('2026-02-11', '2026-02-11'),
    ('2026-02-26', '2026-03-02'),  # 4 after-effect days
    ('2026-03-05', '2026-03-08'),  # 3 after-effect days
    ('2026-03-09', '2026-03-09'),
    ('2026-03-12', '2026-03-13'),
    ('2026-04-03', '2026-04-07'),  # 4 after-effect days
    ('2026-04-08', '2026-04-10'),  # 2 after-effect days
    ('2026-04-22', '2026-04-23'),  # 1 after-effect day
    ('2026-04-28', '2026-04-28'),
    ('2026-05-06', '2026-05-07'),
]


# ========== Small pure helpers ==========
def iter_batches(values, size):
    """Yield ``(offset, chunk)`` pairs that cover *values* in slices of *size*."""
    for offset in range(0, len(values), size):
        yield offset, values[offset:offset + size]


def format_rate(numerator, denominator):
    """Return ``numerator / denominator`` as a percentage string.

    A non-positive denominator yields the literal ``'0%'``.
    """
    if denominator > 0:
        return f"{numerator/denominator*100:.2f}%"
    return '0%'


def result_row(month, dimension, channel, version, registered, paid, rate_base=None):
    """Build one output record.

    *rate_base* overrides the denominator used for the rate; it is needed for
    the overall fitted row, which displays a rounded registration count but
    divides by the unrounded fitted total.
    """
    base = registered if rate_base is None else rate_base
    return {
        '月份': str(month),
        '维度': dimension,
        '渠道': channel,
        '版本': version,
        '注册用户数': registered,
        '付费用户数': paid,
        '转化率': format_rate(paid, base),
    }


def expand_activity_dates(ranges=ACTIVITY_RANGES):
    """Expand inclusive ``(first_day, last_day)`` ranges into a set of ``date`` objects."""
    return {
        day.date()
        for first_day, last_day in ranges
        for day in pd.date_range(pd.Timestamp(first_day), pd.Timestamp(last_day))
    }


# ========== Database access ==========
def connect():
    """Open the PostgreSQL connection.

    No password is passed here on purpose: libpq picks it up from the
    ``PGPASSWORD`` environment variable or from ``~/.pgpass``.
    """
    import psycopg2

    return psycopg2.connect(host=DB_HOST, port=DB_PORT, user=DB_USER, dbname=DB_NAME)


def load_registrations(conn, start_date=START_DATE, end_date=END_DATE):
    """Fetch accounts created in ``[start_date, end_date)`` with status 1 and not deleted.

    Returns a frame with ``account_id``, ``reg_date`` (datetime64),
    ``download_channel`` and ``reg_month`` (monthly Period).
    """
    sql = """
        SELECT a.id AS account_id,
               a.created_at::date AS reg_date,
               a.download_channel
        FROM bi_vala_app_account a
        WHERE a.created_at >= %(start)s
          AND a.created_at < %(end)s
          AND a.status = 1
          AND a.deleted_at IS NULL
    """
    reg_df = pd.read_sql(sql, conn, params={'start': start_date, 'end': end_date})
    reg_df['reg_date'] = pd.to_datetime(reg_df['reg_date'])
    reg_df['reg_month'] = reg_df['reg_date'].dt.to_period('M')
    return reg_df


def load_orders(conn, account_ids, batch_size=BATCH_SIZE):
    """Fetch every order with a payment date for *account_ids*, in batches.

    Returns an empty, column-less frame when *account_ids* is empty.
    """
    sql = """
        SELECT o.account_id, o.key_from, o.sale_channel,
               o.pay_success_date::date AS pay_date,
               o.order_status, o.pay_amount_int,
               o.trade_no, o.out_trade_no
        FROM bi_vala_order o
        WHERE o.account_id = ANY(%(ids)s)
          AND o.pay_success_date IS NOT NULL
    """
    total = len(account_ids)
    frames = []
    for batch_no, (offset, chunk) in enumerate(iter_batches(account_ids, batch_size), start=1):
        # .tolist() turns numpy scalars into plain Python values, which is
        # what psycopg2 knows how to adapt into a SQL array.
        frames.append(pd.read_sql(sql, conn, params={'ids': np.asarray(chunk).tolist()}))
        if batch_no % 5 == 0:
            print(f" 已处理 {min(offset+batch_size, total)}/{total} 个账号...")
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


def load_refunds(conn, trade_nos, batch_size=BATCH_SIZE):
    """Fetch refund records with status 3 for *trade_nos*, in batches.

    The lookup is best-effort, as in the original script: a failing batch is
    skipped.  Unlike the original, the failure is reported and the
    transaction is rolled back, because PostgreSQL rejects every further
    statement on a connection whose transaction has been aborted.
    """
    sql = """
        SELECT trade_no, out_trade_no, status
        FROM bi_refund_order
        WHERE trade_no = ANY(%(ids)s)
          AND status = 3
    """
    frames = []
    for offset, chunk in iter_batches(trade_nos, batch_size):
        try:
            frames.append(pd.read_sql(sql, conn, params={'ids': np.asarray(chunk).tolist()}))
        except Exception as exc:  # best-effort by design; pandas wraps driver errors
            conn.rollback()
            print(f" [警告] 退费批次 {offset}-{offset+len(chunk)} 查询失败,已跳过: {exc}")
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


# ========== Pre-processing ==========
def flag_orders(order_df, refund_df):
    """Return a copy of *order_df* with the boolean flag columns added.

    * ``is_internal``  - ``key_from`` is listed in ``INTERNAL_KF``.
    * ``is_completed`` - ``order_status`` is 3 or 4.
    * ``is_refunded``  - ``trade_no`` appears in *refund_df*.

    An empty *order_df* is returned unchanged.
    """
    if len(order_df) == 0:
        return order_df
    flagged = order_df.copy()
    flagged['is_internal'] = flagged['key_from'].isin(INTERNAL_KF)
    flagged['is_completed'] = flagged['order_status'].isin([3, 4])
    if len(refund_df) > 0:
        refunded_trade_nos = set(refund_df['trade_no'].dropna())
        flagged['is_refunded'] = flagged['trade_no'].isin(refunded_trade_nos)
    else:
        flagged['is_refunded'] = False
    return flagged


def paid_user_sets(order_df):
    """Derive the three paying-user sets from flagged orders.

    Returns ``(internal_users, internal_users_norefund, external_users)``:

    * internal users hold a completed (status 3/4) internal order;
    * the no-refund subset holds at least one such order that is not
      refunded (an order without ``trade_no`` cannot be matched to a refund
      and therefore counts as not refunded);
    * external users hold a non-internal order with status 3.
    """
    if len(order_df) == 0:
        return set(), set(), set()
    internal = order_df[order_df['is_internal'] & order_df['is_completed']]
    external = order_df[~order_df['is_internal'] & (order_df['order_status'] == 3)]
    internal_users = set(internal['account_id'])
    internal_users_norefund = set(internal.loc[~internal['is_refunded'], 'account_id'])
    external_users = set(external['account_id'])
    return internal_users, internal_users_norefund, external_users


def label_users(reg_df, internal_users, internal_users_norefund, external_users):
    """Return a copy of *reg_df* with the per-account payment flags added."""
    labelled = reg_df.copy()
    labelled['has_internal'] = labelled['account_id'].isin(internal_users)
    labelled['has_internal_norefund'] = labelled['account_id'].isin(internal_users_norefund)
    labelled['has_external'] = labelled['account_id'].isin(external_users)
    labelled['has_no_order'] = ~labelled['has_internal'] & ~labelled['has_external']
    # The clean variant drops accounts that paid externally but never internally.
    labelled['only_external'] = labelled['has_external'] & ~labelled['has_internal']
    labelled['is_clean_user'] = ~labelled['only_external']
    return labelled


# ========== LOESS baseline ==========
def fit_daily_registrations(reg_df, activity_dates, start_date=START_DATE):
    """Compute daily registration counts and their fitted replacement.

    Days whose date is in *activity_dates* get ``fitted_reg`` = LOESS
    baseline x weekday factor; all other days keep their actual count.
    Both the baseline and the weekday factors are estimated on
    non-campaign ("clean") days only, and the baseline is linearly
    interpolated onto campaign days.

    Returns ``(daily_reg, frac)``.  *frac* is the LOESS span that was used,
    or ``None`` when fewer than ``MIN_CLEAN_DAYS`` clean days exist; in that
    case the actual counts serve as baseline.
    """
    daily = reg_df.groupby('reg_date')['account_id'].count().reset_index()
    daily.columns = ['reg_date', 'reg_count']
    daily = daily.sort_values('reg_date')
    daily['is_activity'] = (
        daily['reg_date'].map(lambda ts: ts.date() in activity_dates).astype(bool)
    )
    daily['weekday'] = daily['reg_date'].dt.dayofweek  # 0 = Monday
    daily['is_weekend'] = daily['weekday'] >= 5
    daily['day_num'] = (daily['reg_date'] - pd.Timestamp(start_date)).dt.days

    clean = daily[~daily['is_activity']]

    # Weekday factor = mean count of that weekday / mean count overall.
    if len(clean) > 0:
        factors = clean.groupby('weekday')['reg_count'].mean() / clean['reg_count'].mean()
        daily['weekday_factor'] = daily['weekday'].map(factors).fillna(1.0)
    else:
        daily['weekday_factor'] = 1.0

    frac = None
    if len(clean) >= MIN_CLEAN_DAYS:
        from statsmodels.nonparametric.smoothers_lowess import lowess

        frac = max(min(0.3, 60.0 / len(daily)), 0.1)
        smoothed = lowess(
            clean['reg_count'].values,
            clean['day_num'].values,
            frac=frac,
            it=3,
        )
        # lowess returns (x, fitted) rows sorted by x.
        daily['loess_baseline'] = np.interp(
            daily['day_num'].values, smoothed[:, 0], smoothed[:, 1]
        )
    else:
        daily['loess_baseline'] = daily['reg_count']

    daily['corrected_fitted'] = daily['loess_baseline'] * daily['weekday_factor']
    daily['fitted_reg'] = np.where(
        daily['is_activity'],
        daily['corrected_fitted'],  # campaign day: replaced by the fitted value
        daily['reg_count'],         # ordinary day: actual value kept
    )
    daily['reg_month'] = daily['reg_date'].dt.to_period('M')
    return daily, frac


def monthly_fit_summary(daily_reg):
    """Sum actual and fitted registrations per month.

    Returns a frame with columns ``reg_month``, ``orig_total``, ``fitted_total``.
    """
    summary = daily_reg.groupby('reg_month')[['reg_count', 'fitted_reg']].sum().reset_index()
    summary.columns = ['reg_month', 'orig_total', 'fitted_total']
    return summary


# ========== Conversion rates ==========
def monthly_rows(reg_df, monthly_fitted):
    """Build the overall and per-``download_channel`` records for every month.

    *monthly_fitted* has columns ``reg_month`` and ``fitted_total``.  A
    channel's fitted denominator is its raw count scaled by the month-wide
    fitted/raw ratio.  Missing and empty channels are grouped under '未知'.
    """
    fitted_by_month = dict(zip(monthly_fitted['reg_month'], monthly_fitted['fitted_total']))
    rows = []
    for month in sorted(reg_df['reg_month'].unique()):
        month_users = reg_df[reg_df['reg_month'] == month]
        clean_users = month_users[~month_users['only_external']]

        denom_orig = len(month_users)
        denom_clean = len(clean_users)
        denom_fitted = fitted_by_month.get(month, denom_orig)
        paid_all = int(month_users['has_internal_norefund'].sum())
        paid_clean = int(clean_users['has_internal_norefund'].sum())

        rows.append(result_row(month, '整体', '全部', '原始版', denom_orig, paid_all))
        rows.append(result_row(month, '整体', '全部', '纯净版', denom_clean, paid_clean))
        rows.append(result_row(month, '整体', '全部', '拟合版', int(round(denom_fitted)),
                               paid_all, rate_base=denom_fitted))

        fitted_ratio = denom_fitted / denom_orig if denom_orig > 0 else 1.0
        # Normalise first: a plain groupby would silently drop NULL channels.
        labels = month_users['download_channel'].map(
            lambda ch: '未知' if pd.isna(ch) or not ch else ch
        )
        for label, members in month_users.groupby(labels):
            kept = members[~members['only_external']]
            ch_orig = len(members)
            ch_paid = int(members['has_internal_norefund'].sum())
            ch_fitted = int(round(ch_orig * fitted_ratio))
            rows.append(result_row(month, 'download_channel', label, '原始版', ch_orig, ch_paid))
            rows.append(result_row(month, 'download_channel', label, '纯净版', len(kept),
                                   int(kept['has_internal_norefund'].sum())))
            rows.append(result_row(month, 'download_channel', label, '拟合版', ch_fitted, ch_paid))
    return rows


def key_from_rows(reg_df, order_df, monthly_fitted):
    """Build the per-``key_from`` records plus the external total ('端外合计').

    Internal numerators count, per registration month and ``key_from``, the
    accounts with a completed internal order, excluding accounts whose
    internal orders were all refunded.  The external numerator counts
    accounts with a non-internal order of status 3.  Denominators are the
    month-wide raw / clean / fitted registration totals.
    """
    user_month = reg_df[['account_id', 'reg_month']].drop_duplicates('account_id')

    month_denom = (
        reg_df.groupby('reg_month')['account_id'].nunique()
        .rename('total_users').reset_index()
    )
    month_clean = (
        reg_df[~reg_df['only_external']].groupby('reg_month')['account_id'].nunique()
        .rename('clean_users').reset_index()
    )
    month_denom = month_denom.merge(month_clean, on='reg_month', how='left')
    month_denom['clean_users'] = month_denom['clean_users'].fillna(0).astype(int)
    month_denom = month_denom.merge(
        monthly_fitted[['reg_month', 'fitted_total']], on='reg_month', how='left'
    )
    month_denom['fitted_total'] = (
        month_denom['fitted_total'].fillna(month_denom['total_users']).round(0).astype(int)
    )

    def three_versions(record, channel):
        paid = int(record.paid_users)
        return [
            result_row(record.reg_month, 'key_from', channel, '原始版', int(record.total_users), paid),
            result_row(record.reg_month, 'key_from', channel, '纯净版', int(record.clean_users), paid),
            result_row(record.reg_month, 'key_from', channel, '拟合版', int(record.fitted_total), paid),
        ]

    rows = []

    internal = order_df[order_df['is_internal'] & order_df['is_completed']]
    internal = internal.merge(user_month, on='account_id', how='inner')
    # The exclusion is per account, not per order: an account with any
    # non-refunded internal order keeps all of its internal orders.
    unrefunded_users = set(internal.loc[~internal['is_refunded'], 'account_id'])
    kept_orders = internal[internal['account_id'].isin(unrefunded_users)]
    kf_stats = (
        kept_orders.groupby(['reg_month', 'key_from'])['account_id'].nunique()
        .rename('paid_users').reset_index()
        .merge(month_denom, on='reg_month', how='left')
    )
    for record in kf_stats.itertuples(index=False):
        rows.extend(three_versions(record, record.key_from))

    external = order_df[~order_df['is_internal'] & (order_df['order_status'] == 3)]
    external = external.merge(user_month, on='account_id', how='inner')
    ext_stats = (
        external.groupby('reg_month')['account_id'].nunique()
        .rename('paid_users').reset_index()
        .merge(month_denom, on='reg_month', how='left')
    )
    for record in ext_stats.itertuples(index=False):
        rows.extend(three_versions(record, '端外合计'))

    return rows


# ========== Output ==========
def sort_results(result_df):
    """Order records by month, dimension (overall, channel, key_from), version, channel."""
    dim_order = {'整体': 0, 'download_channel': 1, 'key_from': 2}
    ordered = result_df.assign(dim_sort=result_df['维度'].map(dim_order))
    return ordered.sort_values(['月份', 'dim_sort', '版本', '渠道']).drop(columns=['dim_sort'])


def write_excel(result_df, output_path=OUTPUT_PATH):
    """Write the full table, one sheet per dimension, and an overall pivot sheet."""
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
        result_df.to_excel(writer, sheet_name='全部数据', index=False)

        for dim in ['整体', 'download_channel', 'key_from']:
            dim_df = result_df[result_df['维度'] == dim].drop(columns=['维度'])
            # Excel limits sheet names to 31 characters.
            dim_df.to_excel(writer, sheet_name=dim[:31], index=False)

        # Side-by-side comparison of the three versions for the overall dimension.
        overall_df = result_df[result_df['维度'] == '整体']
        pivot = overall_df.pivot_table(
            index='月份', columns='版本', values='转化率', aggfunc='first'
        )
        pivot.to_excel(writer, sheet_name='整体对比')


def main():
    """Run the whole pipeline: extract, pre-process, fit, aggregate, export."""
    # closing() guarantees the connection is released even if a query fails.
    with closing(connect()) as conn:
        print("=" * 70)
        print("Step 1: 提取注册用户数据...")
        reg_df = load_registrations(conn)
        print(f" 注册用户数: {len(reg_df)}")
        print(f" 月份分布: {reg_df['reg_month'].value_counts().sort_index().to_dict()}")

        account_ids = reg_df['account_id'].unique()
        print(f"\nStep 2: 提取订单数据... (共 {len(account_ids)} 个账号)")
        order_df = load_orders(conn, account_ids)
        print(f" 订单总数: {len(order_df)}")
        if len(order_df) > 0:
            print(f" key_from分布: {order_df['key_from'].value_counts().head(10).to_dict()}")

        print(f"\nStep 3: 提取退费数据...")
        if len(order_df) > 0:
            refund_df = load_refunds(conn, order_df['trade_no'].dropna().unique())
            print(f" 退费记录数: {len(refund_df)}")
        else:
            refund_df = pd.DataFrame()

    print(f"\nStep 4: 数据预处理...")
    order_df = flag_orders(order_df, refund_df)
    internal_users, internal_users_norefund, external_users = paid_user_sets(order_df)
    if len(order_df) > 0:
        print(f" 端内付费用户数: {len(internal_users)}")
        print(f" 端内付费用户数(剔除全部退费): {len(internal_users_norefund)}")
        print(f" 端外付费用户数: {len(external_users)}")
    reg_df = label_users(reg_df, internal_users, internal_users_norefund, external_users)

    print(f"\nStep 5: LOESS拟合每日注册人数(含活动日历)...")
    activity_dates = expand_activity_dates()
    print(f" 活动+余波天数: {len(activity_dates)}")
    daily_reg, frac_val = fit_daily_registrations(reg_df, activity_dates)
    act_count = daily_reg['is_activity'].sum()
    print(f" 清洁日: {len(daily_reg) - act_count}, 活动/余波日: {act_count}")
    if frac_val is None:
        print(f" LOESS frac: 未拟合(清洁日不足{MIN_CLEAN_DAYS}天,直接使用实际值)")
    else:
        print(f" LOESS frac: {frac_val:.4f}")
    print(f" 星期因子范围: {daily_reg['weekday_factor'].min():.2f} ~ {daily_reg['weekday_factor'].max():.2f}")

    print(f" {'月份':<8} {'原始':>6} {'拟合':>6} {'剔除':>6} {'剔除率':>7}")
    summary = monthly_fit_summary(daily_reg)
    for record in summary.itertuples(index=False):
        removed = record.orig_total - record.fitted_total
        removed_pct = removed / record.orig_total * 100 if record.orig_total > 0 else 0
        print(f" {str(record.reg_month):<8} {int(record.orig_total):>6} "
              f"{int(record.fitted_total):>6} {int(removed):>6} {removed_pct:>6.1f}%")
    monthly_fitted = summary[['reg_month', 'fitted_total']]

    print(f"\nStep 6: 计算各版本各维度转化率...")
    results = monthly_rows(reg_df, monthly_fitted)
    print(f" 整体维度结果: {len([r for r in results if r['维度']=='整体'])} 条")
    print(f" download_channel维度结果: {len([r for r in results if r['维度']=='download_channel'])} 条")
    if len(order_df) > 0:
        results.extend(key_from_rows(reg_df, order_df, monthly_fitted))
    print(f" key_from维度结果: {len([r for r in results if r['维度']=='key_from'])} 条")

    # Explicit columns keep the frame usable even when there are no records.
    result_df = pd.DataFrame(results, columns=RESULT_COLUMNS)
    print(f"\n总结果数: {len(result_df)}")
    write_excel(sort_results(result_df), OUTPUT_PATH)

    print(f"\n输出文件: {OUTPUT_PATH}")
    print("Done!")


if __name__ == "__main__":
    main()