ai_member_xiaoxi/scripts/auto_xingke_query_v2.py
2026-06-02 08:00:01 +08:00

503 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Course-activity query auto-backfill V2 — adapted to the sales-conversion
customer master sheet (full sheet).

Data source: spreadsheet ERCFsFo4MhnF0ytGeCrc0Bb8n5f / sheet 1RFMqc (full sheet)
Output: detail sheet (1zB5Be); the full sheet is described as refreshing
itself from it through VLOOKUP.

Flow:
1. Read the full sheet → rows with a phone number but no UID → match
   account_id → backfill columns X/Y/Z (the code in this file writes
   column X only).
2. Read the full sheet → every row with a UID → query course data →
   write it to the detail sheet.
"""
import json, requests, os, re, sys, time, psycopg2
from datetime import datetime
from collections import defaultdict
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone
# ── Configuration ──
# PostgreSQL connection parameters (the password comes from get_pg_password()).
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_DB = "vala_bi"
# Feishu spreadsheet token and the ids of the two sheets this script touches.
SPREADSHEET_TOKEN = "ERCFsFo4MhnF0ytGeCrc0Bb8n5f"
SHEET_ALL = "1RFMqc" # full sheet (read in both phases, column X written in phase 1)
SHEET_DETAIL = "1zB5Be" # detail sheet (written in phase 2)
# Directory containing config.json with the Feishu app id/secret.
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
# File that log() appends to.
LOG_FILE = "/var/log/xiaoxi_xingke_query_v2.log"
def log(msg):
    """Print a timestamped message and append the same line to LOG_FILE.

    Args:
        msg: Text to record; it is prefixed with the local time formatted
            as ``[YYYY-MM-DD HH:MM:SS]``.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    # Messages contain non-ASCII text; pin the encoding so the write does not
    # depend on the locale of the process running the script.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
def get_pg_password():
    """Look up the PostgreSQL password in ``secrets.env`` one directory above this script.

    Returns:
        The text after ``PG_ONLINE_PASSWORD=`` on the first line starting with
        that key, with leading/trailing quote characters removed, or None when
        no such line exists.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    env_file = os.path.join(script_dir, "..", "secrets.env")
    with open(env_file) as handle:
        for entry in handle:
            if not entry.startswith("PG_ONLINE_PASSWORD="):
                continue
            # Split only on the first "=" so the value may itself contain "=".
            _, value = entry.strip().split("=", 1)
            return value.strip("'\"")
    return None
def get_fs_token():
    """Request a Feishu tenant access token with the first app in ``config.json``.

    The app id and secret are read from ``CRED_DIR/config.json`` under
    ``apps[0].appId`` / ``apps[0].appSecret``.

    Returns:
        The ``tenant_access_token`` string from the auth response.

    Raises:
        RuntimeError: If the response carries no ``tenant_access_token``.
    """
    with open(os.path.join(CRED_DIR, "config.json")) as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if not token:
        # Report the API's own code/msg instead of failing with a bare KeyError;
        # the credentials themselves are never included in the message.
        raise RuntimeError(
            f"获取tenant_access_token失败: code={data.get('code')} msg={data.get('msg')}"
        )
    return token
def read_sheet(token, sheet_id, range_str=None):
    """Read cell values from one sheet of SPREADSHEET_TOKEN, retrying transient failures.

    Args:
        token: Feishu tenant access token, sent as a Bearer header.
        sheet_id: Id of the sheet inside the spreadsheet.
        range_str: Optional range, appended to the URL as ``!<range_str>``;
            when omitted only the sheet id is sent.

    Returns:
        The ``data.valueRange.values`` payload of the response.

    Raises:
        RuntimeError: If the API answers with a non-zero ``code`` (not
            retried), or if all three attempts fail with a connection error,
            a timeout or an undecodable body.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}"
    if range_str:
        url += f"!{range_str}"
    attempts = 3
    last_error = None
    for attempt in range(attempts):
        # Reset per attempt so the diagnostic below never shows a stale body
        # left over from an earlier attempt.
        resp = None
        try:
            resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=90)
            resp.encoding = 'utf-8'
            data = json.loads(resp.text)
        except (
            json.JSONDecodeError,
            requests.exceptions.ConnectionError,
            # Read timeouts are not ConnectionError subclasses; retry them too.
            requests.exceptions.Timeout,
        ) as e:
            last_error = e
            log(f" 读取重试 {attempt+1}/3: {e}")
            if attempt == attempts - 1:
                log(f" resp.text[:100]: {resp.text[:100] if resp is not None else 'N/A'}")
                break  # no point sleeping before giving up
            time.sleep(2 ** attempt)
            continue
        if data.get("code") != 0:
            raise RuntimeError(f"读取Sheet失败: {data}")
        return data["data"]["valueRange"]["values"]
    raise RuntimeError("读取Sheet失败: 3次重试均失败") from last_error
def put_values(token, sheet_id, range_str, values):
    """Write ``values`` into ``sheet_id!range_str`` via the sheets v2 ``values`` endpoint.

    A non-zero ``code`` in the response is logged, not raised.

    Args:
        token: Feishu tenant access token.
        sheet_id: Target sheet id.
        range_str: Cell range inside the sheet, e.g. ``X4:X4``.
        values: List of rows, each a list of cell values.

    Returns:
        The decoded JSON response.
    """
    endpoint = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    payload = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    result = requests.put(endpoint, headers=request_headers, json=payload, timeout=30).json()
    if result.get("code") != 0:
        log(f" 写入失败 {range_str}: {result}")
    return result
def append_rows(token, sheet_id, values):
    """Append rows to the end of a sheet, targeting columns ``A:AA``.

    A non-zero ``code`` in the response is logged, not raised.

    Args:
        token: Feishu tenant access token.
        sheet_id: Target sheet id.
        values: List of rows, each a list of cell values.

    Returns:
        The decoded JSON response.
    """
    # NOTE(review): Feishu documents the append endpoint as
    # ".../spreadsheets/<token>/values_append"; confirm this URL is accepted.
    endpoint = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}:append"
    payload = {"valueRange": {"range": f"{sheet_id}!A:AA", "values": values}}
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    result = requests.post(endpoint, headers=request_headers, json=payload, timeout=30).json()
    if result.get("code") != 0:
        log(f" 追加失败: {result}")
    return result
def encrypt_phone_local(phone):
    """Normalise a phone cell and encrypt it when it is an 11-digit number starting with 1.

    Args:
        phone: Raw cell value; it is stringified and stripped first.

    Returns:
        ``encrypt_phone(...)`` of the normalised number, or None when the
        value does not match ``1`` followed by ten digits.
    """
    candidate = str(phone).strip()
    pieces = candidate.split(".")
    # Drop a ".0"/".00" fraction so float-rendered numbers still match.
    if len(pieces) > 1 and pieces[1] in ("0", "00"):
        candidate = pieces[0]
    if not re.match(r"^1\d{10}$", candidate):
        return None
    return encrypt_phone(candidate)
def mask_phone_display(phone):
    """Mask a phone number for display, e.g. ``130****1234``.

    Args:
        phone: Value to mask; it is stringified and stripped first.

    Returns:
        The first three and last four digits joined by ``****`` when the
        value is ``1`` followed by ten digits; otherwise the stripped string
        unchanged.
    """
    text = str(phone).strip()
    if not re.match(r"^1\d{10}$", text):
        return text
    head, tail = text[:3], text[-4:]
    return f"{head}****{tail}"
def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in slices of ``params`` and concatenate the rows.

    Args:
        cur: Cursor-like object offering ``execute(sql, args)`` and ``fetchall()``.
        sql_tpl: SQL text with a single ``%s`` where the comma-separated
            placeholders are substituted.
        params: Sequence of values for the IN list.
        chunk: Maximum number of values per statement.

    Returns:
        List of all rows fetched, in slice order.
    """
    collected = []
    for start in range(0, len(params), chunk):
        piece = params[start:start + chunk]
        placeholders = ",".join("%s" for _ in piece)
        cur.execute(sql_tpl % placeholders, piece)
        collected += cur.fetchall()
    return collected
# ── Phase 1: 手机号→ID 匹配 ──
def phase1_phone_to_id(token, conn):
    """Match phone numbers in the full sheet to account ids and write them to column X.

    Rows from sheet row 4 onward qualify when column D (index 3, phone) is
    set while column I (index 8, user id) and column X (index 23, matched
    uid) are both empty. Phone cells starting with "=" are skipped.

    Args:
        token: Feishu tenant access token.
        conn: Open psycopg2 connection.

    Returns:
        ``(pending_rows, matched_phones)``: the number of qualifying rows and
        the number of distinct phone values that resolved to an account id.
    """
    log("Phase 1: 手机号→ID 匹配")
    rows = read_sheet(token, SHEET_ALL) or []
    pending = []
    for idx, row in enumerate(rows[3:], start=4):
        if len(row) < 5:
            continue
        # Column D = phone (index 3), I = user id (index 8), X = matched uid (index 23)
        phone = str(row[3]).strip() if row[3] else ""
        uid_i = str(row[8]).strip() if len(row) > 8 and row[8] else ""
        uid_x = str(row[23]).strip() if len(row) > 23 and row[23] else ""
        if not phone or uid_i or uid_x:
            continue
        if phone.startswith("="):
            # presumably an unevaluated formula cell — nothing to match on
            continue
        pending.append({"row_idx": idx, "phone": phone})
    log(f" 待匹配: {len(pending)}")
    if not pending:
        log(" 无需匹配, 跳过")
        return 0, 0

    # Encrypt every distinct phone exactly once; values that are not valid
    # numbers come back as None and are dropped.
    valid_phones = []
    for raw in {r["phone"] for r in pending}:
        enc = encrypt_phone_local(raw)
        if enc:
            valid_phones.append((raw, enc))
    enc_list = list({enc for _, enc in valid_phones})

    cur = conn.cursor()
    try:
        found = batch_in(
            cur,
            "SELECT id, tel_encrypt FROM bi_vala_app_account WHERE tel_encrypt IN (%s) AND status=1 AND deleted_at IS NULL",
            enc_list,
        )
    finally:
        # Release the cursor even when the query raises.
        cur.close()

    enc_to_aid = {}
    for aid, tel_enc in found:
        # Keep the first account returned for each encrypted number.
        enc_to_aid.setdefault(tel_enc, aid)

    phone_to_aid = {phone: enc_to_aid[enc] for phone, enc in valid_phones if enc in enc_to_aid}
    matched = len(phone_to_aid)
    log(f" 匹配成功: {matched}, 未匹配: {len(valid_phones)-matched}")

    written = 0
    for r in pending:
        phone = r["phone"]
        if phone not in phone_to_aid:
            continue
        row = r["row_idx"]
        result = put_values(token, SHEET_ALL, f"X{row}:X{row}", [[str(phone_to_aid[phone])]])
        # put_values logs failures itself; count only confirmed writes.
        if result.get("code") == 0:
            written += 1
    # Report rows actually written rather than the number of candidates.
    log(f" 回填完成: {written}")
    return len(pending), matched
# ── Phase 2: 行课记录查询 & 写入小溪明细 ──
def _parse_uid(row, col):
    """Return cell ``row[col]`` as an int, or None when it is missing, empty or non-numeric."""
    if len(row) <= col or not row[col]:
        return None
    try:
        # Going through float() accepts values such as 12345.0 / "12345.0".
        return int(float(str(row[col]).strip()))
    except (ValueError, TypeError, OverflowError):
        return None


def _format_ts(value):
    """Format a datetime as ``YYYY-MM-DD HH:MM:SS``; falsy values become ''."""
    return value.strftime("%Y-%m-%d %H:%M:%S") if value else ""


def _chapter_label(chapter):
    """Join a (level, season, unit, lesson) tuple with dashes."""
    return f"{chapter[0]}-{chapter[1]}-{chapter[2]}-{chapter[3]}"


def phase2_course_records(token, conn):
    """Collect course activity for every UID in the full sheet and write it to the detail sheet.

    UIDs are taken from column I (index 8), falling back to column X
    (index 23), for sheet rows 4 onward; the first row seen for a UID
    supplies the column A, C and D cells. For each UID one 27-cell row
    (columns A..AA) is assembled from the account, character,
    chapter-completion, study-time, activation, order and login queries.
    Rows whose UID already appears in column A of the detail sheet are
    overwritten in place, the others are appended.

    Args:
        token: Feishu tenant access token.
        conn: Open psycopg2 connection.

    Returns:
        Number of detail rows built (0 when no usable UID is found).
    """
    log("Phase 2: 行课记录查询")
    rows = read_sheet(token, SHEET_ALL) or []
    uid_rows = {}
    for idx, row in enumerate(rows[3:], start=4):
        aid = _parse_uid(row, 8)
        if aid is None:
            aid = _parse_uid(row, 23)
        if aid is None or aid <= 0 or aid in uid_rows:
            continue
        sales = str(row[0]).strip() if len(row) > 0 and row[0] else ""
        jinxian = str(row[2]).strip() if len(row) > 2 and row[2] else ""
        phone = str(row[3]).strip() if len(row) > 3 and row[3] else ""
        uid_rows[aid] = (idx, sales, jinxian, phone)
    uid_set = list(uid_rows.keys())
    log(f" 有效UID: {len(uid_set)}")
    if not uid_set:
        log(" 无UID, 跳过")
        return 0

    cur = conn.cursor()
    try:
        # Account info
        log(" 查询账户信息...")
        aid_info = {}
        account_rows = batch_in(
            cur,
            "SELECT id, tel, created_at FROM bi_vala_app_account WHERE id IN (%s) AND status=1 AND deleted_at IS NULL",
            uid_set,
        )
        for aid, tel, created_at in account_rows:
            aid_info[aid] = {"tel": tel or "", "created_at": str(created_at) if created_at else ""}

        # Character info
        log(" 查询角色信息...")
        account_chars = defaultdict(list)
        account_nicks = defaultdict(list)
        char_to_account = {}
        rc = batch_in(
            cur,
            "SELECT account_id, id, nickname FROM bi_vala_app_character WHERE account_id IN (%s) AND nickname IS NOT NULL AND nickname != '' AND deleted_at IS NULL",
            uid_set,
        )
        for aid, cid, nick in rc:
            account_chars[aid].append(cid)
            # Group nicknames per account here so row assembly does not have
            # to rescan every character row for every character.
            account_nicks[aid].append(nick)
            char_to_account[cid] = aid
        char_ids = list(char_to_account.keys())
        log(f" 角色数: {len(char_ids)}")

        # Chapter id -> (level, season, unit, lesson)
        cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson")
        chapter_map = {}
        for ch_id, cl, cs, cu, cl2 in cur.fetchall():
            chapter_map[ch_id] = (cl or "", cs or "", cu or "", cl2 or "")

        # Completed-chapter records, read from the tables suffixed _0.._7
        log(" 查询课时完成记录...")
        char_chapter_times = defaultdict(dict)  # character id -> {chapter id: earliest completion}
        char_latest = {}  # character id -> (latest completion, chapter id, chapter tuple)
        for tbl_idx in range(8):
            table = f"bi_user_chapter_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, chapter_id, created_at FROM {table} WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                    (char_ids,)
                )
                for uid, ch_id, created_at in cur.fetchall():
                    ch_data = chapter_map.get(ch_id)
                    if not ch_data:
                        continue
                    seen = char_chapter_times[uid]
                    if ch_id not in seen or created_at < seen[ch_id]:
                        seen[ch_id] = created_at
                    prev = char_latest.get(uid)
                    if prev is None or created_at > prev[0]:
                        char_latest[uid] = (created_at, ch_id, ch_data)
            except Exception as e:
                log(f" 警告 {table}: {e}")
                # A failed statement leaves the transaction aborted; without a
                # rollback every later query on this connection fails as well.
                conn.rollback()

        # Study time, read from the tables suffixed _0.._7
        log(" 查询学习耗时...")
        char_total_ms = defaultdict(int)
        for tbl_idx in range(8):
            table = f"bi_user_component_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id",
                    (char_ids,)
                )
                for uid, total_ms in cur.fetchall():
                    char_total_ms[uid] += (total_ms or 0)
            except Exception as e:
                log(f" 警告 {table}: {e}")
                conn.rollback()  # same reason as above

        # Activation status: first A1/A2 ticket returned per account
        log(" 查询激活状态...")
        activation = {}
        ticket_rows = batch_in(
            cur,
            "SELECT account_id, season_package_level FROM bi_vala_seasonal_ticket WHERE account_id IN (%s) AND status=1 AND deleted_at IS NULL AND season_package_level IN ('A1','A2')",
            uid_set,
        )
        for aid, lvl in ticket_rows:
            activation.setdefault(aid, lvl)

        # Payment info
        log(" 查询付费信息...")
        paid_info = {}
        order_rows = batch_in(
            cur,
            """SELECT account_id,
            MIN(pay_success_date) as first_pay,
            SUM(pay_amount_int)/100.0 as total_gmv,
            SUM(CASE WHEN order_status=3 THEN pay_amount_int ELSE 0 END)/100.0 as total_gsv,
            STRING_AGG(DISTINCT key_from, ', ') as channels
            FROM bi_vala_order
            WHERE account_id IN (%s)
            AND pay_success_date IS NOT NULL
            AND order_status IN (3,4)
            AND deleted_at IS NULL
            GROUP BY account_id""",
            uid_set,
        )
        for aid, first_pay, gmv, gsv, channels in order_rows:
            paid_info[aid] = (str(first_pay) if first_pay else "", gmv or 0, gsv or 0, channels or "")

        # Last login
        log(" 查询最近登录...")
        last_login = {}
        # NOTE(review): the other queries address this table by "id"; confirm
        # that bi_vala_app_account really has an "account_id" column.
        login_rows = batch_in(
            cur,
            "SELECT account_id, MAX(login_date) FROM bi_vala_app_account WHERE id IN (%s) AND status=1 AND deleted_at IS NULL GROUP BY account_id",
            uid_set,
        )
        for aid, dt in login_rows:
            if dt:
                last_login[aid] = str(dt)
    finally:
        # Release the cursor even when one of the queries raises.
        cur.close()

    # Assemble the detail rows
    log(" 组装小溪明细...")
    detail_rows = []
    for aid in uid_set:
        row_idx, sales, jinxian, phone_raw = uid_rows[aid]
        info = aid_info.get(aid, {})
        tel = info.get("tel", "")
        created_at = info.get("created_at", "")
        masked_tel = mask_phone_display(tel) or mask_phone_display(phone_raw) or ""
        masked_phone = mask_phone_display(phone_raw) or ""
        chars = account_chars.get(aid, [])

        exp_lessons = {}  # lesson code -> earliest completion among S0/U00 chapters
        first_lesson_time = None
        first_lesson_ch = None
        best_latest_time = None
        best_latest_ch = None
        total_min = 0.0
        for cid in chars:
            for ch_id, ct in char_chapter_times.get(cid, {}).items():
                ch_info = chapter_map.get(ch_id)
                if not ch_info:
                    continue
                cl, cs, cu, cl2 = ch_info
                if cs == "S0" and cu == "U00":
                    if cl2 not in exp_lessons or ct < exp_lessons[cl2]:
                        exp_lessons[cl2] = ct
                # NOTE(review): the earliest lesson is tracked over all mapped
                # chapters, not only S0/U00 ones — confirm this nesting.
                if first_lesson_time is None or ct < first_lesson_time:
                    first_lesson_time = ct
                    first_lesson_ch = ch_info
            lt = char_latest.get(cid)
            if lt and (best_latest_time is None or lt[0] > best_latest_time):
                best_latest_time = lt[0]
                best_latest_ch = lt[1]
            # The SUM may come back as Decimal depending on the column type;
            # Decimal / float raises TypeError, so convert before dividing.
            total_min += float(char_total_ms.get(cid, 0)) / 60000.0
        total_min = round(total_min, 1)
        if total_min == int(total_min):
            total_min = int(total_min)  # render whole minutes without ".0"

        exp_count = len(exp_lessons)
        first_time_str = _format_ts(first_lesson_time)
        first_ch_str = _chapter_label(first_lesson_ch) if first_lesson_ch else ""
        latest_time_str = _format_ts(best_latest_time)
        if best_latest_ch and best_latest_ch in chapter_map:
            latest_ch_str = _chapter_label(chapter_map[best_latest_ch])
        else:
            latest_ch_str = ""

        act_level = activation.get(aid, "")
        if act_level:
            course_level = act_level
            course_type = "正式课" if aid in paid_info else "体验课"
        elif first_lesson_ch:
            course_level = first_lesson_ch[0]
            course_type = "体验课"
        else:
            course_level = ""
            course_type = ""

        nickname_str = " / ".join(account_nicks.get(aid, [])[:3])

        pi = paid_info.get(aid)
        paid_status = "已付费" if pi else "未付费"
        first_pay_time = pi[0] if pi else ""
        total_gmv = pi[1] if pi else 0
        total_gsv = pi[2] if pi else 0
        channels = pi[3] if pi else ""

        # 27 cells, matching the A..AA range used when writing.
        row_data = [
            str(aid), masked_tel, masked_phone, sales, jinxian, created_at, "",
            nickname_str, course_level, course_type,
            first_time_str, first_ch_str,
            *(_format_ts(exp_lessons.get(code)) for code in ("L01", "L02", "L03", "L04", "L05")),
            str(exp_count),
            latest_time_str, latest_ch_str, str(total_min),
            paid_status, first_pay_time,
            str(total_gmv), str(total_gsv), channels,
            last_login.get(aid, ""),
        ]
        detail_rows.append((aid, row_data))

    # Write the detail sheet
    log(" 写入小溪明细...")
    existing = read_sheet(token, SHEET_DETAIL) or []
    existing_map = {}  # account id -> sheet row number (row 1 is skipped)
    for i, row in enumerate(existing[1:], start=2):
        existing_id = _parse_uid(row, 0) if row else None
        if existing_id is not None:
            existing_map[existing_id] = i

    update_count = 0
    new_rows_list = []
    for aid, row_data in detail_rows:
        if aid in existing_map:
            ri = existing_map[aid]
            put_values(token, SHEET_DETAIL, f"A{ri}:AA{ri}", [row_data])
            update_count += 1
        else:
            new_rows_list.append(row_data)
    log(f" 更新: {update_count}, 新增: {len(new_rows_list)}")

    # Append the new rows, 500 per request
    for i in range(0, len(new_rows_list), 500):
        append_rows(token, SHEET_DETAIL, new_rows_list[i:i+500])
    log(f" 行课明细完成: 共 {len(detail_rows)}")
    return len(detail_rows)
# ── Main ──
def main():
    """Run phase 1 and phase 2 once and report the outcome.

    Returns:
        0 when both phases finish, 1 when any exception is raised (the error
        is logged and its traceback printed).
    """
    log("=" * 50)
    log("行课查询 V2 启动 (销转客户主表)")
    conn = None
    try:
        token = get_fs_token()
        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT, user=PG_USER,
            password=get_pg_password(), dbname=PG_DB, connect_timeout=30
        )
        p1_total, p1_matched = phase1_phone_to_id(token, conn)
        p2_total = phase2_course_records(token, conn)
        summary = f"Phase1(ID匹配): {p1_total} 行(匹配{p1_matched}) | Phase2(行课): {p2_total}"
        log(f"完成: {summary}")
        return 0
    except Exception as e:
        log(f"ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        # Close the connection on the failure path too, not only on success.
        if conn is not None:
            conn.close()
# When run as a script, use main()'s return value (0 or 1) as the exit status.
if __name__ == "__main__":
    sys.exit(main())