ai_member_xiaoxi/scripts/enhanced_phone_chapter_analysis.py
2026-06-17 08:00:01 +08:00

573 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
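Enhanced phone-number lesson-progress analysis script.
Chooses the target level from the course package each user purchased
(L1+L2 bundle -> analyse L1; L2-only package -> analyse L2), then
queries the completion records of the matching lessons, including completion
time, average completion time, and character age.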
"""
import csv
import io
import os
import re
import subprocess
import sys
from datetime import datetime, date
try:
import openpyxl
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils import get_column_letter
except ImportError:
print("ERROR: need openpyxl")
sys.exit(1)
# Filesystem layout: the directory holding this script, and its parent directory.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
WORKSPACE_DIR = os.path.dirname(SCRIPT_DIR)
# Put the script directory first on sys.path before the import below
# (presumably phone_encrypt.py lives next to this script — verify).
sys.path.insert(0, SCRIPT_DIR)
from phone_encrypt import encrypt_phone
# SECRETS_FILE is scanned by load_pg_password(); OUTPUT_DIR is where main()
# writes the report when no --output path is given.
SECRETS_FILE = os.path.join(WORKSPACE_DIR, "secrets.env")
OUTPUT_DIR = os.path.join(WORKSPACE_DIR, "output")
# Connection parameters handed to the psql command line in run_sql().
DB_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
DB_PORT = "28591"
DB_USER = "ai_member"
DB_NAME = "vala_bi"
# Number of numbered play-record tables (suffixes _0 .. _7) that
# query_chapter_play() unions together.
SHARD_COUNT = 8
# L1 S0 U00 chapter IDs: 343,344,345,346,348 (L01-L05)
# L2 S0 U00 chapter IDs: 55,56,57,58,59 (L01-L05)
L1_CHAPTERS = {343: "L1-S0-U00-L01", 344: "L1-S0-U00-L02", 345: "L1-S0-U00-L03",
346: "L1-S0-U00-L04", 348: "L1-S0-U00-L05"}
L2_CHAPTERS = {55: "L2-S0-U00-L01", 56: "L2-S0-U00-L02", 57: "L2-S0-U00-L03",
58: "L2-S0-U00-L04", 59: "L2-S0-U00-L05"}
# Combined chapter_id -> label mapping for both levels; its keys form the
# chapter_id filter in query_chapter_play().
ALL_CHAPTERS = {**L1_CHAPTERS, **L2_CHAPTERS}
def load_pg_password():
    """Read the PostgreSQL password from the secrets file.

    Scans ``SECRETS_FILE`` line by line for the first line starting with
    ``PG_ONLINE_PASSWORD=`` and returns everything after that first ``=``,
    with surrounding whitespace and surrounding quote characters removed.

    Returns:
        The password string.

    Raises:
        OSError: if the secrets file cannot be opened.
        RuntimeError: if the file has no ``PG_ONLINE_PASSWORD=`` line.
            (Previously this case returned ``None``, which only surfaced
            later as an unrelated error when the value was placed into the
            subprocess environment.)
    """
    prefix = "PG_ONLINE_PASSWORD="
    with open(SECRETS_FILE, "r", encoding="utf-8") as f:
        for line in f:
            if line.startswith(prefix):
                # Everything after the first "=" is the value; drop the
                # trailing newline and optional surrounding quotes.
                return line[len(prefix):].strip().strip("'\"")
    raise RuntimeError(f"PG_ONLINE_PASSWORD not found in {SECRETS_FILE}")
def run_sql(sql, pg_password):
    """Execute one SQL string through the ``psql`` CLI and return its CSV output.

    Args:
        sql: SQL text passed to ``psql -c``.
        pg_password: password exported to the child process as ``PGPASSWORD``.

    Returns:
        The captured standard output of ``psql --csv`` as text.

    Raises:
        RuntimeError: if ``psql`` exits with a non-zero status; the message
            carries the captured stderr.
        subprocess.TimeoutExpired: if the command runs longer than 300 seconds.
    """
    command = [
        "psql",
        "-h", DB_HOST,
        "-p", DB_PORT,
        "-U", DB_USER,
        "-d", DB_NAME,
        "--csv",
        "-c", sql,
    ]
    # Copy of the current environment plus the password, so the password is
    # not placed on the command line.
    child_env = dict(os.environ, PGPASSWORD=pg_password)
    completed = subprocess.run(
        command, capture_output=True, text=True, env=child_env, timeout=300
    )
    if completed.returncode == 0:
        return completed.stdout
    raise RuntimeError(f"SQL failed:\n{completed.stderr}")
def extract_phones(file_path):
    """Collect the distinct phone numbers found in a spreadsheet or CSV file.

    Every cell of every sheet/row is examined; a cell counts as a phone number
    when its stripped text is exactly 11 digits starting with ``1``.

    Args:
        file_path: path to a ``.xlsx``/``.xls`` workbook or a ``.csv`` file
            (the extension is matched case-insensitively).

    Returns:
        Phone number strings in first-seen order, without duplicates. Files
        with any other extension yield an empty list.
    """
    phone_re = re.compile(r"^1\d{10}$")
    found = []

    def collect(text):
        text = text.strip()
        if phone_re.match(text):
            found.append(text)

    ext = os.path.splitext(file_path)[1].lower()
    if ext in (".xlsx", ".xls"):
        # NOTE(review): openpyxl documents support for .xlsx/.xlsm; a legacy
        # binary .xls file will likely be rejected by load_workbook — confirm
        # whether .xls input is actually expected here.
        wb = openpyxl.load_workbook(file_path, read_only=True)
        try:
            for ws in wb:
                for row in ws.iter_rows(values_only=True):
                    for cell in row:
                        if cell is None:
                            continue
                        # Numeric cells arrive as floats (13800138000.0);
                        # render whole numbers without the ".0". is_integer()
                        # is False for NaN/inf, which int() would reject.
                        if isinstance(cell, float) and cell.is_integer():
                            collect(str(int(cell)))
                        else:
                            collect(str(cell))
        finally:
            # Read-only workbooks keep the file handle open until closed.
            wb.close()
    elif ext == ".csv":
        # newline="" lets the csv module do its own line-ending handling.
        with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
            for row in csv.reader(f):
                for cell in row:
                    collect(cell)
    # dict preserves insertion order, so this de-duplicates in first-seen order.
    return list(dict.fromkeys(found))
def match_accounts(phones, pg_password):
    """Look up the app accounts that belong to the given phone numbers.

    Each phone is passed through ``encrypt_phone`` and the encrypted values
    are matched against ``bi_vala_app_account.tel_encrypt`` (rows with
    ``status = 1`` and ``deleted_at IS NULL`` only).

    Args:
        phones: plain-text phone number strings.
        pg_password: database password forwarded to ``run_sql``.

    Returns:
        A 3-tuple ``(account_ids, account_to_phone, unmatched)``:
        ``account_ids`` is the list of matched account id strings,
        ``account_to_phone`` maps each of them to its original phone, and
        ``unmatched`` lists the phones for which no account row came back.
    """
    if not phones:
        # An empty list would otherwise produce "IN ()", which is invalid SQL.
        return [], {}, []
    encrypt_to_originals = {}
    for p in phones:
        encrypt_to_originals.setdefault(encrypt_phone(p), []).append(p)
    # Double any single quote so an encrypted value can never terminate the
    # SQL string literal it is embedded in.
    conditions = ", ".join(
        "'" + str(enc).replace("'", "''") + "'" for enc in encrypt_to_originals
    )
    sql = f"""
SELECT id AS account_id, tel_encrypt
FROM bi_vala_app_account
WHERE tel_encrypt IN ({conditions}) AND status = 1 AND deleted_at IS NULL
ORDER BY id;
"""
    output = run_sql(sql, pg_password)
    account_to_phone = {}
    matched_encs = set()
    for row in csv.DictReader(io.StringIO(output)):
        enc = row["tel_encrypt"]
        matched_encs.add(enc)
        if enc in encrypt_to_originals:
            # Several inputs could share one encrypted value; report the first.
            account_to_phone[row["account_id"]] = encrypt_to_originals[enc][0]
    unmatched = []
    for enc, originals in encrypt_to_originals.items():
        if enc not in matched_encs:
            unmatched.extend(originals)
    return list(account_to_phone), account_to_phone, unmatched
def get_purchase_info(account_ids, pg_password):
    """Decide which course level to analyse for each account from its orders.

    Queries ``bi_vala_order`` for orders with a pay-success date and
    ``order_status`` 3 or 4. Only status-3 orders are classified by amount:
    about 3598 marks the L1+L2 bundle, about 1999 marks the L2-only package.

    Args:
        account_ids: account id strings; joined verbatim into the SQL.
        pg_password: database password forwarded to ``run_sql``.

    Returns:
        Dict mapping each account id that has at least one returned order to
        ``"L1"`` or ``"L2"``. The bundle wins over L2-only; accounts with
        neither price point fall back to ``"L1"``. Empty input gives ``{}``.
    """
    if not account_ids:
        return {}
    aid_list = ", ".join(account_ids)
    sql = f"""
SELECT o.account_id, o.out_trade_no, o.pay_amount_int / 100.0 AS pay_amount,
o.pay_success_date, o.order_status, o.key_from
FROM bi_vala_order o
WHERE o.account_id IN ({aid_list})
AND o.pay_success_date IS NOT NULL
AND o.order_status IN (3, 4)
ORDER BY o.account_id, o.pay_success_date;
"""
    csv_text = run_sql(sql, pg_password)
    # Price points: 1999 = L2 only, 3598 = L1+L2 bundle.
    bundle_buyers = set()
    l2_only_buyers = set()
    seen_accounts = {}  # dict used as an insertion-ordered set
    for order in csv.DictReader(io.StringIO(csv_text)):
        account = order["account_id"]
        paid = float(order["pay_amount"])
        order_status = int(order["order_status"])
        seen_accounts.setdefault(account, None)
        if order_status != 3:
            # Only completed orders decide the package.
            continue
        if abs(paid - 3598) < 1:
            bundle_buyers.add(account)
        elif abs(paid - 1999) < 1:
            l2_only_buyers.add(account)
    # Bundle -> L1; L2-only -> L2; anything else defaults to L1.
    return {
        account: (
            "L2"
            if account not in bundle_buyers and account in l2_only_buyers
            else "L1"
        )
        for account in seen_accounts
    }
def get_character_info(account_ids, pg_password):
    """Fetch nickname, birthday, age and gender of each account's character.

    Reads ``bi_vala_app_character`` rows with ``status = 1`` for the given
    accounts, ordered by account and character id. When an account has
    several such characters, later rows overwrite earlier ones, so the
    character with the highest id is the one reported.

    Args:
        account_ids: account id strings; joined verbatim into the SQL.
        pg_password: database password forwarded to ``run_sql``.

    Returns:
        Dict mapping account id to a dict with keys ``character_id``,
        ``nickname``, ``birthday``, ``age`` and ``gender``. ``age`` is the age
        in whole years as a string, the raw birthday text when it is not in
        ``YYYY-MM-DD`` form, or ``""`` when no birthday is stored.
        Empty input gives ``{}``.
    """
    if not account_ids:
        return {}
    aid_list = ", ".join(account_ids)
    sql = f"""
SELECT account_id, id AS character_id, nickname, birthday, gender, created_at
FROM bi_vala_app_character
WHERE account_id IN ({aid_list}) AND status = 1
ORDER BY account_id, id;
"""
    output = run_sql(sql, pg_password)
    today = date.today()
    result = {}
    for row in csv.DictReader(io.StringIO(output)):
        birthday = row["birthday"]
        age = ""
        if birthday:
            try:
                bd = datetime.strptime(birthday, "%Y-%m-%d").date()
            except ValueError:
                # Not a plain YYYY-MM-DD date: show the stored text unchanged.
                age = birthday
            else:
                # Subtract one year when this year's birthday has not come yet.
                had_birthday = (today.month, today.day) >= (bd.month, bd.day)
                age = f"{today.year - bd.year - (0 if had_birthday else 1)}"
        # NOTE(review): both branches are the empty string as written, so the
        # gender column is always blank. The two labels look like they were
        # lost from the source text — confirm the intended values.
        gender = "" if row["gender"] == "1" else ""
        result[row["account_id"]] = {
            "character_id": row["character_id"],
            "nickname": row["nickname"],
            "birthday": birthday,
            "age": age,
            "gender": gender,
        }
    return result
def query_chapter_play(account_ids, target_levels, pg_password):
    """Query lesson completion records and keep those at each account's target level.

    The SQL unions the ``SHARD_COUNT`` chapter play-record tables (restricted
    to the chapter ids in ``ALL_CHAPTERS`` with ``play_status = 1``), keeps
    the earliest ``finish_date`` per (user, chapter) for the given accounts,
    and attaches the summed ``interval_time`` of the matching component
    play records, formatted as ``minutes:seconds`` in ``finish_time``.

    Args:
        account_ids: account id strings; joined verbatim into the SQL.
        target_levels: dict of account id -> level string; accounts missing
            from it are treated as ``"L1"``.
        pg_password: database password forwarded to ``run_sql``.

    Returns:
        List of CSV row dicts (keys ``account_id``, ``character_id``,
        ``chapter_id``, ``course_id``, ``course_level``, ``finish_date``,
        ``finish_time``, ``total_interval_ms``) whose ``course_level`` equals
        the account's target level. Empty input gives ``[]``.
    """
    if not account_ids:
        return []
    aid_list = ", ".join(account_ids)
    all_chapter_ids = ", ".join(map(str, ALL_CHAPTERS))
    shards = range(SHARD_COUNT)
    # One SELECT per chapter play-record shard, glued with UNION ALL.
    chapter_union = " UNION ALL ".join(
        f"SELECT user_id, chapter_id, chapter_unique_id, date(updated_at) AS finish_date "
        f"FROM bi_user_chapter_play_record_{i} "
        f"WHERE chapter_id IN ({all_chapter_ids}) AND play_status = 1"
        for i in shards
    )
    # Same for the component play-record shards; each is pre-aggregated per
    # chapter_unique_id and limited to the ids selected by the cp_unique_ids CTE.
    comp_union = " UNION ALL ".join(
        f"SELECT chapter_unique_id, SUM(interval_time) AS total_interval "
        f"FROM bi_user_component_play_record_{i} "
        f"WHERE chapter_unique_id IN (SELECT chapter_unique_id FROM cp_unique_ids) "
        f"GROUP BY chapter_unique_id"
        for i in shards
    )
    sql = f"""
WITH chapter_play AS (
{chapter_union}
),
filtered_play AS (
SELECT cp.user_id, cp.chapter_id, cp.chapter_unique_id, cp.finish_date,
ROW_NUMBER() OVER (PARTITION BY cp.user_id, cp.chapter_id ORDER BY cp.finish_date) AS rn
FROM chapter_play cp
JOIN bi_vala_app_character c ON cp.user_id = c.id
JOIN bi_vala_app_account a ON c.account_id = a.id
WHERE a.id IN ({aid_list})
),
cp_unique_ids AS (
SELECT DISTINCT chapter_unique_id FROM filtered_play WHERE rn = 1
),
comp_time AS (
SELECT chapter_unique_id, SUM(total_interval) AS total_interval
FROM ({comp_union}) t
GROUP BY chapter_unique_id
)
SELECT
a.id AS account_id,
fp.user_id AS character_id,
fp.chapter_id,
FORMAT('%s-%s-%s-%s', l.course_level, l.course_season, l.course_unit, l.course_lesson) AS course_id,
l.course_level,
fp.finish_date,
FORMAT('%s:%s',
FLOOR(COALESCE(ct.total_interval, 0) / 1000 / 60),
LPAD(CAST(MOD(COALESCE(ct.total_interval, 0) / 1000, 60) AS TEXT), 2, '0')
) AS finish_time,
COALESCE(ct.total_interval, 0) AS total_interval_ms
FROM filtered_play fp
JOIN bi_vala_app_character c ON fp.user_id = c.id
JOIN bi_vala_app_account a ON c.account_id = a.id
LEFT JOIN bi_level_unit_lesson l ON fp.chapter_id = l.id
LEFT JOIN comp_time ct ON fp.chapter_unique_id = ct.chapter_unique_id
WHERE fp.rn = 1
ORDER BY a.id, fp.chapter_id;
"""
    csv_text = run_sql(sql, pg_password)
    # Keep only rows at the level chosen for the owning account.
    return [
        row
        for row in csv.DictReader(io.StringIO(csv_text))
        if target_levels.get(row["account_id"], "L1") == row["course_level"]
    ]
def time_to_minutes(t):
    """Convert a ``minutes:seconds`` string to fractional minutes.

    Surrounding whitespace and trailing dots are ignored, and the seconds
    part may carry a fraction (``"1:5."`` or ``"1:5.7"`` both count as
    5 seconds). Only the first two colon-separated fields are used.

    Args:
        t: duration text such as ``"12:30"``.

    Returns:
        Minutes as a float, or ``0.0`` when ``t`` is not a string, has no
        colon, or has fields that are not numeric.
    """
    try:
        parts = t.strip().rstrip(".").split(":")
        if len(parts) < 2:
            return 0.0
        return int(parts[0]) + int(float(parts[1])) / 60.0
    except (AttributeError, TypeError, ValueError, OverflowError):
        # Non-string input or non-numeric fields: treat as "no duration".
        return 0.0
def _display_width(value):
    """Approximate display width of a cell value; non-ASCII characters count as two."""
    text = str(value) if value else ""
    return sum(2 if ord(ch) > 127 else 1 for ch in text)


def _autofit_columns(ws, max_width):
    """Set each column's width to its widest cell plus 3, capped at ``max_width``."""
    for col in ws.columns:
        widest = max((_display_width(cell.value) for cell in col), default=0)
        ws.column_dimensions[col[0].column_letter].width = min(widest + 3, max_width)


def _style_header_row(ws):
    """Make row 1 of ``ws`` a bold, white-on-blue, centered header."""
    font = Font(bold=True, color="FFFFFF", size=11)
    fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
    for cell in ws[1]:
        cell.font = font
        cell.fill = fill
        cell.alignment = alignment


def generate_report(records, account_to_phone, target_levels, character_info, unmatched, output_path):
    """Write the three-sheet Excel analysis report and return its path.

    Sheets:
        1. Detail: one row per completion record, followed by the unmatched
           phones and then the matched accounts that have no records.
        2. Cross analysis: one row per account with minutes per course, plus a
           highlighted average row.
        3. Summary: overall counters followed by a per-user table.

    Args:
        records: row dicts as returned by ``query_chapter_play`` (uses the
            keys ``account_id``, ``course_id``, ``finish_date``, ``finish_time``).
        account_to_phone: dict of matched account id -> phone number.
        target_levels: dict of account id -> level; missing accounts are
            treated as ``"L1"``.
        character_info: dict of account id -> character dict as returned by
            ``get_character_info``.
        unmatched: phone numbers for which no account was found.
        output_path: destination ``.xlsx`` path.

    Returns:
        ``output_path``.
    """
    wb = openpyxl.Workbook()

    # Group records by account once; all three sheets iterate in this order.
    by_account = {}
    for r in records:
        by_account.setdefault(r["account_id"], []).append(r)
    sorted_aids = sorted(by_account, key=int)

    # ─── Sheet 1: detail ───
    ws1 = wb.active
    ws1.title = "行课明细"
    ws1.append(["账号ID", "手机号", "角色ID", "角色昵称", "角色年龄", "性别",
                "目标Level", "课程名称", "课程完成时间", "课程耗时(分:秒)", "课程耗时(分钟)"])
    _style_header_row(ws1)
    for aid in sorted_aids:
        phone = account_to_phone.get(aid, "")
        cinfo = character_info.get(aid, {})
        target = target_levels.get(aid, "L1")
        for r in by_account[aid]:
            # NOTE: the character columns come from character_info (one entry
            # per account), not from the record's own character_id.
            ws1.append([
                aid, phone, cinfo.get("character_id", ""),
                cinfo.get("nickname", ""), cinfo.get("age", ""), cinfo.get("gender", ""),
                target, r["course_id"], r["finish_date"], r["finish_time"],
                round(time_to_minutes(r["finish_time"]), 1),
            ])
    # Phones without an account: only the phone column is filled.
    for p in sorted(unmatched):
        ws1.append(["", p, "", "", "", "", "", "", "", "", ""])
    # Matched accounts that have no completion record at their target level.
    no_record_aids = set(account_to_phone) - set(by_account)
    for aid in sorted(no_record_aids, key=int):
        cinfo = character_info.get(aid, {})
        ws1.append([aid, account_to_phone.get(aid, ""), cinfo.get("character_id", ""),
                    cinfo.get("nickname", ""), cinfo.get("age", ""), cinfo.get("gender", ""),
                    target_levels.get(aid, "L1"), "无行课记录", "", "", ""])
    _autofit_columns(ws1, 25)
    ws1.freeze_panes = "A2"

    # ─── Sheet 2: per-course duration cross analysis ───
    ws2 = wb.create_sheet("完课时长交叉分析")
    # One column per distinct course that appears in the records.
    sorted_courses = sorted({r["course_id"] for r in records})
    ws2.append(["账号ID", "手机号", "角色昵称", "角色年龄", "目标Level"]
               + sorted_courses
               + ["总耗时(分钟)", "平均每课(分钟)", "完成课时数"])
    _style_header_row(ws2)
    all_user_times = {c: [] for c in sorted_courses}  # course -> minutes of every user
    for aid in sorted_aids:
        recs = by_account[aid]
        cinfo = character_info.get(aid, {})
        course_times = {}
        total_mins = 0
        for r in recs:
            mins = time_to_minutes(r["finish_time"])
            course_times[r["course_id"]] = mins
            total_mins += mins
            all_user_times[r["course_id"]].append(mins)
        row_data = [aid, account_to_phone.get(aid, ""), cinfo.get("nickname", ""),
                    cinfo.get("age", ""), target_levels.get(aid, "L1")]
        row_data.extend(
            round(course_times[c], 1) if c in course_times else ""
            for c in sorted_courses
        )
        row_data.extend([round(total_mins, 1), round(total_mins / len(recs), 1), len(recs)])
        ws2.append(row_data)
    # Average row: per-course mean, then overall total / mean / count.
    avg_row = ["", "", "", "平均", ""]
    for c in sorted_courses:
        times = all_user_times[c]
        avg_row.append(round(sum(times) / len(times), 1) if times else "")
    all_times = [t for times in all_user_times.values() for t in times]
    if all_times:
        avg_row.extend([round(sum(all_times), 1),
                        round(sum(all_times) / len(all_times), 1),
                        len(all_times)])
    else:
        avg_row.extend(["", "", ""])
    ws2.append(avg_row)
    # Header is row 1 and each account took one row, so the average row
    # sits directly below the last account row.
    avg_row_idx = len(sorted_aids) + 2
    avg_fill = PatternFill(start_color="FFF2CC", end_color="FFF2CC", fill_type="solid")
    for cell in ws2[avg_row_idx]:
        cell.fill = avg_fill
        cell.font = Font(bold=True)
    _autofit_columns(ws2, 22)
    ws2.freeze_panes = "A2"

    # ─── Sheet 3: summary ───
    ws3 = wb.create_sheet("汇总统计")
    ws3.append(["指标", "数值"])
    _style_header_row(ws3)
    all_times_flat = [time_to_minutes(r["finish_time"]) for r in records]
    overall_avg = round(sum(all_times_flat) / len(all_times_flat), 1) if all_times_flat else 0
    ws3.append(["输入手机号数", len(account_to_phone) + len(unmatched)])
    ws3.append(["匹配账号数", len(account_to_phone)])
    ws3.append(["未匹配手机号数", len(unmatched)])
    ws3.append(["有行课记录的用户数", len(by_account)])
    ws3.append(["总课时完成记录数", len(records)])
    ws3.append(["整体平均完课时长(分钟)", overall_avg])
    # Report the levels actually in use instead of a fixed "L1" label, which
    # was wrong whenever an account was analysed at L2.
    level_labels = {"L1": "L1 (L1+L2联报→看L1)", "L2": "L2 (L2单课包→看L2)"}
    levels_in_use = sorted({target_levels.get(aid, "L1") for aid in account_to_phone}) or ["L1"]
    ws3.append(["分析目标Level", " / ".join(level_labels.get(lv, lv) for lv in levels_in_use)])
    # Per-user summary table, separated from the counters by a blank row.
    ws3.append([])
    ws3.append(["用户", "手机号", "角色昵称", "年龄", "完成课时数", "总耗时(分钟)", "平均耗时(分钟)"])
    for aid in sorted_aids:
        recs = by_account[aid]
        cinfo = character_info.get(aid, {})
        total = sum(time_to_minutes(r["finish_time"]) for r in recs)
        # Average from the unrounded total so it agrees with Sheet 2.
        ws3.append([aid, account_to_phone.get(aid, ""), cinfo.get("nickname", ""),
                    cinfo.get("age", ""), len(recs), round(total, 1),
                    round(total / len(recs), 1)])
    _autofit_columns(ws3, 25)

    wb.save(output_path)
    return output_path
def main():
    """Command-line entry point: read phones, query the database, write the report.

    Usage: ``enhanced_phone_chapter_analysis.py <input_file> [--output <path>]``.
    Without ``--output`` the report goes to a timestamped file in
    ``OUTPUT_DIR``. Exits with status 1 when no input file is given.

    Returns:
        The path of the written ``.xlsx`` file.
    """
    if len(sys.argv) < 2:
        print("Usage: python3 enhanced_phone_chapter_analysis.py <input_file> [--output <path>]")
        sys.exit(1)
    input_file = sys.argv[1]
    output_path = None
    for i, arg in enumerate(sys.argv):
        if arg == "--output" and i + 1 < len(sys.argv):
            output_path = sys.argv[i + 1]
    if not output_path:
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(OUTPUT_DIR, f"enhanced_phone_analysis_{ts}.xlsx")
    # Create the directory of the file actually being written; for the
    # default path this is OUTPUT_DIR, for --output it is that path's parent.
    os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)
    pg_password = load_pg_password()
    # Step 1: Extract phones
    print(f"读取文件: {input_file}")
    phones = extract_phones(input_file)
    print(f"提取到 {len(phones)} 个手机号")
    # Step 2: Match accounts
    print("匹配账号...")
    if phones:
        account_ids, account_to_phone, unmatched = match_accounts(phones, pg_password)
    else:
        # Nothing to look up; skip the query, whose IN (...) list would be empty.
        account_ids, account_to_phone, unmatched = [], {}, []
    print(f"已匹配: {len(account_ids)} 个账号, 未匹配: {len(unmatched)}")
    if not account_ids:
        print("无匹配账号,仅生成空报告")
        # Still write a workbook so the unmatched phones are reported.
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "行课明细"
        ws.append(["账号ID", "手机号", "角色ID", "角色昵称", "角色年龄", "性别",
                   "目标Level", "课程名称", "课程完成时间", "课程耗时(分:秒)", "课程耗时(分钟)"])
        for p in sorted(unmatched):
            ws.append(["", p, "", "", "", "", "", "", "", "", ""])
        wb.save(output_path)
        print(f"文件保存至: {output_path}")
        return output_path
    # Step 3: Get purchase info
    print("获取购买课包信息...")
    target_levels = get_purchase_info(account_ids, pg_password)
    for aid, lvl in target_levels.items():
        print(f" 账号 {aid}: 目标Level = {lvl}")
    # Step 4: Get character info
    print("获取角色年龄信息...")
    character_info = get_character_info(account_ids, pg_password)
    # Step 5: Query chapter play records
    print("查询课时完成记录...")
    records = query_chapter_play(account_ids, target_levels, pg_password)
    print(f"查询到 {len(records)} 条目标Level课时记录")
    # Step 6: Generate report
    print("生成分析报告...")
    generate_report(records, account_to_phone, target_levels, character_info, unmatched, output_path)
    print(f"文件保存至: {output_path}")
    return output_path
# Run the command-line workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()