ai_member_xiaoxi/scripts/challenge_click_analysis.py
2026-06-24 08:00:01 +08:00

152 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""分析 homepage_challenge_click 入口点击用户数,按 A1/A2 拆分"""
import json
import urllib.request
import base64
import ssl
import psycopg2
ES_HOST = "es-7vd7jcu9.public.tencentelasticsearch.com"
ES_PORT = 9200
ES_USER = "elastic"
ES_PASS = "F%?QDcWes7N2WTuiYD11"
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_PASS = "LdfjdjL83h3h3^$&**YGG*"
PG_DB = "vala_bi"
START_TS = 1747939200 # 2026-05-23 00:00:00 CST
END_TS = 1750521599 # 2026-06-21 23:59:59 CST
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
def es_query(body):
url = f"https://{ES_HOST}:{ES_PORT}/user_behavior_buried_points/_search"
auth = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode()
req = urllib.request.Request(url, data=json.dumps(body).encode(), headers={
"Content-Type": "application/json",
"Authorization": f"Basic {auth}"
})
resp = urllib.request.urlopen(req, context=ctx)
return json.loads(resp.read())
# Step 1: 获取所有点击单元挑战入口的 accountId
print("Step 1: 从 ES 获取所有 accountId...")
all_account_ids = set()
after_key = None
page = 0
while True:
page += 1
body = {
"size": 0,
"query": {
"bool": {
"must": [
{"term": {"buryingPointId": 500}},
{"term": {"buryingPointSubId": 14}},
{"range": {"activeTime": {"gte": START_TS, "lte": END_TS}}}
]
}
},
"aggs": {
"users": {
"composite": {
"size": 10000,
"sources": [{"accountId": {"terms": {"field": "accountId"}}}]
}
}
}
}
if after_key:
body["aggs"]["users"]["composite"]["after"] = after_key
result = es_query(body)
buckets = result.get("aggregations", {}).get("users", {}).get("buckets", [])
for b in buckets:
aid = b["key"]["accountId"]
if aid > 0:
all_account_ids.add(aid)
after_key = result.get("aggregations", {}).get("users", {}).get("after_key")
if not after_key or not buckets:
break
if page % 10 == 0:
print(f" page {page}, collected {len(all_account_ids)} users so far...")
print(f"{len(all_account_ids)} 个唯一 accountId已排除 accountId=0")
if not all_account_ids:
print(" 没有数据,退出")
exit(0)
# Step 2: 从 PG 获取这些用户的 course_level
print("\nStep 2: 从 PostgreSQL 获取用户课程等级...")
conn = psycopg2.connect(host=PG_HOST, port=PG_PORT, user=PG_USER, password=PG_PASS, dbname=PG_DB)
cur = conn.cursor()
# 分批查询
batch_size = 500
account_list = list(all_account_ids)
user_levels = {} # accountId -> set of course_levels
for i in range(0, len(account_list), batch_size):
batch = account_list[i:i+batch_size]
placeholders = ",".join(["%s"] * len(batch))
cur.execute(f"""
SELECT DISTINCT account_id, course_level
FROM bi_user_course_detail
WHERE account_id IN ({placeholders})
AND deleted_at IS NULL
""", batch)
for row in cur.fetchall():
aid = row[0]
level = row[1]
if aid not in user_levels:
user_levels[aid] = set()
user_levels[aid].add(level)
cur.close()
conn.close()
# Step 3: 统计
print("\nStep 3: 统计结果...")
a1_users = set()
a2_users = set()
both_users = set()
unknown_users = set()
for aid in all_account_ids:
levels = user_levels.get(aid, set())
is_a1 = "L1" in levels or "A1" in levels
is_a2 = "L2" in levels or "A2" in levels
if is_a1 and is_a2:
both_users.add(aid)
elif is_a1:
a1_users.add(aid)
elif is_a2:
a2_users.add(aid)
else:
unknown_users.add(aid)
print(f"\n{'='*50}")
print(f"单元挑战入口点击用户数统计")
print(f"统计周期: 2026-05-23 ~ 2026-06-21")
print(f"{'='*50}")
print(f"A1 (L1) 用户数: {len(a1_users)}")
print(f"A2 (L2) 用户数: {len(a2_users)}")
print(f"同时有 A1+A2 课程: {len(both_users)}")
print(f"未匹配到课程等级: {len(unknown_users)}")
print(f"{'='*50}")
print(f"总用户数: {len(all_account_ids)}")
print(f"A1 合计 (含双课程): {len(a1_users) + len(both_users)}")
print(f"A2 合计 (含双课程): {len(a2_users) + len(both_users)}")
if unknown_users:
print(f"\n未匹配用户 accountId: {sorted(unknown_users)[:20]}...")