ai_member_xiaoxi/scripts/welfare_merge_final.py
2026-05-20 08:00:01 +08:00

226 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""合并订单数据 + parent_address生成最终 Excel"""
import csv
import os
from collections import Counter
# 文件路径
ORDER_FILE = '/root/.openclaw/workspace/output/welfare_step1_result.txt'
ADDR_FILE = '/root/.openclaw/workspace/output/parent_address_result.txt'
FINAL_OUTPUT = '/root/.openclaw/workspace/output/福利品用户名单.xlsx'
# 读取订单数据
orders = []
with open(ORDER_FILE, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
orders.append(row)
print(f"读取订单数: {len(orders)}")
# 按 (account_id, trade_no) 去重
seen = set()
orders_dedup = []
dup_count = 0
for row in orders:
key = (row['用户ID'], row['交易号'])
if key in seen:
dup_count += 1
continue
seen.add(key)
orders_dedup.append(row)
print(f"去重后订单数: {len(orders_dedup)} (移除 {dup_count} 条重复)")
# 读取地址数据(跳过非 CSV 行如 CREATE TABLE / COPY
addr_map = {}
with open(ADDR_FILE, 'r') as f:
lines = f.readlines()
header_idx = 0
for i, line in enumerate(lines):
if line.startswith('account_id'):
header_idx = i
break
reader = csv.DictReader(lines[header_idx:])
for row in reader:
addr_map[row['account_id']] = row
print(f"读取地址记录数: {len(addr_map)}")
# 合并
matched = 0
unmatched = 0
for row in orders_dedup:
aid = row['用户ID']
if aid in addr_map:
row['收件人'] = addr_map[aid].get('name', '')
row['电话'] = addr_map[aid].get('phone_number', '')
row['区域'] = addr_map[aid].get('region', '')
row['地址'] = addr_map[aid].get('address', '')
matched += 1
else:
row['收件人'] = ''
row['电话'] = ''
row['区域'] = ''
row['地址'] = ''
unmatched += 1
print(f"有地址: {matched}, 无地址: {unmatched}")
# 情况分布
case_counts = Counter(row['来源情况'] for row in orders_dedup)
print("\n=== 情况分布(按订单行)===")
for k, v in case_counts.most_common():
print(f" {k}: {v}")
# 唯一用户数
unique_users = set(row['用户ID'] for row in orders_dedup)
print(f"\n唯一用户数: {len(unique_users)}")
# 有地址的唯一用户数
users_with_addr = set(row['用户ID'] for row in orders_dedup if row['收件人'])
print(f"有地址的唯一用户数: {len(users_with_addr)}")
# 情况分布(按用户)
user_case = {}
for row in orders_dedup:
uid = row['用户ID']
if uid not in user_case:
user_case[uid] = set()
user_case[uid].add(row['来源情况'])
case_user_counts = Counter('+'.join(sorted(v)) for v in user_case.values())
print("\n=== 情况分布(按用户数)===")
for k, v in case_user_counts.most_common():
print(f" {k}: {v}")
# 生成 Excel
try:
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
except ImportError:
os.system('pip install openpyxl -q')
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from openpyxl.utils import get_column_letter
wb = Workbook()
ws = wb.active
ws.title = "福利品用户名单"
# 表头
headers = ['用户ID', '交易号', '商品ID', '商品名称', '渠道(key_from)', '购课日期',
'支付金额(元)', '退款金额(元)', '退费状态', '来源情况',
'收件人', '电话', '区域', '地址']
header_fill = PatternFill(start_color='4472C4', end_color='4472C4', fill_type='solid')
header_font = Font(color='FFFFFF', bold=True, size=11)
thin_border = Border(
left=Side(style='thin'), right=Side(style='thin'),
top=Side(style='thin'), bottom=Side(style='thin')
)
for col, h in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=h)
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center', vertical='center')
cell.border = thin_border
# 数据行
for i, row in enumerate(orders_dedup, 2):
values = [
int(row['用户ID']),
row['交易号'],
row['商品ID'],
row['商品名称'].strip(),
row['渠道'].strip(),
row['购课日期'],
float(row['支付金额(元)']),
float(row['退款金额(元)']) if row['退款金额(元)'] else '',
row['退费状态'],
row['来源情况'],
row['收件人'],
row['电话'],
row['区域'],
row['地址'],
]
for col, val in enumerate(values, 1):
cell = ws.cell(row=i, column=col, value=val)
cell.border = thin_border
if col in (1, 2, 3, 7, 8, 9, 10):
cell.alignment = Alignment(horizontal='center')
elif col == 4:
cell.alignment = Alignment(horizontal='left')
# 列宽
col_widths = [10, 22, 8, 22, 40, 20, 14, 14, 10, 14, 12, 16, 30, 40]
for i, w in enumerate(col_widths, 1):
ws.column_dimensions[get_column_letter(i)].width = w
# 冻结首行
ws.freeze_panes = 'A2'
# 添加统计 sheet
ws2 = wb.create_sheet("统计汇总")
ws2.merge_cells('A1:D1')
ws2.cell(row=1, column=1, value="福利品用户统计汇总").font = Font(bold=True, size=14)
ws2.cell(row=2, column=1, value=f"总用户数: {len(unique_users)}")
ws2.cell(row=3, column=1, value=f"有地址的用户数: {len(users_with_addr)}")
ws2.cell(row=4, column=1, value=f"无地址的用户数: {len(unique_users - users_with_addr)}")
ws2.cell(row=5, column=1, value=f"总订单数: {len(orders_dedup)}")
ws2.cell(row=7, column=1, value="情况分布(按用户数)").font = Font(bold=True)
for i, (k, v) in enumerate(case_user_counts.most_common(), 8):
ws2.cell(row=i, column=1, value=k)
ws2.cell(row=i, column=2, value=v)
ws2.cell(row=7, column=4, value="情况分布(按订单行数)").font = Font(bold=True)
for i, (k, v) in enumerate(case_counts.most_common(), 8):
ws2.cell(row=i, column=4, value=k)
ws2.cell(row=i, column=5, value=v)
# 添加用户地址汇总 sheet按用户去重仅保留地址信息
ws3 = wb.create_sheet("用户地址汇总")
addr_headers = ['用户ID', '收件人', '电话', '区域', '地址', '来源情况']
for col, h in enumerate(addr_headers, 1):
cell = ws3.cell(row=1, column=col, value=h)
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center', vertical='center')
cell.border = thin_border
# 按用户去重,取第一个订单的地址信息
user_addrs = {}
for row in orders_dedup:
uid = row['用户ID']
if uid not in user_addrs:
user_addrs[uid] = row
# 如果有多个来源情况,合并
elif row['来源情况'] not in user_addrs[uid]['来源情况']:
user_addrs[uid]['来源情况'] = user_addrs[uid]['来源情况'] + '+' + row['来源情况']
for i, (uid, row) in enumerate(sorted(user_addrs.items()), 2):
values = [
int(uid),
row['收件人'],
row['电话'],
row['区域'],
row['地址'],
row['来源情况'],
]
for col, val in enumerate(values, 1):
cell = ws3.cell(row=i, column=col, value=val)
cell.border = thin_border
cell.alignment = Alignment(horizontal='center')
if col in (2, 4, 5):
cell.alignment = Alignment(horizontal='left')
addr_col_widths = [10, 12, 16, 30, 40, 14]
for i, w in enumerate(addr_col_widths, 1):
ws3.column_dimensions[get_column_letter(i)].width = w
ws3.freeze_panes = 'A2'
wb.save(FINAL_OUTPUT)
print(f"\n✅ Excel 已保存: {FINAL_OUTPUT}")
print(f"用户地址汇总 Sheet: {len(user_addrs)}")