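#!/usr/bin/env python3
"""
Sales-sheet lesson status sync: read UIDs from the sales sheets -> query
completed lessons in the DB -> backfill the trial-lesson count.

Schedule: cron check every 30 minutes
Owning agent: 小溪 (xiaoxi)

Flow:
1. Read the sales sheets of 小龙 (qJF4I) and 吴迪 (f975f0) and collect the rows
   that have a UID (column H); rows without one are matched through the phone
   number in column E.
2. Query the DB for the number of lessons each user has completed
   (unique chapter_id).
3. Backfill column D (trial-lesson count) of the sales sheet, together with
   column H (UID), column I (registration date) and column J (download channel).

Process data J/N/R/V/Z is read automatically from column D of the sales sheet
by COUNTIFS formulas, so this script does not need to write it.
"""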
|
||
import json, requests, os, sys, psycopg2
|
||
from datetime import datetime
|
||
from collections import defaultdict
|
||
|
||
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
sys.path.insert(0, SCRIPTS_DIR)
|
||
from phone_encrypt import encrypt_phone
|
||
|
||
# ── Configuration ──
# PostgreSQL connection settings used by main(); the password is not stored
# here but loaded at runtime by get_pg_password().
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_DB = "vala_bi"

# Spreadsheet token placed in the Feishu Sheets API URL path by
# read_sheet() and put_values().
BOT_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
# Directory whose config.json holds the Feishu app credentials read by
# get_fs_token().
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"

# Salesperson name -> id of that person's sheet inside the spreadsheet.
SALES_SHEETS = {
    "小龙": "qJF4I",
    "吴迪": "f975f0",
}

# File that log() appends every message to.
LOG_FILE = "/var/log/xiaoxi_sales_lesson_sync.log"
|
||
|
||
def log(msg):
    """Print a timestamped message and append it to ``LOG_FILE``.

    Args:
        msg: Text to record. It is emitted as ``[YYYY-MM-DD HH:MM:SS] msg``
            using the local time at the moment of the call.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    # Messages contain non-ASCII text; pin the encoding so the write does not
    # depend on the locale the process happens to be started with.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
|
||
|
||
def get_pg_password():
    """Look up the PostgreSQL password in ``secrets.env``.

    The file is read from the directory above ``SCRIPTS_DIR``. The first line
    that begins with ``PG_ONLINE_PASSWORD=`` is used; surrounding whitespace
    and any leading/trailing quote characters are removed from its value.

    Returns:
        The password string, or ``None`` when the file has no such line.
    """
    key = "PG_ONLINE_PASSWORD="
    env_file = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    with open(env_file) as handle:
        # Stop reading at the first matching line
        first_match = next((raw for raw in handle if raw.startswith(key)), None)
    if first_match is None:
        return None
    value = first_match.strip()[len(key):]
    return value.strip("'\"")
|
||
|
||
def get_fs_token():
    """Obtain a Feishu tenant access token for the first configured app.

    Reads ``config.json`` from ``CRED_DIR`` and exchanges the first app's
    ``appId`` / ``appSecret`` for a token.

    Returns:
        The ``tenant_access_token`` string from the API response.

    Raises:
        RuntimeError: If the API answers with a non-zero ``code`` or without
            a token. The response payload is included in the message.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15
    )
    data = resp.json()
    # Fail with the API's own code/msg instead of a bare KeyError on the
    # missing token, mirroring the check done in read_sheet().
    if data.get("code") != 0 or not data.get("tenant_access_token"):
        raise RuntimeError(f"获取tenant_access_token失败: {data}")
    return data["tenant_access_token"]
|
||
|
||
def read_sheet(token, sheet_id, range_str):
    """Fetch the cell values of one range of the Feishu spreadsheet.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        sheet_id: Id of the sheet inside the spreadsheet.
        range_str: Cell range such as ``"A1:K2000"``.

    Returns:
        The ``data.valueRange.values`` entry of the response.

    Raises:
        RuntimeError: If the response ``code`` is not 0.
    """
    endpoint = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{BOT_TOKEN}/values/{sheet_id}!{range_str}"
    )
    auth_header = {"Authorization": f"Bearer {token}"}
    payload = requests.get(endpoint, headers=auth_header, timeout=30).json()
    if payload.get("code") == 0:
        return payload["data"]["valueRange"]["values"]
    raise RuntimeError(f"读取Sheet失败: {payload}")
|
||
|
||
def put_values(token, sheet_id, range_str, values):
    """Write a block of values into one range of the Feishu spreadsheet.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        sheet_id: Id of the sheet inside the spreadsheet.
        range_str: Target cell range such as ``"D1:D2000"``.
        values: Rows to write, sent as the ``values`` of the ``valueRange``.

    Returns:
        The decoded JSON response; the caller is expected to check ``code``.
    """
    endpoint = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{BOT_TOKEN}/values"
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    target = f"{sheet_id}!{range_str}"
    payload = {"valueRange": {"range": target, "values": values}}
    reply = requests.put(endpoint, headers=request_headers, json=payload, timeout=30)
    return reply.json()
|
||
|
||
def parse_date(date_str):
    """Parse a date written as 'X月Y日' into ``(month, day)``.

    Args:
        date_str: Any value; it is converted with ``str()`` and stripped.

    Returns:
        A tuple of two ints, or ``(None, None)`` when the text lacks either
        marker character or the two leading tokens are not integers.
    """
    text = str(date_str).strip()
    if '月' not in text or '日' not in text:
        return None, None

    tokens = text.replace('月', ' ').replace('日', '').split()
    if len(tokens) < 2:
        return None, None

    try:
        month = int(tokens[0])
        day = int(tokens[1])
    except ValueError:
        return None, None
    return month, day
|
||
|
||
def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in slices and concatenate the fetched rows.

    Args:
        cur: Cursor offering ``execute(sql, args)`` and ``fetchall()``.
        sql_tpl: SQL text containing exactly one ``%s`` slot, which is filled
            with a comma-separated list of ``%s`` placeholders per slice.
        params: Sequence of values for the IN list.
        chunk: Maximum number of values bound per statement.

    Returns:
        A list with the rows of every slice, in slice order. No statement is
        executed when ``params`` is empty.
    """
    collected = []
    for start in range(0, len(params), chunk):
        stop = start + chunk
        subset = params[start:stop]
        marks = ",".join("%s" for _ in subset)
        cur.execute(sql_tpl % marks, subset)
        collected += cur.fetchall()
    return collected
|
||
|
||
|
||
def _cell_text(column, idx):
    """Return the stripped text of the single cell in row ``idx`` of a one-column read.

    Missing rows, empty rows and null cells all yield ``''`` so that an empty
    cell compares equal to an empty new value instead of the string "None".
    """
    if idx >= len(column) or not column[idx]:
        return ''
    value = column[idx][0]
    return '' if value is None else str(value).strip()


def _row_or_empty(column, idx):
    """Return row ``idx`` of ``column`` unchanged, or ``[]`` past its end."""
    return column[idx] if idx < len(column) else []


def _scan_sales_sheets(token):
    """Read every sales sheet and split usable rows by how the account is identified.

    Returns:
        ``(all_users, phone_rows)``. ``all_users`` holds rows with a positive
        numeric UID in column H; ``phone_rows`` holds rows without one whose
        column E contains an 11-digit number starting with ``1``.
    """
    all_users = []   # [{sales, sheet_id, name, uid, month, row_num}]
    phone_rows = []  # [{sales, sheet_id, name, phone, month, row_num}]

    for sales_name, sheet_id in SALES_SHEETS.items():
        rows = read_sheet(token, sheet_id, "A1:K2000")
        # The first two rows (header + legend) are skipped; data starts at sheet row 3
        for row_num, row in enumerate(rows[2:], start=3):
            if not row or len(row) < 8:
                continue
            uid_str = str(row[7]).strip() if row[7] else ''
            phone_str = str(row[4]).strip() if row[4] else ''
            date_str = str(row[2]).strip() if row[2] else ''
            name = str(row[1]).strip() if row[1] else ''

            # Rows whose date column does not parse are ignored entirely
            month, _ = parse_date(date_str)
            if month is None:
                continue

            # Prefer the UID in column H
            if uid_str and uid_str not in ('None', '未注册'):
                try:
                    uid = int(float(uid_str))
                except (ValueError, TypeError, OverflowError):
                    uid = 0  # not a usable number: fall through to the phone lookup
                if uid > 0:
                    all_users.append({
                        "sales": sales_name,
                        "sheet_id": sheet_id,
                        "name": name,
                        "uid": uid,
                        "month": month,
                        "row_num": row_num,
                    })
                    continue

            # No valid UID in column H: try the phone number in column E
            if phone_str and phone_str not in ('None', '-'):
                # Normalise: drop spaces/tabs and a leading +86
                phone_clean = phone_str.strip().replace(' ', '').replace('\t', '')
                if phone_clean.startswith('+86'):
                    phone_clean = phone_clean[3:]
                # Accept only 11-digit numbers starting with 1
                if len(phone_clean) == 11 and phone_clean.isdigit() and phone_clean.startswith('1'):
                    phone_rows.append({
                        "sales": sales_name,
                        "sheet_id": sheet_id,
                        "name": name,
                        "phone": phone_clean,
                        "month": month,
                        "row_num": row_num,
                    })

    return all_users, phone_rows


def _match_phones_to_accounts(cur, phone_rows):
    """Map each phone number in ``phone_rows`` to an account id via its encrypted form."""
    phone_to_uid = {}
    if not phone_rows:
        return phone_to_uid

    enc_to_phone = {}  # {encrypted: phone}
    for pr in phone_rows:
        enc_to_phone[encrypt_phone(pr["phone"])] = pr["phone"]

    rc = batch_in(cur,
        "SELECT id, tel_encrypt FROM bi_vala_app_account WHERE tel_encrypt IN (%s) AND status=1 AND deleted_at IS NULL",
        list(enc_to_phone.keys())
    )
    for aid, tel_enc in rc:
        phone = enc_to_phone.get(tel_enc)
        if phone:
            phone_to_uid[phone] = aid
    return phone_to_uid


def _count_completed_lessons(conn, cur, uid_set, account_chars, char_ids):
    """Count the unique completed chapter ids per account across the 8 record tables."""
    char_chapters = defaultdict(set)  # char_id -> set of chapter_ids

    if char_ids:  # nothing to look up when no account has a character
        for tbl_idx in range(8):
            table = f"bi_user_chapter_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, chapter_id FROM {table} WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                    (char_ids,)
                )
                for char_id, chapter_id in cur.fetchall():
                    char_chapters[char_id].add(chapter_id)
            except Exception as e:
                log(f" 警告 {table}: {e}")
                # A failed statement leaves the transaction aborted; without a
                # rollback every remaining table would fail as well and all
                # users would be back-filled as having no lessons.
                conn.rollback()

    uid_lesson_count = {}  # uid -> unique chapter count
    for uid in uid_set:
        chapters = set()
        for cid in account_chars.get(uid, []):
            chapters.update(char_chapters.get(cid, set()))
        uid_lesson_count[uid] = len(chapters)
    return uid_lesson_count


def _backfill_sheet(token, sales_name, sheet_id, row_uid, uid_lesson_count, uid_info):
    """Recompute columns D/H/I/J of one sheet and write back the columns that changed.

    D = trial-lesson count, H = UID (filled only when empty),
    I = registration date, J = download channel.
    """
    d_existing = read_sheet(token, sheet_id, "D1:D2000")
    h_existing = read_sheet(token, sheet_id, "H1:H2000")
    i_existing = read_sheet(token, sheet_id, "I1:I2000")
    j_existing = read_sheet(token, sheet_id, "J1:J2000")

    new_d, new_h, new_i, new_j = [], [], [], []
    d_changed = h_changed = i_changed = j_changed = 0

    for idx in range(len(d_existing)):
        row_num = idx + 1
        # The first two rows are always kept as they are
        uid_for_row = None if idx < 2 else row_uid.get((sheet_id, row_num))

        if uid_for_row is None:
            new_d.append(_row_or_empty(d_existing, idx))
            new_h.append(_row_or_empty(h_existing, idx))
            new_i.append(_row_or_empty(i_existing, idx))
            new_j.append(_row_or_empty(j_existing, idx))
            continue

        # Column D: lesson count (0 is left blank, anything above 5 is capped at 5)
        count = uid_lesson_count.get(uid_for_row, 0)
        d_val = '' if count == 0 else min(count, 5)
        if _cell_text(d_existing, idx) != str(d_val):
            d_changed += 1
        new_d.append([d_val])

        # Column H: fill in the UID only when the cell holds no UID yet
        old_h = _cell_text(h_existing, idx)
        if not old_h or old_h in ('None', '未注册'):
            new_h.append([uid_for_row])
            h_changed += 1
        else:
            new_h.append(_row_or_empty(h_existing, idx))

        info = uid_info.get(uid_for_row, {})

        # Column I: registration date
        new_i_val = info.get("created_at", "")
        if _cell_text(i_existing, idx) != new_i_val:
            i_changed += 1
        new_i.append([new_i_val])

        # Column J: download channel
        new_j_val = info.get("download_channel", "")
        if _cell_text(j_existing, idx) != new_j_val:
            j_changed += 1
        new_j.append([new_j_val])

    # Write back only the columns that actually differ
    for col_letter, col_name, new_vals, changed in [
        ("D", "体验节数", new_d, d_changed),
        ("H", "UID", new_h, h_changed),
        ("I", "注册日期", new_i, i_changed),
        ("J", "下载渠道", new_j, j_changed),
    ]:
        if changed > 0:
            rng = f"{col_letter}1:{col_letter}2000"
            r = put_values(token, sheet_id, rng, new_vals)
            if r.get("code") != 0:
                log(f" 写入{col_name}列失败 {sheet_id}: {r}")
            else:
                log(f" {sales_name} {col_name}列更新 {changed} 行")
        else:
            log(f" {sales_name} {col_name}列无变化")


def main():
    """Run one sync pass: sales sheets -> DB lesson counts -> sheet backfill.

    Steps:
        1. Read the sales sheets and resolve an account id for each row, from
           column H or, failing that, from the phone number in column E.
        2. Load the characters and account info (registration date, download
           channel) of those accounts.
        3. Count each account's unique completed chapters.
        4. Write columns D/H/I/J back to every sales sheet.

    Returns:
        0 on success (including "nothing to do"), 1 when any exception
        escaped; the error is logged and its traceback printed.
    """
    log("=" * 50)
    log("销售表行课状态同步 启动")

    conn = None
    cur = None
    try:
        token = get_fs_token()
        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT, user=PG_USER,
            password=get_pg_password(), dbname=PG_DB, connect_timeout=30
        )
        cur = conn.cursor()

        # ── Step 1: read the sales sheets and resolve UIDs ──
        all_users, phone_rows = _scan_sales_sheets(token)
        phone_to_uid = _match_phones_to_accounts(cur, phone_rows)
        for pr in phone_rows:
            uid = phone_to_uid.get(pr["phone"])
            if uid:
                all_users.append({
                    "sales": pr["sales"],
                    "sheet_id": pr["sheet_id"],
                    "name": pr["name"],
                    "uid": uid,
                    "month": pr["month"],
                    "row_num": pr["row_num"],
                })

        log(f"Step 1: 读取销售表, 有效UID: {len(all_users)} (含手机号匹配: {len(phone_to_uid)})")

        if not all_users:
            log("无有效UID, 退出")
            return 0

        # ── Step 2: account -> character mapping ──
        uid_set = list(set(u["uid"] for u in all_users))
        account_chars = defaultdict(list)
        char_to_account = {}

        rc = batch_in(cur,
            "SELECT account_id, id FROM bi_vala_app_character WHERE account_id IN (%s) AND deleted_at IS NULL",
            uid_set
        )
        for aid, cid in rc:
            account_chars[aid].append(cid)
            char_to_account[cid] = aid

        char_ids = list(char_to_account.keys())
        log(f"Step 2: 角色映射, account={len(account_chars)}, char={len(char_ids)}")

        # ── Step 2.5: account info (registration date, download channel) ──
        uid_info = {}  # uid -> {created_at, download_channel}
        rc = batch_in(cur,
            "SELECT id, created_at, download_channel FROM bi_vala_app_account WHERE id IN (%s) AND status=1 AND deleted_at IS NULL",
            uid_set
        )
        for aid, created_at, download_channel in rc:
            uid_info[aid] = {
                "created_at": str(created_at)[:10] if created_at else "",
                "download_channel": download_channel or ""
            }

        # ── Step 3: completed lessons (unique chapter_id) ──
        uid_lesson_count = _count_completed_lessons(conn, cur, uid_set, account_chars, char_ids)
        log(f"Step 3: 课时统计完成, 有记录用户: {sum(1 for v in uid_lesson_count.values() if v > 0)}")

        # ── Step 4: backfill columns D/H/I/J of the sales sheets ──
        # Index rows once instead of scanning all_users for every sheet row;
        # setdefault keeps the first entry for a row, as the linear scan did.
        row_uid = {}
        for u in all_users:
            row_uid.setdefault((u["sheet_id"], u["row_num"]), u["uid"])

        for sales_name, sheet_id in SALES_SHEETS.items():
            _backfill_sheet(token, sales_name, sheet_id, row_uid, uid_lesson_count, uid_info)

        log(f"Step 4: D/H/I/J列回填完成")
        log("完成")
        return 0

    except Exception as e:
        log(f"ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1

    finally:
        # Release the DB resources on every exit path, including failures
        for resource in (cur, conn):
            if resource is not None:
                try:
                    resource.close()
                except Exception as close_err:
                    log(f" 警告 关闭数据库连接失败: {close_err}")
|
||
|
||
|
||
# Script entry point: run one sync pass and use main()'s return value
# as the process exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)
|