#!/usr/bin/env python3 """ 退费用户学习数据查询脚本 用法: python3 query_refund_learning.py --start 2026-04-01 --end 2026-05-01 --output /tmp/report.json 参数: --start 订单付款起始日期 (YYYY-MM-DD) --end 订单付款截止日期 (YYYY-MM-DD) --output JSON 结果输出路径 --pure 是否剔除仍有有效订单的用户 (默认 true) --outlier 巩固用时异常阈值(分钟), 超过此值视为脏数据 (默认 60) """ import argparse, json, os, subprocess, sys def get_pg_password(): secrets_path = os.path.expanduser("~/.openclaw/workspace/secrets.env") with open(secrets_path) as f: for line in f: if line.startswith("PG_ONLINE_PASSWORD="): return line.split("'")[1] raise RuntimeError("PG_ONLINE_PASSWORD not found in secrets.env") def run_pg(db, sql, password): env = os.environ.copy() env["PGPASSWORD"] = password r = subprocess.run( ["psql", "-h", "bj-postgres-16pob4sg.sql.tencentcdb.com", "-p", "28591", "-U", "ai_member", "-d", db, "-t", "-A", "-F", "\t", "-c", sql], capture_output=True, text=True, env=env, timeout=120 ) if r.returncode != 0: print(f"SQL ERROR: {r.stderr}", file=sys.stderr) sys.exit(1) rows = [line.split("\t") for line in r.stdout.strip().split("\n") if line.strip()] return rows def main(): p = argparse.ArgumentParser() p.add_argument("--start", required=True) p.add_argument("--end", required=True) p.add_argument("--output", default="/tmp/refund_learning_report.json") p.add_argument("--pure", default="true") p.add_argument("--outlier", type=float, default=60.0) args = p.parse_args() pw = get_pg_password() pure_clause = "" if args.pure == "true": pure_clause = "WHERE NOT EXISTS (SELECT 1 FROM bi_vala_order o2 WHERE o2.account_id = ra.account_id AND o2.order_status = 3)" # --- Chapter ID mappings --- # L1-U0: 343,344,345,346,348 | L2-U0: 55,56,57,58,59 l1_ids = "343,344,345,346,348" l2_ids = "55,56,57,58,59" all_ids = f"{l1_ids},{l2_ids}" chapter_play_union = " UNION ALL ".join([ f"SELECT r.user_id, r.chapter_id FROM bi_user_chapter_play_record_{i} r JOIN refund_users ru ON r.user_id = ru.user_id WHERE r.play_status = 1 AND r.chapter_id IN ({all_ids})" for i in range(8) ]) base_cte = f""" WITH refund_accounts AS ( SELECT DISTINCT o.account_id FROM bi_vala_order o JOIN bi_vala_app_account a ON a.id = o.account_id AND a.status = 1 JOIN bi_refund_order r ON r.out_trade_no = o.out_trade_no AND r.status = 3 WHERE o.order_status = 4 AND o.pay_success_date >= '{args.start}' AND o.pay_success_date < '{args.end}' ), pure_refund_accounts AS ( SELECT ra.account_id FROM refund_accounts ra {pure_clause} ), refund_users AS ( SELECT c.id AS user_id, c.account_id FROM bi_vala_app_character c JOIN pure_refund_accounts pra ON c.account_id = pra.account_id WHERE c.deleted_at IS NULL ), all_done AS ({chapter_play_union}), user_done_count AS ( SELECT user_id, COUNT(DISTINCT CASE WHEN chapter_id IN ({l1_ids}) THEN chapter_id END) AS l1_done, COUNT(DISTINCT CASE WHEN chapter_id IN ({l2_ids}) THEN chapter_id END) AS l2_done FROM (SELECT DISTINCT user_id, chapter_id FROM all_done) t GROUP BY user_id ), qualified_users AS ( SELECT ru.user_id, ru.account_id FROM user_done_count udc JOIN refund_users ru ON udc.user_id = ru.user_id WHERE udc.l1_done = 5 OR udc.l2_done = 5 )""" result = {} # 1. Funnel counts print("Querying funnel counts...") rows = run_pg("vala_bi", f""" {base_cte} SELECT (SELECT COUNT(*) FROM refund_accounts), (SELECT COUNT(*) FROM pure_refund_accounts), (SELECT COUNT(DISTINCT account_id) FROM qualified_users), (SELECT COUNT(DISTINCT account_id) FROM qualified_users qu JOIN user_done_count udc ON qu.user_id = udc.user_id AND udc.l1_done = 5 AND udc.l2_done < 5), (SELECT COUNT(DISTINCT account_id) FROM qualified_users qu JOIN user_done_count udc ON qu.user_id = udc.user_id AND udc.l2_done = 5 AND udc.l1_done < 5), (SELECT COUNT(DISTINCT account_id) FROM qualified_users qu JOIN user_done_count udc ON qu.user_id = udc.user_id AND udc.l1_done = 5 AND udc.l2_done = 5) """, pw) r = rows[0] result["funnel"] = { "total_refund": int(r[0]), "pure_refund": int(r[1]), "completed_u0": int(r[2]), "l1_only": int(r[3]), "l2_only": int(r[4]), "both": int(r[5]) } # 2. Review data (with outlier filtering) print("Querying review data...") outlier_ms = int(args.outlier * 60 * 1000) rows = run_pg("vala_bi", f""" {base_cte}, review_with_rate AS ( SELECT rv.level, rv.chapter_id, rv.user_id, rv.play_time, (SELECT COUNT(*) FROM jsonb_array_elements(rv.question_list::jsonb) q WHERE (q->>'isRight')::boolean = true)::numeric / NULLIF((SELECT COUNT(*) FROM jsonb_array_elements(rv.question_list::jsonb))::numeric, 0) * 100 AS right_rate, ROW_NUMBER() OVER (PARTITION BY rv.user_id, rv.chapter_id ORDER BY rv.id) AS rn FROM bi_user_unit_review_question_result rv JOIN qualified_users qu ON rv.user_id = qu.user_id WHERE rv.chapter_id IN ({all_ids}) AND rv.deleted_at IS NULL AND rv.play_time <= {outlier_ms} ) SELECT level, chapter_id, COUNT(DISTINCT user_id), ROUND(AVG(play_time / 1000.0 / 60)::numeric, 1), ROUND(AVG(right_rate)::numeric, 1) FROM review_with_rate WHERE rn = 1 GROUP BY level, chapter_id ORDER BY level, chapter_id """, pw) chapter_map = { "343": "U0-L01", "344": "U0-L02", "345": "U0-L03", "346": "U0-L04", "348": "U0-L05", "55": "U0-L01", "56": "U0-L02", "57": "U0-L03", "58": "U0-L04", "59": "U0-L05" } result["review"] = [] for r in rows: result["review"].append({ "course": "L1" if r[0] == "A1" else "L2", "lesson": chapter_map.get(r[1], r[1]), "review_count": int(r[2]), "avg_duration_min": float(r[3]), "avg_right_rate_pct": float(r[4]) }) # 3. Summary (enhancement) data print("Querying summary data...") rows = run_pg("vala_bi", f""" {base_cte}, summary_data AS ( SELECT s.level, s.user_id, COUNT(DISTINCT s.km_type) AS km_types_done FROM bi_user_unit_summary_km_result s JOIN qualified_users qu ON s.user_id = qu.user_id WHERE s.story_id IN (65, 8) AND s.deleted_at IS NULL GROUP BY s.level, s.user_id ) SELECT level, COUNT(DISTINCT user_id), COUNT(DISTINCT CASE WHEN (level = 'A1' AND km_types_done >= 3) OR (level = 'A2' AND km_types_done >= 4) THEN user_id END), COUNT(DISTINCT CASE WHEN km_types_done = 1 THEN user_id END), COUNT(DISTINCT CASE WHEN km_types_done = 2 THEN user_id END), COUNT(DISTINCT CASE WHEN km_types_done = 3 THEN user_id END), COUNT(DISTINCT CASE WHEN km_types_done = 4 THEN user_id END) FROM summary_data GROUP BY level ORDER BY level """, pw) result["summary"] = [] for r in rows: result["summary"].append({ "course": "L1" if r[0] == "A1" else "L2", "total_km": 3 if r[0] == "A1" else 4, "enter_count": int(r[1]), "all_done": int(r[2]), "done_1": int(r[3]), "done_2": int(r[4]), "done_3": int(r[5]), "done_4": int(r[6]) }) # 4. Challenge data print("Querying challenge data...") rows = run_pg("vala_bi", f""" {base_cte}, challenge_first AS ( SELECT ch.level, ch.category, ch.score_text, ch.user_id, ROW_NUMBER() OVER (PARTITION BY ch.user_id, ch.level, ch.category ORDER BY ch.id) AS rn FROM bi_user_unit_challenge_question_result ch JOIN qualified_users qu ON ch.user_id = qu.user_id WHERE ch.story_id IN (65, 8) AND ch.deleted_at IS NULL ) SELECT level, category, COUNT(DISTINCT user_id), COUNT(DISTINCT CASE WHEN score_text = 'Perfect' THEN user_id END), COUNT(DISTINCT CASE WHEN score_text = 'Good' THEN user_id END), COUNT(DISTINCT CASE WHEN score_text = 'Oops' THEN user_id END) FROM challenge_first WHERE rn = 1 GROUP BY level, category ORDER BY level, category """, pw) result["challenge"] = [] for r in rows: total = int(r[3]) + int(r[4]) + int(r[5]) result["challenge"].append({ "course": "L1" if r[0] == "A1" else "L2", "category": r[1], "enter_count": int(r[2]), "perfect": int(r[3]), "good": int(r[4]), "oops": int(r[5]), "perfect_pct": round(int(r[3]) / total * 100) if total else 0, "good_pct": round(int(r[4]) / total * 100) if total else 0, "oops_pct": round(int(r[5]) / total * 100) if total else 0, }) # 5. Outlier records print("Querying outliers...") rows = run_pg("vala_bi", f""" {base_cte} SELECT rv.level, rv.chapter_id, rv.user_id, ROUND((rv.play_time / 1000.0 / 60)::numeric, 1), rv.play_time, rv.created_at FROM bi_user_unit_review_question_result rv JOIN qualified_users qu ON rv.user_id = qu.user_id WHERE rv.chapter_id IN ({all_ids}) AND rv.deleted_at IS NULL AND rv.play_time > {outlier_ms} ORDER BY rv.play_time DESC """, pw) result["outliers"] = [] for r in rows: result["outliers"].append({ "course": "L1" if r[0] == "A1" else "L2", "lesson": chapter_map.get(r[1], r[1]), "user_id": int(r[2]), "duration_min": float(r[3]), "play_time_ms": int(r[4]), "created_at": r[5] }) with open(args.output, "w") as f: json.dump(result, f, ensure_ascii=False, indent=2) print(f"Done. Output: {args.output}") if __name__ == "__main__": main()