ai_member_xiaoxi/scripts/conversion_rate_analysis.py
2026-05-13 08:00:01 +08:00

547 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
2025年9月至今 三个版本(原始版/纯净版/拟合版)转化率分析
维度:整体 / 分渠道(download_channel) / 区分key_from
按月份分组
"""
import psycopg2
import pandas as pd
import numpy as np
from statsmodels.nonparametric.smoothers_lowess import lowess
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')
# ========== 数据库连接 ==========
conn = psycopg2.connect(
host='bj-postgres-16pob4sg.sql.tencentcdb.com',
port=28591,
user='ai_member',
password='LdfjdjL83h3h3^$&**YGG*',
dbname='vala_bi'
)
START_DATE = '2025-09-01'
END_DATE = '2026-05-13' # 含今天
print("=" * 70)
print("Step 1: 提取注册用户数据...")
# ========== 提取注册用户 ==========
reg_sql = f"""
SELECT
a.id AS account_id,
a.created_at::date AS reg_date,
a.download_channel
FROM bi_vala_app_account a
WHERE a.created_at >= '{START_DATE}'
AND a.created_at < '{END_DATE}'
AND a.status = 1
AND a.deleted_at IS NULL
"""
reg_df = pd.read_sql(reg_sql, conn)
reg_df['reg_date'] = pd.to_datetime(reg_df['reg_date'])
reg_df['reg_month'] = reg_df['reg_date'].dt.to_period('M')
print(f" 注册用户数: {len(reg_df)}")
print(f" 月份分布: {reg_df['reg_month'].value_counts().sort_index().to_dict()}")
# ========== 提取这些用户的订单 ==========
account_ids = reg_df['account_id'].unique()
print(f"\nStep 2: 提取订单数据... (共 {len(account_ids)} 个账号)")
# 分批查询
batch_size = 50000
all_orders = []
for i in range(0, len(account_ids), batch_size):
batch = account_ids[i:i+batch_size]
ids_str = ','.join([str(x) for x in batch])
order_sql = f"""
SELECT
o.account_id,
o.key_from,
o.sale_channel,
o.pay_success_date::date AS pay_date,
o.order_status,
o.pay_amount_int,
o.trade_no,
o.out_trade_no
FROM bi_vala_order o
WHERE o.account_id IN ({ids_str})
AND o.pay_success_date IS NOT NULL
"""
batch_df = pd.read_sql(order_sql, conn)
all_orders.append(batch_df)
if (i // batch_size + 1) % 5 == 0:
print(f" 已处理 {min(i+batch_size, len(account_ids))}/{len(account_ids)} 个账号...")
order_df = pd.concat(all_orders, ignore_index=True) if all_orders else pd.DataFrame()
print(f" 订单总数: {len(order_df)}")
if len(order_df) > 0:
print(f" key_from分布: {order_df['key_from'].value_counts().head(10).to_dict()}")
# ========== 提取退费信息 ==========
print(f"\nStep 3: 提取退费数据...")
if len(order_df) > 0:
trade_nos = order_df['trade_no'].dropna().unique()
out_trade_nos = order_df['out_trade_no'].dropna().unique()
all_refunds = []
for i in range(0, len(trade_nos), batch_size):
batch = trade_nos[i:i+batch_size]
ids_str = ','.join([f"'{x}'" for x in batch])
refund_sql = f"""
SELECT trade_no, out_trade_no, status
FROM bi_refund_order
WHERE trade_no IN ({ids_str}) AND status = 3
"""
try:
batch_df = pd.read_sql(refund_sql, conn)
all_refunds.append(batch_df)
except:
pass
refund_df = pd.concat(all_refunds, ignore_index=True) if all_refunds else pd.DataFrame()
print(f" 退费记录数: {len(refund_df)}")
else:
refund_df = pd.DataFrame()
conn.close()
# ========== 数据预处理 ==========
print(f"\nStep 4: 数据预处理...")
# 标记端内/端外
INTERNAL_KF = ['app-active-h5-0-0', 'app-sales-bj-qhm-0']
if len(order_df) > 0:
order_df['is_internal'] = order_df['key_from'].isin(INTERNAL_KF)
# 已完成订单 (status 3 or 4)
order_df['is_completed'] = order_df['order_status'].isin([3, 4])
# 退费标记
if len(refund_df) > 0:
refund_trade = set(refund_df['trade_no'].dropna())
order_df['is_refunded'] = order_df['trade_no'].isin(refund_trade)
else:
order_df['is_refunded'] = False
# 端内有效付费订单(已完成,含退费)
internal_orders = order_df[order_df['is_internal'] & order_df['is_completed']]
# 端外有效付费订单(已完成)
external_orders = order_df[~order_df['is_internal'] & (order_df['order_status'] == 3)]
# 按用户汇总
# 端内付费用户
internal_paid_users = set(internal_orders['account_id'].unique())
# 端外付费用户
external_paid_users = set(external_orders['account_id'].unique())
# 端内付费用户(剔除全部退费)
# 统计每个用户的端内订单退费情况
user_internal_orders = internal_orders.groupby('account_id').agg(
total_orders=('trade_no', 'count'),
refunded_orders=('is_refunded', 'sum')
).reset_index()
user_internal_orders['all_refunded'] = user_internal_orders['total_orders'] == user_internal_orders['refunded_orders']
internal_paid_users_norefund = set(user_internal_orders[~user_internal_orders['all_refunded']]['account_id'])
print(f" 端内付费用户数: {len(internal_paid_users)}")
print(f" 端内付费用户数(剔除全部退费): {len(internal_paid_users_norefund)}")
print(f" 端外付费用户数: {len(external_paid_users)}")
else:
internal_paid_users = set()
internal_paid_users_norefund = set()
external_paid_users = set()
# 给注册用户打标签
reg_df['has_internal'] = reg_df['account_id'].isin(internal_paid_users)
reg_df['has_internal_norefund'] = reg_df['account_id'].isin(internal_paid_users_norefund)
reg_df['has_external'] = reg_df['account_id'].isin(external_paid_users)
reg_df['has_no_order'] = ~reg_df['has_internal'] & ~reg_df['has_external']
# 纯净版:剔除"只有端外订单且没有端内订单"的用户
reg_df['is_clean_user'] = ~(
reg_df['has_external'] & ~reg_df['has_internal'] & ~reg_df['has_no_order']
)
# 简化:剔除只有端外订单的用户
reg_df['only_external'] = reg_df['has_external'] & ~reg_df['has_internal'] & ~reg_df['has_no_order']
# ========== LOESS 拟合 ==========
print(f"\nStep 5: LOESS拟合每日注册人数含活动日历...")
# ===== 活动日历 =====
# 2025年9/9-10, 9/19-23, 10/13-14, 10/16-17, 11/2, 11/7, 11/10, 11/12, 11/19, 12/3
# 2026年1/28(余波1天), 2/11, 2/26(余波4天), 3/5(余波3天), 3/9, 3/12-13,
# 4/3(余波4天), 4/8(余波2天), 4/22(余波1天), 4/28, 5/6-7
activity_ranges = [
('2025-09-09', '2025-09-10'),
('2025-09-19', '2025-09-23'),
('2025-10-13', '2025-10-14'),
('2025-10-16', '2025-10-17'),
('2025-11-02', '2025-11-02'),
('2025-11-07', '2025-11-07'),
('2025-11-10', '2025-11-10'),
('2025-11-12', '2025-11-12'),
('2025-11-19', '2025-11-19'),
('2025-12-03', '2025-12-03'),
# 2026
('2026-01-28', '2026-01-29'), # 余波1天
('2026-02-11', '2026-02-11'),
('2026-02-26', '2026-03-02'), # 余波4天
('2026-03-05', '2026-03-08'), # 余波3天
('2026-03-09', '2026-03-09'),
('2026-03-12', '2026-03-13'),
('2026-04-03', '2026-04-07'), # 余波4天
('2026-04-08', '2026-04-10'), # 余波2天
('2026-04-22', '2026-04-23'), # 余波1天
('2026-04-28', '2026-04-28'),
('2026-05-06', '2026-05-07'),
]
activity_dates = set()
for start_s, end_s in activity_ranges:
s = pd.Timestamp(start_s)
e = pd.Timestamp(end_s)
for d in pd.date_range(s, e):
activity_dates.add(d.date())
print(f" 活动+余波天数: {len(activity_dates)}")
daily_reg = reg_df.groupby('reg_date')['account_id'].count().reset_index()
daily_reg.columns = ['reg_date', 'reg_count']
daily_reg = daily_reg.sort_values('reg_date')
daily_reg['is_activity'] = daily_reg['reg_date'].apply(lambda d: d.date() in activity_dates)
# 星期因子(基于清洁日计算)
daily_reg['weekday'] = daily_reg['reg_date'].dt.dayofweek # 0=Mon
daily_reg['is_weekend'] = daily_reg['weekday'] >= 5
# 计算day_numLOESS拟合用
start_dt = pd.Timestamp(START_DATE)
daily_reg['day_num'] = (daily_reg['reg_date'] - start_dt).dt.days
# 清洁日 = 非活动+非余波日
clean_days = daily_reg[~daily_reg['is_activity']]
if len(clean_days) > 0:
overall_avg = clean_days['reg_count'].mean()
weekday_avg = clean_days.groupby('weekday')['reg_count'].mean()
daily_reg['weekday_factor'] = daily_reg['weekday'].map(
weekday_avg / overall_avg
).fillna(1.0)
else:
daily_reg['weekday_factor'] = 1.0
# LOESS拟合仅用清洁日
if len(clean_days) >= 5:
frac_val = min(0.3, 60.0 / len(daily_reg))
frac_val = max(frac_val, 0.1)
loess_result = lowess(
clean_days['reg_count'].values,
clean_days['day_num'].values,
frac=frac_val,
it=3
)
# 将LOESS结果映射回所有天用最近邻插值
loess_days = clean_days['day_num'].values
loess_vals = loess_result[:, 1]
# 对所有天用线性插值获取LOESS基线
all_loess = np.interp(daily_reg['day_num'].values, loess_days, loess_vals)
daily_reg['loess_baseline'] = all_loess
else:
daily_reg['loess_baseline'] = daily_reg['reg_count']
# 星期修正后的拟合值
daily_reg['corrected_fitted'] = daily_reg['loess_baseline'] * daily_reg['weekday_factor']
# 应用规则:活动日→拟合值;非活动日→保底规则 max(实际, 拟合)
daily_reg['fitted_reg'] = np.where(
daily_reg['is_activity'],
daily_reg['corrected_fitted'], # 活动日:用拟合值替换
daily_reg['reg_count'] # 非活动日:保留实际值
)
# 保底规则拟合值不能低于0
# daily_reg['fitted_reg'] = np.maximum(daily_reg['fitted_reg'], 0)
act_count = daily_reg['is_activity'].sum()
print(f" 清洁日: {len(clean_days)}, 活动/余波日: {act_count}")
print(f" LOESS frac: {frac_val:.4f}")
print(f" 星期因子范围: {daily_reg['weekday_factor'].min():.2f} ~ {daily_reg['weekday_factor'].max():.2f}")
# 打印拟合对比
print(f" {'月份':<8} {'原始':>6} {'拟合':>6} {'剔除':>6} {'剔除率':>7}")
from collections import defaultdict
month_fit = defaultdict(lambda: {'orig': 0, 'fit': 0})
for _, row in daily_reg.iterrows():
m = str(row['reg_date'].to_period('M'))
month_fit[m]['orig'] += row['reg_count']
month_fit[m]['fit'] += row['fitted_reg']
for m in sorted(month_fit.keys()):
o = month_fit[m]['orig']
f = month_fit[m]['fit']
d = o - f
r = d/o*100 if o > 0 else 0
print(f" {m:<8} {int(o):>6} {int(f):>6} {int(d):>6} {r:>6.1f}%")
# 月度汇总拟合值
daily_reg['reg_month'] = daily_reg['reg_date'].dt.to_period('M')
monthly_fitted = daily_reg.groupby('reg_month')['fitted_reg'].sum().reset_index()
monthly_fitted.columns = ['reg_month', 'fitted_total']
# ========== 计算转化率 ==========
print(f"\nStep 6: 计算各版本各维度转化率...")
results = []
# 月份列表
months = sorted(reg_df['reg_month'].unique())
for month in months:
month_users = reg_df[reg_df['reg_month'] == month]
# ---- 原始版 ----
denom_orig = len(month_users)
num_orig = month_users['has_internal_norefund'].sum()
# ---- 纯净版 ----
clean_users = month_users[~month_users['only_external']]
denom_clean = len(clean_users)
num_clean = clean_users['has_internal_norefund'].sum()
# ---- 拟合版 ----
fitted_row = monthly_fitted[monthly_fitted['reg_month'] == month]
denom_fitted = fitted_row['fitted_total'].values[0] if len(fitted_row) > 0 else denom_orig
num_fitted = month_users['has_internal_norefund'].sum() # 分子不变
# 整体维度
results.append({
'月份': str(month),
'维度': '整体',
'渠道': '全部',
'版本': '原始版',
'注册用户数': denom_orig,
'付费用户数': num_orig,
'转化率': f"{num_orig/denom_orig*100:.2f}%" if denom_orig > 0 else '0%'
})
results.append({
'月份': str(month),
'维度': '整体',
'渠道': '全部',
'版本': '纯净版',
'注册用户数': denom_clean if isinstance(denom_clean, (int, np.integer)) else int(denom_clean),
'付费用户数': num_clean,
'转化率': f"{num_clean/denom_clean*100:.2f}%" if denom_clean > 0 else '0%'
})
results.append({
'月份': str(month),
'维度': '整体',
'渠道': '全部',
'版本': '拟合版',
'注册用户数': int(round(denom_fitted)),
'付费用户数': num_fitted,
'转化率': f"{num_fitted/denom_fitted*100:.2f}%" if denom_fitted > 0 else '0%'
})
# ---- 分渠道(download_channel) ----
# 计算拟合版缩放比例
fitted_ratio = denom_fitted / denom_orig if denom_orig > 0 else 1.0
for ch, ch_users in month_users.groupby('download_channel'):
ch_label = ch if ch else '未知'
d_orig = len(ch_users)
n_orig = ch_users['has_internal_norefund'].sum()
# 纯净版
ch_clean = ch_users[~ch_users['only_external']]
d_clean = len(ch_clean)
n_clean = ch_clean['has_internal_norefund'].sum()
# 拟合版(按整体拟合比例缩放)
d_fitted = int(round(d_orig * fitted_ratio))
results.append({
'月份': str(month),
'维度': 'download_channel',
'渠道': ch_label,
'版本': '原始版',
'注册用户数': d_orig,
'付费用户数': n_orig,
'转化率': f"{n_orig/d_orig*100:.2f}%" if d_orig > 0 else '0%'
})
results.append({
'月份': str(month),
'维度': 'download_channel',
'渠道': ch_label,
'版本': '纯净版',
'注册用户数': d_clean,
'付费用户数': n_clean,
'转化率': f"{n_clean/d_clean*100:.2f}%" if d_clean > 0 else '0%'
})
results.append({
'月份': str(month),
'维度': 'download_channel',
'渠道': ch_label,
'版本': '拟合版',
'注册用户数': d_fitted,
'付费用户数': n_orig,
'转化率': f"{n_orig/d_fitted*100:.2f}%" if d_fitted > 0 else '0%'
})
print(f" 整体维度结果: {len([r for r in results if r['维度']=='整体'])}")
print(f" download_channel维度结果: {len([r for r in results if r['维度']=='download_channel'])}")
# ---- 区分 key_from ----
# 按注册月份 × key_from 统计付费用户
if len(order_df) > 0:
# 端内已完成的订单
internal_completed = order_df[order_df['is_internal'] & order_df['is_completed']]
# 关联用户注册月份
user_month_map = reg_df[['account_id', 'reg_month']].drop_duplicates('account_id')
internal_completed = internal_completed.merge(user_month_map, on='account_id', how='inner')
# 剔除全部退费的用户
internal_completed['is_refunded_order'] = internal_completed['is_refunded']
user_refund_stat = internal_completed.groupby('account_id').agg(
total=('trade_no', 'count'),
refunded=('is_refunded_order', 'sum')
).reset_index()
user_refund_stat['all_refunded'] = user_refund_stat['total'] == user_refund_stat['refunded']
all_refunded_users = set(user_refund_stat[user_refund_stat['all_refunded']]['account_id'])
internal_completed_norefund = internal_completed[~internal_completed['account_id'].isin(all_refunded_users)]
# 按 reg_month × key_from 统计付费用户数
kf_stats = internal_completed_norefund.groupby(['reg_month', 'key_from'])['account_id'].nunique().reset_index()
kf_stats.columns = ['reg_month', 'key_from', 'paid_users']
# 各月份注册人数(原始+纯净+拟合)
month_denom = reg_df.groupby('reg_month')['account_id'].nunique().reset_index()
month_denom.columns = ['reg_month', 'total_users']
# 纯净版每月分母
clean_reg = reg_df[~reg_df['only_external']]
month_clean = clean_reg.groupby('reg_month')['account_id'].nunique().reset_index()
month_clean.columns = ['reg_month', 'clean_users']
month_denom = month_denom.merge(month_clean, on='reg_month', how='left')
month_denom['clean_users'] = month_denom['clean_users'].fillna(0).astype(int)
# 拟合版每月分母
month_denom = month_denom.merge(monthly_fitted, on='reg_month', how='left')
month_denom['fitted_total'] = month_denom['fitted_total'].fillna(month_denom['total_users']).round(0).astype(int)
kf_stats = kf_stats.merge(month_denom, on='reg_month', how='left')
for _, row in kf_stats.iterrows():
denom_orig = int(row['total_users'])
paid = int(row['paid_users'])
denom_clean = int(row['clean_users'])
denom_fitted = int(row['fitted_total'])
results.append({
'月份': str(row['reg_month']),
'维度': 'key_from',
'渠道': row['key_from'],
'版本': '原始版',
'注册用户数': denom_orig,
'付费用户数': paid,
'转化率': f"{paid/denom_orig*100:.2f}%" if denom_orig > 0 else '0%'
})
results.append({
'月份': str(row['reg_month']),
'维度': 'key_from',
'渠道': row['key_from'],
'版本': '纯净版',
'注册用户数': denom_clean,
'付费用户数': paid,
'转化率': f"{paid/denom_clean*100:.2f}%" if denom_clean > 0 else '0%'
})
results.append({
'月份': str(row['reg_month']),
'维度': 'key_from',
'渠道': row['key_from'],
'版本': '拟合版',
'注册用户数': denom_fitted,
'付费用户数': paid,
'转化率': f"{paid/denom_fitted*100:.2f}%" if denom_fitted > 0 else '0%'
})
# 端外汇总
external_completed = order_df[(~order_df['is_internal']) & (order_df['order_status'] == 3)]
external_completed = external_completed.merge(user_month_map, on='account_id', how='inner')
ext_stats = external_completed.groupby('reg_month')['account_id'].nunique().reset_index()
ext_stats.columns = ['reg_month', 'paid_users']
ext_stats = ext_stats.merge(month_denom, on='reg_month', how='left')
for _, row in ext_stats.iterrows():
denom_orig = int(row['total_users'])
paid = int(row['paid_users'])
denom_clean = int(row['clean_users'])
denom_fitted = int(row['fitted_total'])
results.append({
'月份': str(row['reg_month']),
'维度': 'key_from',
'渠道': '端外合计',
'版本': '原始版',
'注册用户数': denom_orig,
'付费用户数': paid,
'转化率': f"{paid/denom_orig*100:.2f}%" if denom_orig > 0 else '0%'
})
results.append({
'月份': str(row['reg_month']),
'维度': 'key_from',
'渠道': '端外合计',
'版本': '纯净版',
'注册用户数': denom_clean,
'付费用户数': paid,
'转化率': f"{paid/denom_clean*100:.2f}%" if denom_clean > 0 else '0%'
})
results.append({
'月份': str(row['reg_month']),
'维度': 'key_from',
'渠道': '端外合计',
'版本': '拟合版',
'注册用户数': denom_fitted,
'付费用户数': paid,
'转化率': f"{paid/denom_fitted*100:.2f}%" if denom_fitted > 0 else '0%'
})
print(f" key_from维度结果: {len([r for r in results if r['维度']=='key_from'])}")
# ========== 输出到Excel ==========
result_df = pd.DataFrame(results)
print(f"\n总结果数: {len(result_df)}")
# 按维度排序
dim_order = {'整体': 0, 'download_channel': 1, 'key_from': 2}
result_df['dim_sort'] = result_df['维度'].map(dim_order)
result_df = result_df.sort_values(['月份', 'dim_sort', '版本', '渠道']).drop(columns=['dim_sort'])
output_path = '/root/.openclaw/workspace/output/conversion_rate_202509_202605.xlsx'
with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
# Sheet1: 全部数据
result_df.to_excel(writer, sheet_name='全部数据', index=False)
# 各维度分sheet
for dim in ['整体', 'download_channel', 'key_from']:
dim_df = result_df[result_df['维度'] == dim].copy()
dim_df = dim_df.drop(columns=['维度'])
sheet_name = dim[:31] # Excel sheet name limit
dim_df.to_excel(writer, sheet_name=sheet_name, index=False)
# 透视表:整体(原始版 vs 纯净版 vs 拟合版)
overall_df = result_df[result_df['维度'] == '整体'].copy()
pivot = overall_df.pivot_table(
index='月份', columns='版本', values='转化率', aggfunc='first'
)
pivot.to_excel(writer, sheet_name='整体对比')
print(f"\n输出文件: {output_path}")
print("Done!")