#!/usr/bin/env python3
"""Count distinct users who clicked the homepage_challenge_click entry, split by A1/A2.

Pipeline:
  1. Page through an Elasticsearch composite aggregation to collect every
     distinct ``accountId`` that fired the click event inside the reporting
     period.
  2. Look up the course levels of those accounts in PostgreSQL.
  3. Bucket the accounts into A1-only / A2-only / both / unknown and print
     a summary.
"""

import base64
import json
import os
import ssl
import sys
import urllib.request
from collections import defaultdict
from contextlib import closing
from datetime import date, datetime, time, timedelta, timezone

try:
    import psycopg2
except ImportError:
    # Keep the pure helpers importable without the driver; main() fails fast.
    psycopg2 = None

# NOTE(review): credentials are committed in source. The passwords can be
# overridden via environment variables; the literal defaults are kept only so
# existing invocations keep working and should be removed and rotated.
ES_HOST = "es-7vd7jcu9.public.tencentelasticsearch.com"
ES_PORT = 9200
ES_USER = "elastic"
ES_PASS = os.environ.get("ES_PASS", "F%?QDcWes7N2WTuiYD11")
ES_INDEX = "user_behavior_buried_points"
ES_PAGE_SIZE = 10000
ES_TIMEOUT_SECONDS = 120

PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_PASS = os.environ.get("PG_PASS", "LdfjdjL83h3h3^$&**YGG*")
PG_DB = "vala_bi"
PG_BATCH_SIZE = 500

# Reporting period, inclusive on both ends, in China Standard Time (UTC+8).
# The timestamps are derived from the dates rather than typed by hand: the
# previous literals (1747939200 / 1750521599) fell in 2025 and the start was
# not aligned to midnight, so they disagreed with the period the report prints.
# NOTE(review): assumes activeTime is stored as epoch seconds, like the
# previous literals - confirm against the index mapping.
CST = timezone(timedelta(hours=8))
PERIOD_START = date(2026, 5, 23)
PERIOD_END = date(2026, 6, 21)
START_TS = int(datetime.combine(PERIOD_START, time.min, tzinfo=CST).timestamp())
END_TS = int(
    datetime.combine(PERIOD_END + timedelta(days=1), time.min, tzinfo=CST).timestamp()
) - 1

# NOTE(review): certificate and hostname verification are disabled, so the
# Elasticsearch credentials are exposed to man-in-the-middle attacks. Left
# as-is to avoid breaking connectivity; enable verification once the cluster
# certificate can be trusted.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE


def es_query(body):
    """POST *body* to the index's ``_search`` endpoint and return the parsed JSON.

    Raises whatever ``urllib.request.urlopen`` raises (HTTP errors, timeouts).
    """
    url = f"https://{ES_HOST}:{ES_PORT}/{ES_INDEX}/_search"
    auth = base64.b64encode(f"{ES_USER}:{ES_PASS}".encode()).decode()
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {auth}",
        },
    )
    # The context manager closes the HTTP response even if decoding fails.
    with urllib.request.urlopen(req, context=ctx, timeout=ES_TIMEOUT_SECONDS) as resp:
        return json.loads(resp.read())


def build_click_query(after_key=None):
    """Return the search body for one page of distinct clicking accountIds.

    *after_key* is the ``after_key`` of the previous page, or ``None`` for the
    first page.
    """
    composite = {
        "size": ES_PAGE_SIZE,
        "sources": [{"accountId": {"terms": {"field": "accountId"}}}],
    }
    if after_key:
        composite["after"] = after_key
    return {
        "size": 0,
        "query": {
            "bool": {
                "must": [
                    {"term": {"buryingPointId": 500}},
                    {"term": {"buryingPointSubId": 14}},
                    {"range": {"activeTime": {"gte": START_TS, "lte": END_TS}}},
                ]
            }
        },
        "aggs": {"users": {"composite": composite}},
    }


def fetch_account_ids():
    """Collect every distinct positive accountId that clicked the entry."""
    account_ids = set()
    after_key = None
    page = 0
    while True:
        page += 1
        users = es_query(build_click_query(after_key)).get("aggregations", {}).get("users", {})
        buckets = users.get("buckets", [])
        for bucket in buckets:
            aid = bucket["key"]["accountId"]
            if aid > 0:  # skip non-positive ids (accountId=0 is excluded from the report)
                account_ids.add(aid)
        after_key = users.get("after_key")
        if not after_key or not buckets:
            return account_ids
        if page % 10 == 0:
            print(f" page {page}, collected {len(account_ids)} users so far...")


def fetch_course_levels(account_ids):
    """Return ``{account_id: {course_level, ...}}`` for *account_ids* from PostgreSQL.

    Accounts with no non-deleted row in ``bi_user_course_detail`` are absent
    from the result.
    """
    levels_by_account = defaultdict(set)
    account_list = list(account_ids)
    connection = psycopg2.connect(
        host=PG_HOST, port=PG_PORT, user=PG_USER, password=PG_PASS, dbname=PG_DB
    )
    # closing() guarantees the connection is released even when a query fails.
    with closing(connection) as conn, conn.cursor() as cur:
        # Query in batches to keep the IN (...) list bounded.
        for start in range(0, len(account_list), PG_BATCH_SIZE):
            batch = account_list[start:start + PG_BATCH_SIZE]
            placeholders = ",".join(["%s"] * len(batch))
            cur.execute(
                f"""
                SELECT DISTINCT account_id, course_level
                FROM bi_user_course_detail
                WHERE account_id IN ({placeholders})
                AND deleted_at IS NULL
                """,
                batch,
            )
            for aid, level in cur.fetchall():
                levels_by_account[aid].add(level)
    return dict(levels_by_account)


def classify_users(account_ids, user_levels):
    """Split *account_ids* into ``(a1_only, a2_only, both, unknown)`` sets.

    An account counts as A1 when its levels contain "L1" or "A1", and as A2
    when they contain "L2" or "A2". Accounts missing from *user_levels*, or
    with neither level, are reported as unknown.
    """
    a1_users, a2_users, both_users, unknown_users = set(), set(), set(), set()
    for aid in account_ids:
        levels = user_levels.get(aid, ())
        is_a1 = "L1" in levels or "A1" in levels
        is_a2 = "L2" in levels or "A2" in levels
        if is_a1 and is_a2:
            both_users.add(aid)
        elif is_a1:
            a1_users.add(aid)
        elif is_a2:
            a2_users.add(aid)
        else:
            unknown_users.add(aid)
    return a1_users, a2_users, both_users, unknown_users


def print_report(total_users, a1_users, a2_users, both_users, unknown_users):
    """Print the summary table for the classified user sets."""
    rule = "=" * 50
    print(f"\n{rule}")
    print("单元挑战入口点击用户数统计")
    print(f"统计周期: {PERIOD_START} ~ {PERIOD_END}")
    print(rule)
    print(f"A1 (L1) 用户数: {len(a1_users)}")
    print(f"A2 (L2) 用户数: {len(a2_users)}")
    print(f"同时有 A1+A2 课程: {len(both_users)}")
    print(f"未匹配到课程等级: {len(unknown_users)}")
    print(rule)
    print(f"总用户数: {total_users}")
    print(f"A1 合计 (含双课程): {len(a1_users) + len(both_users)}")
    print(f"A2 合计 (含双课程): {len(a2_users) + len(both_users)}")
    if unknown_users:
        # Only a sample of the unmatched ids is shown.
        print(f"\n未匹配用户 accountId: {sorted(unknown_users)[:20]}...")


def main():
    """Run the three steps: collect ids from ES, load levels from PG, report."""
    if psycopg2 is None:
        # Fail before the long ES scan rather than after it.
        sys.exit("psycopg2 is required to query PostgreSQL (pip install psycopg2-binary)")

    # Step 1: collect every accountId that clicked the unit-challenge entry.
    print("Step 1: 从 ES 获取所有 accountId...")
    all_account_ids = fetch_account_ids()
    print(f" 共 {len(all_account_ids)} 个唯一 accountId(已排除 accountId=0)")
    if not all_account_ids:
        print(" 没有数据,退出")
        return

    # Step 2: fetch the course levels of those users from PostgreSQL.
    print("\nStep 2: 从 PostgreSQL 获取用户课程等级...")
    user_levels = fetch_course_levels(all_account_ids)

    # Step 3: aggregate and report.
    print("\nStep 3: 统计结果...")
    a1_users, a2_users, both_users, unknown_users = classify_users(
        all_account_ids, user_levels
    )
    print_report(len(all_account_ids), a1_users, a2_users, both_users, unknown_users)


if __name__ == "__main__":
    main()