ai_member_xiaoxi/scripts/sales_lead_auto_fill.py
2026-06-05 08:00:01 +08:00

271 lines
9.1 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
销售线索自动回填 v5 — 只写 D/H/I/J 四列
执行频率:每小时 cron 巡检
归属 Agent:小溪 (xiaoxi)
v5 精简版(2026-06-04 陈逸鸫确认):只写 D/H/I/J,K~V 由 Cursor 公式接管
分工约定见 docs/bot-xiaoxi-contract-v5.md
表格列结构:
D: 体验节数 (小溪 cron)
E: 手机号 (销售填 → 小溪读)
H: 用户ID (小溪 cron)
I: 注册日期 (小溪 cron)
J: 下载渠道 (小溪 cron)
"""
import json, requests, os, re, sys, time, psycopg2
from datetime import datetime
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone
# PostgreSQL connection parameters passed to psycopg2.connect() in main().
# The password is not stored here; it is read by get_pg_password().
PG_HOST = "bj-postgres-16pob4sg.sql.tencentcdb.com"
PG_PORT = 28591
PG_USER = "ai_member"
PG_DB = "vala_bi"
# Feishu spreadsheet token interpolated into the Sheets v2 URLs.
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
# Display name -> sheet id; main() processes every entry in this mapping.
SHEET_IDS = {"吴迪": "f975f0", "小龙": "qJF4I", "成都": "qJF4J"}
# Directory holding config.json, from which get_fs_token() reads
# apps[0].appId / apps[0].appSecret.
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
# File that log() appends every message to.
LOG_FILE = "/var/log/xiaoxi_sales_lead.log"
def log(msg):
    """Print a timestamped message and append it to ``LOG_FILE``.

    Args:
        msg: Text to record. It is prefixed with the local time formatted
            as ``[YYYY-MM-DD HH:MM:SS]``.

    Raises:
        OSError: If ``LOG_FILE`` cannot be opened for appending.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    # Messages contain Chinese text; pin the encoding so the write does not
    # depend on whatever locale the calling environment provides.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
def get_pg_password():
    """Read the PostgreSQL password from ``secrets.env``.

    The file is looked up one directory above ``SCRIPTS_DIR``. The value is
    taken from the first line that starts with ``PG_ONLINE_PASSWORD=``;
    surrounding whitespace and single/double quotes are removed.

    Returns:
        The password string.

    Raises:
        OSError: If the secrets file cannot be opened.
        RuntimeError: If the file has no ``PG_ONLINE_PASSWORD=`` line.
            Previously this case silently returned ``None``, which only
            showed up later as an unexplained connection failure.
    """
    secrets_path = os.path.join(SCRIPTS_DIR, "..", "secrets.env")
    with open(secrets_path, encoding="utf-8") as f:
        for line in f:
            if line.startswith("PG_ONLINE_PASSWORD="):
                # Split on the first "=" only: the password itself may
                # contain "=" characters.
                return line.strip().split("=", 1)[1].strip("'\"")
    raise RuntimeError(f"{secrets_path} 中未找到 PG_ONLINE_PASSWORD")
def get_fs_token():
    """Request a Feishu ``tenant_access_token`` for the configured app.

    Credentials come from ``config.json`` inside ``CRED_DIR``: the
    ``appId`` and ``appSecret`` of the first entry under ``apps``.

    Returns:
        The ``tenant_access_token`` string from the API response.

    Raises:
        OSError: If the credentials file cannot be opened.
        RuntimeError: If the response carries no token. The API payload is
            included so the cause is visible in the log instead of a bare
            ``KeyError``.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if not token:
        raise RuntimeError(f"获取tenant_access_token失败: {data}")
    return token
def read_sheet(token, sheet_id):
    """Fetch the cell values of one sheet through the Feishu Sheets v2 API.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        sheet_id: Id of the sheet inside ``SPREADSHEET_TOKEN``.

    Returns:
        The ``data.valueRange.values`` entry of the response.

    Raises:
        RuntimeError: If the response ``code`` is not ``0``.
    """
    endpoint = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}"
    )
    auth_header = {"Authorization": f"Bearer {token}"}
    payload = requests.get(endpoint, headers=auth_header, timeout=30).json()
    if payload.get("code") == 0:
        return payload["data"]["valueRange"]["values"]
    raise RuntimeError(f"读取Sheet失败: {payload}")
def put_values(token, sheet_id, range_str, values, retries=3):
    """Write *values* into one range of a sheet, retrying when rate limited.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        sheet_id: Id of the target sheet.
        range_str: A1-style range without the sheet prefix, e.g. ``"H3:J7"``.
        values: Row-major list of rows to write.
        retries: Maximum number of attempts. Values below 1 are treated as
            1 so that a request is always made and a result is always
            returned.

    Returns:
        The decoded JSON response of the last attempt. A ``code`` of ``0``
        means success; failures are logged, not raised.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    attempts = max(1, retries)
    result = None
    for attempt in range(attempts):
        resp = requests.put(url, headers=headers, json=body, timeout=30)
        result = resp.json()
        code = result.get("code", -1)
        if code == 0:
            return result
        if code != 90217:
            # Any error other than the rate-limit code is not retried.
            log(f" 写入失败 {sheet_id}!{range_str}: {result}")
            return result
        if attempt + 1 < attempts:
            # Exponential back-off: 1s, 2s, 4s, ... Sleeping after the final
            # attempt would only delay the caller, so it is skipped.
            wait = 2 ** attempt
            log(f" 限流 {sheet_id}!{range_str}, 等待{wait}s重试...")
            time.sleep(wait)
    # Every attempt was rate limited: record the failure instead of
    # returning silently.
    log(f" 写入失败 {sheet_id}!{range_str}: 限流重试{attempts}次后仍失败: {result}")
    return result
def encrypt_phone_local(phone):
    """Normalise a sheet cell into a phone number and encrypt it.

    The value is stringified and stripped. A ``.0`` / ``.00`` suffix (as
    produced when a number is rendered as a float) is dropped. Only an
    11-digit string starting with ``1`` is accepted.

    Args:
        phone: Raw cell value; any type accepted by ``str()``.

    Returns:
        The result of ``encrypt_phone`` for an accepted number, otherwise
        ``None``.
    """
    text = str(phone).strip()
    if "." in text:
        pieces = text.split(".")
        integer_part, first_fraction = pieces[0], pieces[1]
        if first_fraction in ("0", "00"):
            text = integer_part
    if not re.match(r"^1\d{10}$", text):
        return None
    return encrypt_phone(text)
# Zero-based column indexes within a sheet row.
_PHONE_COLUMN = 4                  # E: phone number (read only)
_AUTO_FILL_COLUMNS = (3, 7, 8, 9)  # D, H, I, J: the columns this job writes
_SQL_BATCH_SIZE = 500              # max ids per SQL IN (...) list


def _chunked(items, size=_SQL_BATCH_SIZE):
    """Yield consecutive slices of *items* with at most *size* elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _consecutive_runs(entries, row_key):
    """Split *entries* (ordered by row) into runs of adjacent row numbers."""
    runs = []
    for entry in entries:
        if runs and entry[row_key] == runs[-1][-1][row_key] + 1:
            runs[-1].append(entry)
        else:
            runs.append([entry])
    return runs


def _find_pending_rows(rows):
    """Return data rows that have a phone but an empty D, H, I or J cell."""
    pending = []
    # The first two rows are skipped; data starts at sheet row 3.
    for row_idx, row in enumerate(rows[2:], start=3):
        if len(row) <= _PHONE_COLUMN:
            continue
        phone = str(row[_PHONE_COLUMN]).strip() if row[_PHONE_COLUMN] else ""
        if not phone:
            continue
        # NOTE(review): a falsy cell (including a numeric 0) counts as
        # empty, exactly as before — confirm that is intended for column D.
        needs_fill = any(
            not (len(row) > col and row[col] and str(row[col]).strip())
            for col in _AUTO_FILL_COLUMNS
        )
        if needs_fill:
            pending.append({"row_idx": row_idx, "phone": phone})
    return pending


def process_sheet(token, cur, sheet_name, sheet_id):
    """Back-fill columns D/H/I/J of one sheet from the account database.

    Rows from the third one onward that have a phone number (column E) and
    at least one empty cell among D, H, I, J are looked up by encrypted
    phone number in ``bi_vala_app_account``. Matched rows receive the count
    returned by the course-detail query (D), the account id (H), the
    registration date (I) and the download channel (J). Unmatched rows get
    ``未注册`` in H.

    Args:
        token: Feishu bearer token used for reading and writing the sheet.
        cur: Open database cursor accepting ``%s`` placeholders.
        sheet_name: Display name, used only in log messages.
        sheet_id: Id of the sheet to process.

    Returns:
        ``{"processed": int, "matched": int}``: rows written and rows that
        matched an account.

    Raises:
        RuntimeError: Propagated from ``read_sheet`` when reading fails.
    """
    log(f"\n--- [{sheet_name}] {sheet_id} ---")
    rows = read_sheet(token, sheet_id)
    if len(rows) <= 2:
        log(f" [{sheet_name}] 无数据行,跳过")
        return {"processed": 0, "matched": 0}
    log(f" [{sheet_name}] 读取到 {len(rows) - 2} 行数据")

    # v5: only D, H, I and J are checked for emptiness.
    pending = _find_pending_rows(rows)
    log(f" [{sheet_name}] 待处理: {len(pending)}")
    if not pending:
        return {"processed": 0, "matched": 0}

    # Phone number -> encrypted phone. Each distinct phone is encrypted once.
    valid_phones = []
    for phone in set(r["phone"] for r in pending):
        encrypted = encrypt_phone_local(phone)
        if encrypted:
            valid_phones.append((phone, encrypted))

    # Encrypted phone -> account id; the first row seen per phone wins.
    enc_list = list(set(enc for _, enc in valid_phones))
    enc_to_aid = {}
    for batch in _chunked(enc_list):
        ph = ",".join(["%s"] * len(batch))
        cur.execute(
            f"SELECT id, tel_encrypt FROM bi_vala_app_account WHERE tel_encrypt IN ({ph}) AND status=1 AND deleted_at IS NULL",
            batch
        )
        for aid, tel_enc in cur.fetchall():
            enc_to_aid.setdefault(tel_enc, aid)

    phone_to_aid = {
        phone: enc_to_aid[enc] for phone, enc in valid_phones if enc in enc_to_aid
    }
    log(f" [{sheet_name}] 手机号匹配: {len(phone_to_aid)}/{len(valid_phones)}")

    matched_aids = list(set(phone_to_aid.values()))
    if not matched_aids:
        # Nothing matched: only H is marked. Adjacent rows share one request
        # instead of one request per row.
        for run in _consecutive_runs(pending, "row_idx"):
            sr, er = run[0]["row_idx"], run[-1]["row_idx"]
            put_values(token, sheet_id, f"H{sr}:H{er}", [["未注册"] for _ in run])
        log(f" [{sheet_name}] 全部未匹配,已标记 {len(pending)}")
        return {"processed": len(pending), "matched": 0}

    account_info = {}
    trial_count = {}
    # Batched like the phone lookup so the IN (...) list stays bounded.
    for batch in _chunked(matched_aids):
        ph = ",".join(["%s"] * len(batch))
        # Account info: I (registration date), J (download channel).
        cur.execute(
            f"SELECT id, created_at, download_channel FROM bi_vala_app_account WHERE id IN ({ph}) AND status=1 AND deleted_at IS NULL",
            batch
        )
        for aid, created_at, channel in cur.fetchall():
            account_info[aid] = {
                "created_at": created_at.strftime("%Y-%m-%d") if created_at else "",
                "download_channel": channel or ""
            }
        # Column D: course-detail rows per account with expire_time IS NULL.
        cur.execute(
            f"SELECT a.id, COUNT(*) FROM bi_vala_app_account a INNER JOIN bi_vala_app_character c ON a.id=c.account_id AND c.deleted_at IS NULL INNER JOIN bi_user_course_detail ucd ON c.id=ucd.user_id AND ucd.deleted_at IS NULL WHERE a.id IN ({ph}) AND a.status=1 AND a.deleted_at IS NULL AND ucd.expire_time IS NULL GROUP BY a.id",
            batch
        )
        for aid, count in cur.fetchall():
            trial_count[aid] = count

    # Assemble the values to write (v5: D/H/I/J only).
    results = []
    for r in pending:
        aid = phone_to_aid.get(r["phone"])
        if not aid:
            results.append({"row": r["row_idx"], "H": "未注册", "I": "", "J": "", "D": ""})
            continue
        info = account_info.get(aid, {})
        results.append({
            "row": r["row_idx"],
            "D": str(trial_count.get(aid, 0) or 0),
            "H": str(aid),
            "I": info.get("created_at", ""),
            "J": info.get("download_channel", ""),
        })
    results.sort(key=lambda x: x["row"])
    log(f" [{sheet_name}] 准备回填 {len(results)}")

    # Write each run of adjacent rows with one request for D and one for H:J.
    for gi, g in enumerate(_consecutive_runs(results, "row")):
        sr, er = g[0]["row"], g[-1]["row"]
        put_values(token, sheet_id, f"D{sr}:D{er}", [[r.get("D", "")] for r in g])
        hij_vals = [[r.get("H", ""), r.get("I", ""), r.get("J", "")] for r in g]
        put_values(token, sheet_id, f"H{sr}:J{er}", hij_vals)
        # Short pause after every fifth group to stay under the rate limit.
        if gi % 5 == 4:
            time.sleep(0.5)

    matched_count = sum(1 for r in results if r["H"] != "未注册")
    log(f" [{sheet_name}] 回填完成: {len(results)} 行, 匹配 {matched_count}, 未注册 {len(results) - matched_count}")
    return {"processed": len(results), "matched": matched_count}
def main():
    """Run one back-fill pass over every sheet in ``SHEET_IDS``.

    Obtains a Feishu token, opens one PostgreSQL connection, processes each
    sheet in turn and logs the totals. The first failure aborts the run.

    Returns:
        ``0`` on success, ``1`` if any exception was raised. The error is
        logged and the traceback printed.
    """
    log("=" * 50)
    log("销售线索自动回填 v5 启动(只写 D/H/I/J")
    conn = None
    cur = None
    try:
        token = get_fs_token()
        conn = psycopg2.connect(
            host=PG_HOST, port=PG_PORT, user=PG_USER,
            password=get_pg_password(), dbname=PG_DB, connect_timeout=30
        )
        cur = conn.cursor()
        total_processed = 0
        total_matched = 0
        for sheet_name, sheet_id in SHEET_IDS.items():
            r = process_sheet(token, cur, sheet_name, sheet_id)
            total_processed += r["processed"]
            total_matched += r["matched"]
        log(f"\n全部完成: 处理 {total_processed} 行, 匹配 {total_matched}, 未注册 {total_processed - total_matched}")
        return 0
    except Exception as e:
        log(f"ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        # Release the cursor and connection on the failure path as well.
        # A close error is logged rather than raised so it cannot replace
        # the exit status decided above.
        for resource in (cur, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except psycopg2.Error as close_err:
                log(f"ERROR: 关闭数据库资源失败: {close_err}")
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())