152 lines
4.4 KiB
Python
152 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
||
"""Count the users who clicked the homepage_challenge_click entry, split by A1/A2 course level."""
|
||
|
||
import base64
import datetime
import json
import os
import ssl
import urllib.request

import psycopg2
|
||
|
||
# Connection settings for the Elasticsearch cluster and the PostgreSQL BI
# database. Every value can be overridden through the environment variable of
# the same name; non-secret values fall back to the defaults below.
#
# Passwords are deliberately NOT given a default: secrets must not live in
# source control. When ES_PASS / PG_PASS are unset they resolve to an empty
# string and the corresponding service rejects the connection attempt.
ES_HOST = os.environ.get("ES_HOST", "es-7vd7jcu9.public.tencentelasticsearch.com")
ES_PORT = int(os.environ.get("ES_PORT", "9200"))
ES_USER = os.environ.get("ES_USER", "elastic")
ES_PASS = os.environ.get("ES_PASS", "")

PG_HOST = os.environ.get("PG_HOST", "bj-postgres-16pob4sg.sql.tencentcdb.com")
PG_PORT = int(os.environ.get("PG_PORT", "28591"))
PG_USER = os.environ.get("PG_USER", "ai_member")
PG_PASS = os.environ.get("PG_PASS", "")
PG_DB = os.environ.get("PG_DB", "vala_bi")
|
||
|
||
# Reporting window, inclusive on both ends, as Unix timestamps in seconds.
# The bounds are derived from calendar dates with an explicit UTC+8 offset so
# the numbers cannot drift from the dates they are meant to represent.
# NOTE(review): assumes the `activeTime` field is stored in epoch seconds -
# confirm against the index mapping.
_CST = datetime.timezone(datetime.timedelta(hours=8))  # China Standard Time
START_TS = int(datetime.datetime(2026, 5, 23, 0, 0, 0, tzinfo=_CST).timestamp())  # 2026-05-23 00:00:00 CST
END_TS = int(datetime.datetime(2026, 6, 21, 23, 59, 59, tzinfo=_CST).timestamp())  # 2026-06-21 23:59:59 CST
|
||
|
||
# TLS context used by es_query() for the HTTPS connection to Elasticsearch.
# NOTE(review): certificate and hostname verification are both switched off
# here, so the Basic-auth credentials are sent to whichever server answers.
# Confirm this is intentional (e.g. a self-signed cluster certificate) and
# prefer loading the cluster CA instead.
ctx = ssl.create_default_context()
# check_hostname must be cleared first: setting verify_mode to CERT_NONE while
# hostname checking is still enabled raises ValueError.
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
|
||
|
||
def es_query(body, timeout=60):
    """Run a search request against the ``user_behavior_buried_points`` index.

    Args:
        body: The Elasticsearch search request as a JSON-serialisable dict.
        timeout: Socket timeout in seconds for the HTTP request, so a stalled
            connection cannot hang the script forever.

    Returns:
        The decoded JSON response.

    Raises:
        urllib.error.HTTPError: If Elasticsearch answers with an error status.
        urllib.error.URLError: If the cluster cannot be reached.
    """
    url = f"https://{ES_HOST}:{ES_PORT}/user_behavior_buried_points/_search"
    credentials = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode()
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {credentials}",
        },
    )
    # The context manager closes the HTTP response (and its socket) even when
    # reading or decoding the payload fails.
    with urllib.request.urlopen(request, context=ctx, timeout=timeout) as resp:
        return json.loads(resp.read())
|
||
|
||
# Step 1: collect every accountId that clicked the unit-challenge entry.
print("Step 1: 从 ES 获取所有 accountId...")
all_account_ids = set()
after_key = None
page = 0

# The document filter is identical for every page, so build it once.
click_filter = {
    "bool": {
        "must": [
            {"term": {"buryingPointId": 500}},
            {"term": {"buryingPointSubId": 14}},
            {"range": {"activeTime": {"gte": START_TS, "lte": END_TS}}},
        ]
    }
}

while True:
    page += 1
    # A composite aggregation pages through the distinct accountId values;
    # "size": 0 on the request suppresses the raw hits.
    composite = {
        "size": 10000,
        "sources": [{"accountId": {"terms": {"field": "accountId"}}}],
    }
    if after_key:
        # Resume after the last bucket returned by the previous page.
        composite["after"] = after_key
    body = {
        "size": 0,
        "query": click_filter,
        "aggs": {"users": {"composite": composite}},
    }

    result = es_query(body)
    users_agg = result.get("aggregations", {}).get("users", {})
    buckets = users_agg.get("buckets", [])
    for bucket in buckets:
        aid = bucket["key"]["accountId"]
        # Skip non-positive ids (reported below as excluding accountId=0).
        if aid > 0:
            all_account_ids.add(aid)

    after_key = users_agg.get("after_key")
    if not after_key or not buckets:
        break
    if page % 10 == 0:
        print(f" page {page}, collected {len(all_account_ids)} users so far...")

print(f" 共 {len(all_account_ids)} 个唯一 accountId(已排除 accountId=0)")

if not all_account_ids:
    print(" 没有数据,退出")
    # `exit()` is a helper injected by the `site` module for interactive use
    # and may be missing (e.g. `python -S`); SystemExit always works.
    raise SystemExit(0)
|
||
|
||
# Step 2: look up the course levels of those users in PostgreSQL.
print("\nStep 2: 从 PostgreSQL 获取用户课程等级...")

# Query in batches so each IN (...) list stays at a bounded size.
batch_size = 500
account_list = list(all_account_ids)
user_levels = {}  # accountId -> set of course_level values

conn = psycopg2.connect(host=PG_HOST, port=PG_PORT, user=PG_USER, password=PG_PASS, dbname=PG_DB)
try:
    # The cursor context manager closes the cursor; the connection itself is
    # closed in `finally` so it is released even when a query raises.
    with conn.cursor() as cur:
        for i in range(0, len(account_list), batch_size):
            batch = account_list[i:i + batch_size]
            # Only "%s" markers are interpolated into the SQL text; the ids
            # themselves are passed as bound parameters.
            placeholders = ",".join(["%s"] * len(batch))
            cur.execute(f"""
                SELECT DISTINCT account_id, course_level
                FROM bi_user_course_detail
                WHERE account_id IN ({placeholders})
                AND deleted_at IS NULL
            """, batch)
            for aid, level in cur.fetchall():
                user_levels.setdefault(aid, set()).add(level)
finally:
    conn.close()
|
||
|
||
# Step 3: classify every clicking user by course level and print the report.
print("\nStep 3: 统计结果...")
a1_users = set()
a2_users = set()
both_users = set()
unknown_users = set()

# Each level is matched under both of the spellings the script accepts.
a1_codes = {"L1", "A1"}
a2_codes = {"L2", "A2"}

for aid in all_account_ids:
    levels = user_levels.get(aid, set())
    is_a1 = not a1_codes.isdisjoint(levels)
    is_a2 = not a2_codes.isdisjoint(levels)

    if is_a1 and is_a2:
        both_users.add(aid)
    elif is_a1:
        a1_users.add(aid)
    elif is_a2:
        a2_users.add(aid)
    else:
        # No course row at all, or only levels other than A1/A2.
        unknown_users.add(aid)

# Derive the printed period from the timestamps that were actually queried,
# so the report header cannot disagree with the data behind it.
report_tz = datetime.timezone(datetime.timedelta(hours=8))  # UTC+8 (CST)
period_start = datetime.datetime.fromtimestamp(START_TS, report_tz).date()
period_end = datetime.datetime.fromtimestamp(END_TS, report_tz).date()

print(f"\n{'='*50}")
print("单元挑战入口点击用户数统计")
print(f"统计周期: {period_start} ~ {period_end}")
print(f"{'='*50}")
print(f"A1 (L1) 用户数: {len(a1_users)}")
print(f"A2 (L2) 用户数: {len(a2_users)}")
print(f"同时有 A1+A2 课程: {len(both_users)}")
print(f"未匹配到课程等级: {len(unknown_users)}")
print(f"{'='*50}")
print(f"总用户数: {len(all_account_ids)}")
print(f"A1 合计 (含双课程): {len(a1_users) + len(both_users)}")
print(f"A2 合计 (含双课程): {len(a2_users) + len(both_users)}")

if unknown_users:
    sample = sorted(unknown_users)[:20]
    # Only show the ellipsis when the sample really is truncated.
    ellipsis = "..." if len(unknown_users) > len(sample) else ""
    print(f"\n未匹配用户 accountId: {sample}{ellipsis}")
|