#!/usr/bin/env python3
"""
Enhanced phone-number lesson-progress analysis script.

For each input phone number the script looks up the matching account,
decides which course level to inspect from the purchased package
(L1+L2 bundle -> inspect L1, L2-only package -> inspect L2), and then
reports the completion records of the corresponding lessons, including
per-lesson duration, average duration and the character's age.
"""

import csv
import io
import os
import re
import subprocess
import sys
from datetime import datetime, date

try:
    import openpyxl
    from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
    from openpyxl.utils import get_column_letter
except ImportError:
    print("ERROR: need openpyxl")
    sys.exit(1)

SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
WORKSPACE_DIR = os.path.dirname(SCRIPT_DIR)
sys.path.insert(0, SCRIPT_DIR)
from phone_encrypt import encrypt_phone

SECRETS_FILE = os.path.join(WORKSPACE_DIR, "secrets.env")
OUTPUT_DIR = os.path.join(WORKSPACE_DIR, "output")

DB_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
DB_PORT = "28591"
DB_USER = "ai_member"
DB_NAME = "vala_bi"
SHARD_COUNT = 8

# L1 S0 U00 chapter IDs: 343,344,345,346,348 (L01-L05)
# L2 S0 U00 chapter IDs: 55,56,57,58,59 (L01-L05)
L1_CHAPTERS = {
    343: "L1-S0-U00-L01",
    344: "L1-S0-U00-L02",
    345: "L1-S0-U00-L03",
    346: "L1-S0-U00-L04",
    348: "L1-S0-U00-L05",
}
L2_CHAPTERS = {
    55: "L2-S0-U00-L01",
    56: "L2-S0-U00-L02",
    57: "L2-S0-U00-L03",
    58: "L2-S0-U00-L04",
    59: "L2-S0-U00-L05",
}
ALL_CHAPTERS = {**L1_CHAPTERS, **L2_CHAPTERS}

# An 11-digit string starting with "1" is treated as a phone number.
_PHONE_PATTERN = re.compile(r"^1\d{10}$")

# Column headers of the "行课明细" (lesson detail) sheet.
_DETAIL_HEADERS = [
    "账号ID", "手机号", "角色ID", "角色昵称", "角色年龄", "性别",
    "目标Level", "课程名称", "课程完成时间", "课程耗时(分:秒)", "课程耗时(分钟)",
]

# Human-readable description of each target level for the summary sheet.
_LEVEL_LABELS = {
    "L1": "L1 (L1+L2联报→看L1)",
    "L2": "L2 (L2单课包→看L2)",
}


def _sql_quote(value):
    """Return *value* as a single-quoted SQL literal with quotes doubled."""
    return "'" + str(value).replace("'", "''") + "'"


def _sql_id_list(account_ids):
    """Return a comma-separated list of integer IDs for an SQL ``IN (...)``.

    Each ID is passed through ``int`` so that only integers can reach the
    SQL text; a non-integer ID raises ``ValueError``.
    """
    return ", ".join(str(int(aid)) for aid in account_ids)


def _format_duration_ms(ms_text):
    """Return ``'M:SS'`` for a millisecond count given as text.

    Returns ``None`` when the text cannot be parsed as a non-negative number.
    """
    try:
        total_seconds = int(float(ms_text)) // 1000
    except (TypeError, ValueError, OverflowError):
        return None
    if total_seconds < 0:
        return None
    return f"{total_seconds // 60}:{total_seconds % 60:02d}"


def load_pg_password():
    """Read the PostgreSQL password from ``SECRETS_FILE``.

    Returns the value of the ``PG_ONLINE_PASSWORD=`` line with surrounding
    whitespace and quotes removed.

    Raises:
        RuntimeError: if the file contains no ``PG_ONLINE_PASSWORD=`` line.
        OSError: if the secrets file cannot be opened.
    """
    with open(SECRETS_FILE, "r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                return line.split("=", 1)[1].strip().strip("'\"")
    # Fail here with a clear message instead of passing None to psql later.
    raise RuntimeError(f"PG_ONLINE_PASSWORD not found in {SECRETS_FILE}")


def run_sql(sql, pg_password):
    """Run *sql* through the ``psql`` CLI and return its CSV output as text.

    The password is handed to psql through the ``PGPASSWORD`` environment
    variable of the child process.

    Raises:
        RuntimeError: if psql exits with a non-zero status.
        subprocess.TimeoutExpired: if the query takes longer than 300 seconds.
    """
    env = os.environ.copy()
    env["PGPASSWORD"] = pg_password
    result = subprocess.run(
        ["psql", "-h", DB_HOST, "-p", DB_PORT, "-U", DB_USER, "-d", DB_NAME,
         "--csv", "-c", sql],
        capture_output=True,
        text=True,
        env=env,
        timeout=300,
    )
    if result.returncode != 0:
        raise RuntimeError(f"SQL failed:\n{result.stderr}")
    return result.stdout


def extract_phones(file_path):
    """Extract unique phone numbers from a spreadsheet or CSV file.

    Every cell of every sheet/row is scanned; a cell counts as a phone
    number when its text is 11 digits starting with "1". Files with any
    other extension than ``.xlsx``/``.xls``/``.csv`` yield an empty list.

    Returns the phone numbers in first-seen order, without duplicates.
    """
    phones = []
    ext = os.path.splitext(file_path)[1].lower()

    if ext in (".xlsx", ".xls"):
        # NOTE(review): openpyxl is believed not to read legacy .xls files,
        # so load_workbook would raise for them -- confirm.
        wb = openpyxl.load_workbook(file_path, read_only=True)
        try:
            for ws in wb:
                for row in ws.iter_rows(values_only=True):
                    for cell in row:
                        if cell is None:
                            continue
                        # Numeric cells arrive as floats (e.g. 13800000000.0);
                        # only whole floats are converted, so NaN/inf cannot
                        # crash int().
                        if isinstance(cell, float) and cell.is_integer():
                            val = str(int(cell))
                        else:
                            val = str(cell)
                        val = val.strip()
                        if _PHONE_PATTERN.match(val):
                            phones.append(val)
        finally:
            # Read-only workbooks keep the file open until closed explicitly.
            wb.close()
    elif ext == ".csv":
        with open(file_path, "r", encoding="utf-8-sig") as f:
            for row in csv.reader(f):
                for cell in row:
                    val = cell.strip()
                    if _PHONE_PATTERN.match(val):
                        phones.append(val)

    # dict preserves insertion order, giving an order-stable de-duplication.
    return list(dict.fromkeys(phones))


def match_accounts(phones, pg_password):
    """Match phone numbers to active accounts via their encrypted form.

    Returns a tuple ``(account_ids, account_to_phone, unmatched)``:
    the matched account IDs (as returned by the database, i.e. strings),
    a mapping from account ID to the original phone number, and the list
    of phone numbers for which no account was found.
    """
    encrypt_to_originals = {}
    for p in phones:
        enc = encrypt_phone(p)
        encrypt_to_originals.setdefault(enc, []).append(p)

    # An empty list would produce "IN ()", which is a SQL syntax error.
    if not encrypt_to_originals:
        return [], {}, []

    conditions = ", ".join(_sql_quote(enc) for enc in encrypt_to_originals)
    sql = f"""
        SELECT id AS account_id, tel_encrypt
        FROM bi_vala_app_account
        WHERE tel_encrypt IN ({conditions})
          AND status = 1
          AND deleted_at IS NULL
        ORDER BY id;
    """
    output = run_sql(sql, pg_password)

    account_to_phone = {}
    matched_encs = set()
    for row in csv.DictReader(io.StringIO(output)):
        aid = row["account_id"]
        enc = row["tel_encrypt"]
        matched_encs.add(enc)
        if enc in encrypt_to_originals:
            account_to_phone[aid] = encrypt_to_originals[enc][0]

    unmatched = []
    for enc, originals in encrypt_to_originals.items():
        if enc not in matched_encs:
            unmatched.extend(originals)

    return list(account_to_phone.keys()), account_to_phone, unmatched


def get_purchase_info(account_ids, pg_password):
    """Determine the target level of each account from its paid orders.

    Only orders with ``order_status == 3`` (completed) are considered.
    An order of about 3598 marks the L1+L2 bundle and maps to "L1"; an
    order of about 1999 marks the L2-only package and maps to "L2".
    Accounts whose orders match neither price default to "L1"; accounts
    without any returned order are absent from the result.

    Returns a dict mapping account ID to ``"L1"`` or ``"L2"``.
    """
    if not account_ids:
        return {}

    aid_list = _sql_id_list(account_ids)
    sql = f"""
        SELECT o.account_id, o.out_trade_no,
               o.pay_amount_int / 100.0 AS pay_amount,
               o.pay_success_date, o.order_status, o.key_from
        FROM bi_vala_order o
        WHERE o.account_id IN ({aid_list})
          AND o.pay_success_date IS NOT NULL
          AND o.order_status IN (3, 4)
        ORDER BY o.account_id, o.pay_success_date;
    """
    output = run_sql(sql, pg_password)

    # 1999 = L2 only, 3598 = L1+L2 bundle
    bundle_price = 3598
    l2_only_price = 1999

    account_packages = {}
    for row in csv.DictReader(io.StringIO(output)):
        aid = row["account_id"]
        amount = float(row["pay_amount"])
        status = int(row["order_status"])
        flags = account_packages.setdefault(
            aid, {"has_l1l2": False, "has_l2_only": False}
        )
        if status != 3:  # only completed orders decide the package
            continue
        if abs(amount - bundle_price) < 1:
            flags["has_l1l2"] = True
        elif abs(amount - l2_only_price) < 1:
            flags["has_l2_only"] = True

    result = {}
    for aid, flags in account_packages.items():
        if flags["has_l1l2"]:
            result[aid] = "L1"  # L1+L2 bundle -> inspect L1
        elif flags["has_l2_only"]:
            result[aid] = "L2"  # L2-only package -> inspect L2
        else:
            result[aid] = "L1"  # default
    return result


def get_character_info(account_ids, pg_password):
    """Fetch character details (nickname, birthday, age, gender) per account.

    Returns a dict mapping account ID to a dict with the keys
    ``character_id``, ``nickname``, ``birthday``, ``age`` and ``gender``.
    When an account has several character rows, the row with the highest
    ID wins because rows are ordered by ID and later rows overwrite
    earlier ones.
    """
    if not account_ids:
        return {}

    aid_list = _sql_id_list(account_ids)
    sql = f"""
        SELECT account_id, id AS character_id, nickname, birthday, gender, created_at
        FROM bi_vala_app_character
        WHERE account_id IN ({aid_list})
          AND status = 1
        ORDER BY account_id, id;
    """
    output = run_sql(sql, pg_password)

    today = date.today()
    result = {}
    for row in csv.DictReader(io.StringIO(output)):
        aid = row["account_id"]
        birthday = row["birthday"]
        age = ""
        if birthday:
            try:
                bd = datetime.strptime(birthday, "%Y-%m-%d").date()
            except ValueError:
                # Unexpected date format: show the raw value instead of an age.
                age = birthday
            else:
                # Subtract one year when this year's birthday has not come yet.
                not_yet = (today.month, today.day) < (bd.month, bd.day)
                age = f"{today.year - bd.year - not_yet}岁"
        # Any gender value other than "1" is reported as "女".
        gender = "男" if row["gender"] == "1" else "女"
        result[aid] = {
            "character_id": row["character_id"],
            "nickname": row["nickname"],
            "birthday": birthday,
            "age": age,
            "gender": gender,
        }
    return result


def query_chapter_play(account_ids, target_levels, pg_password):
    """Query lesson completion records, filtered by each account's target level.

    For every character and chapter the earliest completed play record is
    taken, and the durations of its component records are summed. Rows
    whose ``course_level`` differs from the account's target level
    (default "L1") are dropped.

    Returns a list of dict rows with the keys ``account_id``,
    ``character_id``, ``chapter_id``, ``course_id``, ``course_level``,
    ``finish_date``, ``finish_time`` (``'M:SS'``) and ``total_interval_ms``.
    """
    if not account_ids:
        return []

    aid_list = _sql_id_list(account_ids)
    all_chapter_ids = ", ".join(str(c) for c in ALL_CHAPTERS)

    # Both record tables are split into SHARD_COUNT numbered tables.
    chapter_union = " UNION ALL ".join(
        f"SELECT user_id, chapter_id, chapter_unique_id, date(updated_at) AS finish_date "
        f"FROM bi_user_chapter_play_record_{i} "
        f"WHERE chapter_id IN ({all_chapter_ids}) AND play_status = 1"
        for i in range(SHARD_COUNT)
    )
    comp_union = " UNION ALL ".join(
        f"SELECT chapter_unique_id, SUM(interval_time) AS total_interval "
        f"FROM bi_user_component_play_record_{i} "
        f"WHERE chapter_unique_id IN (SELECT chapter_unique_id FROM cp_unique_ids) "
        f"GROUP BY chapter_unique_id"
        for i in range(SHARD_COUNT)
    )

    sql = f"""
        WITH chapter_play AS (
            {chapter_union}
        ),
        filtered_play AS (
            SELECT cp.user_id, cp.chapter_id, cp.chapter_unique_id, cp.finish_date,
                   ROW_NUMBER() OVER (PARTITION BY cp.user_id, cp.chapter_id ORDER BY cp.finish_date) AS rn
            FROM chapter_play cp
            JOIN bi_vala_app_character c ON cp.user_id = c.id
            JOIN bi_vala_app_account a ON c.account_id = a.id
            WHERE a.id IN ({aid_list})
        ),
        cp_unique_ids AS (
            SELECT DISTINCT chapter_unique_id FROM filtered_play WHERE rn = 1
        ),
        comp_time AS (
            SELECT chapter_unique_id, SUM(total_interval) AS total_interval
            FROM ({comp_union}) t
            GROUP BY chapter_unique_id
        )
        SELECT a.id AS account_id,
               fp.user_id AS character_id,
               fp.chapter_id,
               FORMAT('%s-%s-%s-%s', l.course_level, l.course_season, l.course_unit, l.course_lesson) AS course_id,
               l.course_level,
               fp.finish_date,
               FORMAT('%s:%s',
                   FLOOR(COALESCE(ct.total_interval, 0) / 1000 / 60),
                   LPAD(CAST(MOD(COALESCE(ct.total_interval, 0) / 1000, 60) AS TEXT), 2, '0')
               ) AS finish_time,
               COALESCE(ct.total_interval, 0) AS total_interval_ms
        FROM filtered_play fp
        JOIN bi_vala_app_character c ON fp.user_id = c.id
        JOIN bi_vala_app_account a ON c.account_id = a.id
        LEFT JOIN bi_level_unit_lesson l ON fp.chapter_id = l.id
        LEFT JOIN comp_time ct ON fp.chapter_unique_id = ct.chapter_unique_id
        WHERE fp.rn = 1
        ORDER BY a.id, fp.chapter_id;
    """
    output = run_sql(sql, pg_password)

    records = []
    for row in csv.DictReader(io.StringIO(output)):
        target = target_levels.get(row["account_id"], "L1")
        if row["course_level"] != target:
            continue  # keep only records of the account's target level
        # The SQL-side LPAD truncates a fractional seconds value to two
        # characters (e.g. "5.432" -> "5."), so rebuild a clean M:SS string
        # from the millisecond total; keep the SQL value if that fails.
        clean_time = _format_duration_ms(row.get("total_interval_ms"))
        if clean_time is not None:
            row["finish_time"] = clean_time
        records.append(row)
    return records


def time_to_minutes(t):
    """Convert an ``'MM:SS'`` string to minutes as a float.

    A trailing "." and a fractional seconds part are tolerated. Returns 0
    when the value has no ":" or cannot be parsed.
    """
    try:
        parts = t.strip().rstrip(".").split(":")
        if len(parts) >= 2:
            return int(parts[0]) + int(float(parts[1])) / 60.0
        return 0
    except (AttributeError, TypeError, ValueError):
        return 0


def _display_width(value):
    """Return the display width of *value*; non-ASCII characters count double."""
    text = "" if value is None else str(value)
    return sum(2 if ord(ch) > 127 else 1 for ch in text)


def _auto_fit_columns(ws, max_width):
    """Size every column of *ws* to its widest cell plus padding, capped at *max_width*."""
    for col in ws.columns:
        widest = max((_display_width(cell.value) for cell in col), default=0)
        ws.column_dimensions[col[0].column_letter].width = min(widest + 3, max_width)


def _style_header_row(ws, font, fill, alignment):
    """Apply the shared header font, fill and alignment to row 1 of *ws*."""
    for cell in ws[1]:
        cell.font = font
        cell.fill = fill
        cell.alignment = alignment


def generate_report(records, account_to_phone, target_levels, character_info, unmatched, output_path):
    """Write the three-sheet Excel analysis report to *output_path*.

    Sheets: lesson detail ("行课明细"), per-lesson duration cross table
    ("完课时长交叉分析") and summary statistics ("汇总统计").

    Args:
        records: rows as returned by ``query_chapter_play``.
        account_to_phone: mapping of account ID to phone number.
        target_levels: mapping of account ID to target level; missing
            accounts default to "L1".
        character_info: mapping of account ID to character details.
        unmatched: phone numbers without a matching account.
        output_path: destination ``.xlsx`` path.

    Returns:
        *output_path*.
    """
    wb = openpyxl.Workbook()

    hf = Font(bold=True, color="FFFFFF", size=11)
    hfill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    ha = Alignment(horizontal="center", vertical="center", wrap_text=True)

    # Group records by account.
    by_account = {}
    for r in records:
        by_account.setdefault(r["account_id"], []).append(r)
    sorted_aids = sorted(by_account, key=int)

    # --- Sheet 1: lesson detail ---
    # NOTE(review): character columns come from character_info, which holds
    # one character per account, not from the record's own character_id --
    # confirm this is intended for accounts with several characters.
    ws1 = wb.active
    ws1.title = "行课明细"
    ws1.append(_DETAIL_HEADERS)
    _style_header_row(ws1, hf, hfill, ha)

    for aid in sorted_aids:
        phone = account_to_phone.get(aid, "")
        cinfo = character_info.get(aid, {})
        target = target_levels.get(aid, "L1")
        for r in by_account[aid]:
            mins = time_to_minutes(r["finish_time"])
            ws1.append([
                aid, phone,
                cinfo.get("character_id", ""),
                cinfo.get("nickname", ""),
                cinfo.get("age", ""),
                cinfo.get("gender", ""),
                target,
                r["course_id"],
                r["finish_date"],
                r["finish_time"],
                round(mins, 1),
            ])

    # Phones without an account.
    for p in sorted(unmatched):
        ws1.append(["", p, "", "", "", "", "", "", "", "", ""])

    # Accounts without any record.
    no_record_aids = set(account_to_phone) - set(by_account)
    for aid in sorted(no_record_aids, key=int):
        cinfo = character_info.get(aid, {})
        ws1.append([
            aid,
            account_to_phone.get(aid, ""),
            cinfo.get("character_id", ""),
            cinfo.get("nickname", ""),
            cinfo.get("age", ""),
            cinfo.get("gender", ""),
            target_levels.get(aid, "L1"),
            "无行课记录", "", "", "",
        ])

    _auto_fit_columns(ws1, 25)
    ws1.freeze_panes = "A2"

    # --- Sheet 2: duration cross table ---
    ws2 = wb.create_sheet("完课时长交叉分析")
    sorted_courses = sorted({r["course_id"] for r in records})

    analysis_headers = (
        ["账号ID", "手机号", "角色昵称", "角色年龄", "目标Level"]
        + sorted_courses
        + ["总耗时(分钟)", "平均每课(分钟)", "完成课时数"]
    )
    ws2.append(analysis_headers)
    _style_header_row(ws2, hf, hfill, ha)

    all_user_times = {c: [] for c in sorted_courses}  # for the average row
    for aid in sorted_aids:
        cinfo = character_info.get(aid, {})
        course_times = {}
        total_mins = 0
        completed_count = 0
        for r in by_account[aid]:
            cid = r["course_id"]
            mins = time_to_minutes(r["finish_time"])
            course_times[cid] = mins
            total_mins += mins
            completed_count += 1
            if cid in all_user_times:
                all_user_times[cid].append(mins)

        row_data = [
            aid,
            account_to_phone.get(aid, ""),
            cinfo.get("nickname", ""),
            cinfo.get("age", ""),
            target_levels.get(aid, "L1"),
        ]
        for c in sorted_courses:
            t = course_times.get(c)
            row_data.append(round(t, 1) if t is not None else "")
        avg = round(total_mins / completed_count, 1) if completed_count > 0 else 0
        row_data.extend([round(total_mins, 1), avg, completed_count])
        ws2.append(row_data)

    # Average row.
    avg_row = ["", "", "", "平均", ""]
    for c in sorted_courses:
        times = all_user_times.get(c, [])
        avg_row.append(round(sum(times) / len(times), 1) if times else "")
    all_times = [t for times in all_user_times.values() for t in times]
    if all_times:
        avg_row.append(round(sum(all_times), 1))
        avg_row.append(round(sum(all_times) / len(all_times), 1))
        avg_row.append(len(all_times))
    else:
        avg_row.extend(["", "", ""])
    ws2.append(avg_row)

    # The average row was just appended, so it is the last row of the sheet.
    avg_fill = PatternFill(start_color="FFF2CC", end_color="FFF2CC", fill_type="solid")
    for cell in ws2[ws2.max_row]:
        cell.fill = avg_fill
        cell.font = Font(bold=True)

    _auto_fit_columns(ws2, 22)
    ws2.freeze_panes = "A2"

    # --- Sheet 3: summary statistics ---
    ws3 = wb.create_sheet("汇总统计")
    ws3.append(["指标", "数值"])
    _style_header_row(ws3, hf, hfill, ha)

    all_times_flat = [time_to_minutes(r["finish_time"]) for r in records]
    overall_avg = round(sum(all_times_flat) / len(all_times_flat), 1) if all_times_flat else 0

    # Several accounts can share one phone, so count distinct matched phones
    # rather than accounts.
    input_phone_count = len(set(account_to_phone.values())) + len(unmatched)

    # Describe the levels actually analysed instead of always claiming L1.
    levels_in_use = sorted({target_levels.get(aid, "L1") for aid in account_to_phone})
    level_summary = "; ".join(_LEVEL_LABELS.get(lv, lv) for lv in levels_in_use)
    if not level_summary:
        level_summary = _LEVEL_LABELS["L1"]

    ws3.append(["输入手机号数", input_phone_count])
    ws3.append(["匹配账号数", len(account_to_phone)])
    ws3.append(["未匹配手机号数", len(unmatched)])
    ws3.append(["有行课记录的用户数", len(by_account)])
    ws3.append(["总课时完成记录数", len(records)])
    ws3.append(["整体平均完课时长(分钟)", overall_avg])
    ws3.append(["分析目标Level", level_summary])

    # Per-user summary.
    ws3.append([])
    ws3.append(["用户", "手机号", "角色昵称", "年龄", "完成课时数", "总耗时(分钟)", "平均耗时(分钟)"])
    for aid in sorted_aids:
        recs = by_account[aid]
        cinfo = character_info.get(aid, {})
        times = [time_to_minutes(r["finish_time"]) for r in recs]
        total = round(sum(times), 1)
        avg = round(total / len(times), 1) if times else 0
        ws3.append([
            aid,
            account_to_phone.get(aid, ""),
            cinfo.get("nickname", ""),
            cinfo.get("age", ""),
            len(recs),
            total,
            avg,
        ])

    _auto_fit_columns(ws3, 25)

    wb.save(output_path)
    return output_path


def _parse_args(argv):
    """Split *argv* into ``(input_file, output_path)``.

    ``--output PATH`` may appear before or after the input file; the first
    remaining argument is the input file. Either element is ``None`` when
    it was not supplied.
    """
    input_file = None
    output_path = None
    i = 1
    while i < len(argv):
        arg = argv[i]
        if arg == "--output" and i + 1 < len(argv):
            output_path = argv[i + 1]
            i += 2
            continue
        if input_file is None:
            input_file = arg
        i += 1
    return input_file, output_path


def main():
    """Command-line entry point: read phones, query the database, write the report.

    Returns the path of the written report. Exits with status 1 when no
    input file is given.
    """
    input_file, output_path = _parse_args(sys.argv)
    if input_file is None:
        print("Usage: python3 enhanced_phone_chapter_analysis.py INPUT_FILE [--output OUTPUT_PATH]")
        sys.exit(1)

    if not output_path:
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(OUTPUT_DIR, f"enhanced_phone_analysis_{ts}.xlsx")
    # Create the destination directory for both default and custom paths.
    os.makedirs(os.path.dirname(os.path.abspath(output_path)), exist_ok=True)

    pg_password = load_pg_password()

    # Step 1: Extract phones
    print(f"读取文件: {input_file}")
    phones = extract_phones(input_file)
    print(f"提取到 {len(phones)} 个手机号")

    # Step 2: Match accounts
    print("匹配账号...")
    account_ids, account_to_phone, unmatched = match_accounts(phones, pg_password)
    print(f"已匹配: {len(account_ids)} 个账号, 未匹配: {len(unmatched)} 个")

    if not account_ids:
        print("无匹配账号,仅生成空报告")
        # Still write a report that lists the unmatched phones.
        wb = openpyxl.Workbook()
        ws = wb.active
        ws.title = "行课明细"
        ws.append(_DETAIL_HEADERS)
        for p in sorted(unmatched):
            ws.append(["", p, "", "", "", "", "", "", "", "", ""])
        wb.save(output_path)
        print(f"文件保存至: {output_path}")
        return output_path

    # Step 3: Get purchase info
    print("获取购买课包信息...")
    target_levels = get_purchase_info(account_ids, pg_password)
    for aid, lvl in target_levels.items():
        print(f" 账号 {aid}: 目标Level = {lvl}")

    # Step 4: Get character info
    print("获取角色年龄信息...")
    character_info = get_character_info(account_ids, pg_password)

    # Step 5: Query chapter play records
    print("查询课时完成记录...")
    records = query_chapter_play(account_ids, target_levels, pg_password)
    print(f"查询到 {len(records)} 条目标Level课时记录")

    # Step 6: Generate report
    print("生成分析报告...")
    generate_report(records, account_to_phone, target_levels, character_info, unmatched, output_path)
    print(f"文件保存至: {output_path}")
    return output_path


if __name__ == "__main__":
    main()