refund-user-learning-analys.../scripts/query_refund_learning.py

237 lines
9.5 KiB
Python

#!/usr/bin/env python3
"""
退费用户学习数据查询脚本
用法: python3 query_refund_learning.py --start 2026-04-01 --end 2026-05-01 --output /tmp/report.json
参数:
--start 订单付款起始日期 (YYYY-MM-DD)
--end 订单付款截止日期 (YYYY-MM-DD)
--output JSON 结果输出路径
--pure 是否剔除仍有有效订单的用户 (默认 true)
--outlier 巩固用时异常阈值(分钟), 超过此值视为脏数据 (默认 60)
"""
import argparse, json, os, subprocess, sys
def get_pg_password():
secrets_path = os.path.expanduser("~/.openclaw/workspace/secrets.env")
with open(secrets_path) as f:
for line in f:
if line.startswith("PG_ONLINE_PASSWORD="):
return line.split("'")[1]
raise RuntimeError("PG_ONLINE_PASSWORD not found in secrets.env")
def run_pg(db, sql, password):
env = os.environ.copy()
env["PGPASSWORD"] = password
r = subprocess.run(
["psql", "-h", "bj-postgres-16pob4sg.sql.tencentcdb.com", "-p", "28591",
"-U", "ai_member", "-d", db, "-t", "-A", "-F", "\t", "-c", sql],
capture_output=True, text=True, env=env, timeout=120
)
if r.returncode != 0:
print(f"SQL ERROR: {r.stderr}", file=sys.stderr)
sys.exit(1)
rows = [line.split("\t") for line in r.stdout.strip().split("\n") if line.strip()]
return rows
def main():
p = argparse.ArgumentParser()
p.add_argument("--start", required=True)
p.add_argument("--end", required=True)
p.add_argument("--output", default="/tmp/refund_learning_report.json")
p.add_argument("--pure", default="true")
p.add_argument("--outlier", type=float, default=60.0)
args = p.parse_args()
pw = get_pg_password()
pure_clause = ""
if args.pure == "true":
pure_clause = "WHERE NOT EXISTS (SELECT 1 FROM bi_vala_order o2 WHERE o2.account_id = ra.account_id AND o2.order_status = 3)"
# --- Chapter ID mappings ---
# L1-U0: 343,344,345,346,348 | L2-U0: 55,56,57,58,59
l1_ids = "343,344,345,346,348"
l2_ids = "55,56,57,58,59"
all_ids = f"{l1_ids},{l2_ids}"
chapter_play_union = " UNION ALL ".join([
f"SELECT r.user_id, r.chapter_id FROM bi_user_chapter_play_record_{i} r JOIN refund_users ru ON r.user_id = ru.user_id WHERE r.play_status = 1 AND r.chapter_id IN ({all_ids})"
for i in range(8)
])
base_cte = f"""
WITH refund_accounts AS (
SELECT DISTINCT o.account_id FROM bi_vala_order o
JOIN bi_vala_app_account a ON a.id = o.account_id AND a.status = 1
JOIN bi_refund_order r ON r.out_trade_no = o.out_trade_no AND r.status = 3
WHERE o.order_status = 4 AND o.pay_success_date >= '{args.start}' AND o.pay_success_date < '{args.end}'
),
pure_refund_accounts AS (
SELECT ra.account_id FROM refund_accounts ra {pure_clause}
),
refund_users AS (
SELECT c.id AS user_id, c.account_id FROM bi_vala_app_character c
JOIN pure_refund_accounts pra ON c.account_id = pra.account_id WHERE c.deleted_at IS NULL
),
all_done AS ({chapter_play_union}),
user_done_count AS (
SELECT user_id,
COUNT(DISTINCT CASE WHEN chapter_id IN ({l1_ids}) THEN chapter_id END) AS l1_done,
COUNT(DISTINCT CASE WHEN chapter_id IN ({l2_ids}) THEN chapter_id END) AS l2_done
FROM (SELECT DISTINCT user_id, chapter_id FROM all_done) t GROUP BY user_id
),
qualified_users AS (
SELECT ru.user_id, ru.account_id FROM user_done_count udc
JOIN refund_users ru ON udc.user_id = ru.user_id WHERE udc.l1_done = 5 OR udc.l2_done = 5
)"""
result = {}
# 1. Funnel counts
print("Querying funnel counts...")
rows = run_pg("vala_bi", f"""
{base_cte}
SELECT
(SELECT COUNT(*) FROM refund_accounts),
(SELECT COUNT(*) FROM pure_refund_accounts),
(SELECT COUNT(DISTINCT account_id) FROM qualified_users),
(SELECT COUNT(DISTINCT account_id) FROM qualified_users qu
JOIN user_done_count udc ON qu.user_id = udc.user_id AND udc.l1_done = 5 AND udc.l2_done < 5),
(SELECT COUNT(DISTINCT account_id) FROM qualified_users qu
JOIN user_done_count udc ON qu.user_id = udc.user_id AND udc.l2_done = 5 AND udc.l1_done < 5),
(SELECT COUNT(DISTINCT account_id) FROM qualified_users qu
JOIN user_done_count udc ON qu.user_id = udc.user_id AND udc.l1_done = 5 AND udc.l2_done = 5)
""", pw)
r = rows[0]
result["funnel"] = {
"total_refund": int(r[0]), "pure_refund": int(r[1]),
"completed_u0": int(r[2]), "l1_only": int(r[3]),
"l2_only": int(r[4]), "both": int(r[5])
}
# 2. Review data (with outlier filtering)
print("Querying review data...")
outlier_ms = int(args.outlier * 60 * 1000)
rows = run_pg("vala_bi", f"""
{base_cte},
review_with_rate AS (
SELECT rv.level, rv.chapter_id, rv.user_id, rv.play_time,
(SELECT COUNT(*) FROM jsonb_array_elements(rv.question_list::jsonb) q WHERE (q->>'isRight')::boolean = true)::numeric
/ NULLIF((SELECT COUNT(*) FROM jsonb_array_elements(rv.question_list::jsonb))::numeric, 0) * 100 AS right_rate,
ROW_NUMBER() OVER (PARTITION BY rv.user_id, rv.chapter_id ORDER BY rv.id) AS rn
FROM bi_user_unit_review_question_result rv
JOIN qualified_users qu ON rv.user_id = qu.user_id
WHERE rv.chapter_id IN ({all_ids}) AND rv.deleted_at IS NULL AND rv.play_time <= {outlier_ms}
)
SELECT level, chapter_id,
COUNT(DISTINCT user_id),
ROUND(AVG(play_time / 1000.0 / 60)::numeric, 1),
ROUND(AVG(right_rate)::numeric, 1)
FROM review_with_rate WHERE rn = 1
GROUP BY level, chapter_id ORDER BY level, chapter_id
""", pw)
chapter_map = {
"343": "U0-L01", "344": "U0-L02", "345": "U0-L03", "346": "U0-L04", "348": "U0-L05",
"55": "U0-L01", "56": "U0-L02", "57": "U0-L03", "58": "U0-L04", "59": "U0-L05"
}
result["review"] = []
for r in rows:
result["review"].append({
"course": "L1" if r[0] == "A1" else "L2",
"lesson": chapter_map.get(r[1], r[1]),
"review_count": int(r[2]),
"avg_duration_min": float(r[3]),
"avg_right_rate_pct": float(r[4])
})
# 3. Summary (enhancement) data
print("Querying summary data...")
rows = run_pg("vala_bi", f"""
{base_cte},
summary_data AS (
SELECT s.level, s.user_id, COUNT(DISTINCT s.km_type) AS km_types_done
FROM bi_user_unit_summary_km_result s
JOIN qualified_users qu ON s.user_id = qu.user_id
WHERE s.story_id IN (65, 8) AND s.deleted_at IS NULL
GROUP BY s.level, s.user_id
)
SELECT
level,
COUNT(DISTINCT user_id),
COUNT(DISTINCT CASE WHEN (level = 'A1' AND km_types_done >= 3) OR (level = 'A2' AND km_types_done >= 4) THEN user_id END),
COUNT(DISTINCT CASE WHEN km_types_done = 1 THEN user_id END),
COUNT(DISTINCT CASE WHEN km_types_done = 2 THEN user_id END),
COUNT(DISTINCT CASE WHEN km_types_done = 3 THEN user_id END),
COUNT(DISTINCT CASE WHEN km_types_done = 4 THEN user_id END)
FROM summary_data GROUP BY level ORDER BY level
""", pw)
result["summary"] = []
for r in rows:
result["summary"].append({
"course": "L1" if r[0] == "A1" else "L2",
"total_km": 3 if r[0] == "A1" else 4,
"enter_count": int(r[1]), "all_done": int(r[2]),
"done_1": int(r[3]), "done_2": int(r[4]),
"done_3": int(r[5]), "done_4": int(r[6])
})
# 4. Challenge data
print("Querying challenge data...")
rows = run_pg("vala_bi", f"""
{base_cte},
challenge_first AS (
SELECT ch.level, ch.category, ch.score_text, ch.user_id,
ROW_NUMBER() OVER (PARTITION BY ch.user_id, ch.level, ch.category ORDER BY ch.id) AS rn
FROM bi_user_unit_challenge_question_result ch
JOIN qualified_users qu ON ch.user_id = qu.user_id
WHERE ch.story_id IN (65, 8) AND ch.deleted_at IS NULL
)
SELECT level, category,
COUNT(DISTINCT user_id),
COUNT(DISTINCT CASE WHEN score_text = 'Perfect' THEN user_id END),
COUNT(DISTINCT CASE WHEN score_text = 'Good' THEN user_id END),
COUNT(DISTINCT CASE WHEN score_text = 'Oops' THEN user_id END)
FROM challenge_first WHERE rn = 1
GROUP BY level, category ORDER BY level, category
""", pw)
result["challenge"] = []
for r in rows:
total = int(r[3]) + int(r[4]) + int(r[5])
result["challenge"].append({
"course": "L1" if r[0] == "A1" else "L2",
"category": r[1],
"enter_count": int(r[2]),
"perfect": int(r[3]), "good": int(r[4]), "oops": int(r[5]),
"perfect_pct": round(int(r[3]) / total * 100) if total else 0,
"good_pct": round(int(r[4]) / total * 100) if total else 0,
"oops_pct": round(int(r[5]) / total * 100) if total else 0,
})
# 5. Outlier records
print("Querying outliers...")
rows = run_pg("vala_bi", f"""
{base_cte}
SELECT rv.level, rv.chapter_id, rv.user_id,
ROUND((rv.play_time / 1000.0 / 60)::numeric, 1), rv.play_time, rv.created_at
FROM bi_user_unit_review_question_result rv
JOIN qualified_users qu ON rv.user_id = qu.user_id
WHERE rv.chapter_id IN ({all_ids}) AND rv.deleted_at IS NULL AND rv.play_time > {outlier_ms}
ORDER BY rv.play_time DESC
""", pw)
result["outliers"] = []
for r in rows:
result["outliers"].append({
"course": "L1" if r[0] == "A1" else "L2",
"lesson": chapter_map.get(r[1], r[1]),
"user_id": int(r[2]),
"duration_min": float(r[3]),
"play_time_ms": int(r[4]),
"created_at": r[5]
})
with open(args.output, "w") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
print(f"Done. Output: {args.output}")
if __name__ == "__main__":
main()