ai_member_xiaoxi/scripts/fix_bot_sales_sheets.py
2026-06-06 08:00:01 +08:00

474 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
销售三表 K~Q + S/U 订单&行课写回(Step2 触发)
触发:Cursor Step1 完成后 @小溪,或群发【执行更新】
归属:小溪 (xiaoxi)
写回列:
K: 是否下单(是/空)
L: 下单日期
M: 成交渠道
N: 产品名称
O: GMV
P: 退款(元,全额退则清空 K/O/P/Q)
Q: GSV
S: 行课进度(如 L1-S0-U00-L01)
U: 累计学习时长(分钟)
规则:
- L≥C 才 K=是
- 同 UID 多行只写匹配手机号那行
- 全额退(P≥O) → 清 K/O/P/Q,不进订单表
- 只填空/更新,不覆盖已有有效数据(除非需要修正)
分工约定见 docs/bot-step2-schedule-and-orders.md
"""
import json, time, re, sys, requests, psycopg2
from datetime import datetime
from collections import defaultdict
# ── Configuration ──
# Feishu app credentials; get_token() posts them to the tenant_access_token endpoint.
# NOTE(review): APP_SECRET is a credential committed in source. Consider loading it
# from secrets.env the same way the PG password is loaded, and rotating this value.
APP_ID = "cli_a929ae22e0b8dcc8"
APP_SECRET = "OtFjMy7p3qE3VvLbMdcWidwgHOnGD4FJ"
# Token of the spreadsheet that read_sheet() and put_values() address.
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
# Sheet id -> display name (used in log output); main() processes every entry.
SALES_SHEETS = {"f975f0": "吴迪", "qJF4I": "小龙", "qJF4J": "成都"}
def _get_pg_password(secrets_path=None):
    """Read the PostgreSQL password from a ``secrets.env`` style file.

    Args:
        secrets_path: Path of the env file to read. Defaults to
            ``../secrets.env`` relative to the directory of this script.

    Returns:
        The value of the first ``PG_ONLINE_PASSWORD=`` line, with surrounding
        whitespace and surrounding double/single quotes removed.

    Raises:
        OSError: The file cannot be opened.
        RuntimeError: No ``PG_ONLINE_PASSWORD=`` line exists in the file.
    """
    import os

    if secrets_path is None:
        secrets_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "..", "secrets.env"
        )
    key = "PG_ONLINE_PASSWORD="
    # Explicit encoding so the result does not depend on the process locale.
    with open(secrets_path, encoding="utf-8") as f:
        for line in f:
            entry = line.strip()
            if entry.startswith(key):
                # Split only on the first "=" so the password may contain "=".
                return entry.split("=", 1)[1].strip('"').strip("'")
    raise RuntimeError("PG_ONLINE_PASSWORD not found in secrets.env")
# Keyword arguments that main() unpacks into psycopg2.connect().
# The password is resolved by _get_pg_password() when this module is imported,
# so importing the module fails if secrets.env or its key is missing.
PG_CONFIG = {
"host": "bj-postgres-16pob4sg.sql.tencentcdb.com", "port": 28591,
"user": "ai_member", "password": _get_pg_password(), "database": "vala_bi",
}
# goods_id -> product name written to column N. main() falls back to
# "商品<goods_id>" for ids that are not listed here.
GOODS_MAP = {
57: "瓦拉英语level1·单季", 60: "瓦拉英语level1", 63: "瓦拉英语level1·单季",
31: "瓦拉英语年包", 32: "瓦拉英语单季度包", 33: "瓦拉英语level2", 54: "瓦拉英语季度包",
61: "瓦拉英语level1+2",
}
def get_token():
    """Request a Feishu tenant access token using APP_ID / APP_SECRET.

    Returns:
        The ``tenant_access_token`` string from the response body.

    Raises:
        RuntimeError: The response carries no token. The error code and
            message returned by Feishu are included so an auth failure is
            diagnosable instead of surfacing as a bare ``KeyError``.
    """
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": APP_ID, "app_secret": APP_SECRET},
        timeout=15,
    )
    payload = resp.json()
    token = payload.get("tenant_access_token")
    if not token:
        raise RuntimeError(
            "Feishu tenant_access_token request failed: "
            f"code={payload.get('code')} msg={payload.get('msg')}"
        )
    return token
def read_sheet(token, sheet_id, range_str):
    """Fetch the cell values of ``sheet_id!range_str`` rendered as strings.

    Returns the ``values`` list from the response, or an empty list (after
    printing the response) when the API reports a non-zero ``code``.
    """
    endpoint = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{sheet_id}!{range_str}"
        "?valueRenderOption=ToString"
    )
    auth_header = {"Authorization": f"Bearer {token}"}
    payload = requests.get(endpoint, headers=auth_header, timeout=30).json()
    if payload.get("code") == 0:
        return payload["data"]["valueRange"]["values"]
    print(f"Error reading {sheet_id}: {payload}")
    return []
def put_values(token, sheet_id, range_str, values, retries=3):
    """Write ``values`` into ``sheet_id!range_str``, retrying on failure.

    Args:
        token: Bearer token for the Authorization header.
        sheet_id: Sheet id inside SPREADSHEET_TOKEN.
        range_str: A1-style range without the sheet prefix, e.g. ``"K3:K9"``.
        values: List of rows, each a list of cell values.
        retries: Maximum number of attempts.

    Returns:
        True as soon as one attempt returns ``code == 0``; False after all
        attempts failed.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    for attempt in range(retries):
        try:
            result = requests.put(url, headers=headers, json=body, timeout=30).json()
        except (requests.RequestException, ValueError) as exc:
            # Timeouts, connection errors and non-JSON bodies are transient
            # too; retry them instead of aborting the whole write-back run.
            reason = str(exc)
        else:
            if result.get("code") == 0:
                return True
            reason = result.get('msg', '')
        print(f" Retry {attempt+1} for {range_str}: {reason}")
        # No point in waiting after the final attempt.
        if attempt + 1 < retries:
            time.sleep(1)
    print(f" FAILED {range_str}")
    return False
def write_batch_col(token, sheet_id, col_letter, row_vals):
    """Write several cells of one column, merging consecutive rows per request.

    Args:
        token: Bearer token forwarded to put_values().
        sheet_id: Target sheet id.
        col_letter: Column letter, e.g. ``"K"``.
        row_vals: Iterable of ``(row_number, value)`` pairs. It is not
            modified.
    """
    if not row_vals:
        return
    # Sort a copy by row number only: the caller's list stays untouched and
    # cell values (a mix of "" and numbers) are never compared to each other.
    ordered = sorted(row_vals, key=lambda pair: pair[0])
    count = len(ordered)
    first = 0
    while first < count:
        # Extend the run while the next row number is exactly one greater.
        last = first
        while last + 1 < count and ordered[last + 1][0] == ordered[last][0] + 1:
            last += 1
        start_row, end_row = ordered[first][0], ordered[last][0]
        column_values = [[value] for _, value in ordered[first:last + 1]]
        put_values(
            token, sheet_id,
            f"{col_letter}{start_row}:{col_letter}{end_row}",
            column_values,
        )
        first = last + 1
        # Short pause between requests.
        time.sleep(0.1)
# "YYYY-M-D" or "YYYY/M/D"; the back-reference forces both separators to agree.
_YMD_DATE_RE = re.compile(r'(\d{4})([-/])(\d{1,2})\2(\d{1,2})')
# "M月D日" without a year.
_MD_CN_DATE_RE = re.compile(r'(\d{1,2})月(\d{1,2})日')


def parse_date(s, default_year=2026):
    """Parse the leading date of a cell value into a ``(year, month, day)`` tuple.

    Accepted prefixes are ``YYYY-M-D``, ``YYYY/M/D`` and ``M月D日``; any text
    after the date (such as a time of day) is ignored. Month and day are not
    range-checked.

    Args:
        s: Cell value; it is converted with ``str()`` and stripped.
        default_year: Year assumed for the year-less ``M月D日`` form.

    Returns:
        A tuple of three ints, or None when the text is empty or does not
        start with a recognised date.
    """
    text = str(s).strip()
    if not text:
        return None
    ymd = _YMD_DATE_RE.match(text)
    if ymd:
        return (int(ymd.group(1)), int(ymd.group(3)), int(ymd.group(4)))
    md = _MD_CN_DATE_RE.match(text)
    if md:
        return (default_year, int(md.group(1)), int(md.group(2)))
    return None
def date_le(a, b):
    """Return True when both dates are present and ``a`` is not after ``b``.

    A missing date (None) on either side yields False.
    """
    return a is not None and b is not None and a <= b
def phone_match(sheet_phone, db_tel):
    """Tell whether a sheet phone number corresponds to the DB phone value.

    Both values are compared as stripped strings. They match when equal, or
    when the DB value contains exactly one ``****`` mask and the sheet value
    starts with the part before the mask and ends with the part after it.
    Empty or missing values never match.
    """
    if not (sheet_phone and db_tel):
        return False
    sheet_value = str(sheet_phone).strip()
    db_value = str(db_tel).strip()
    if sheet_value == db_value:
        return True
    if "****" not in db_value:
        return False
    pieces = db_value.split("****")
    if len(pieces) != 2:
        return False
    prefix, suffix = pieces
    return sheet_value.startswith(prefix) and sheet_value.endswith(suffix)
# Order-related columns that are blanked together when a row has no valid order.
_ORDER_COLS = ("K", "L", "M", "N", "O", "P", "Q")
# Order in which columns are flushed to each sheet.
_WRITE_COLS = _ORDER_COLS + ("S", "U")
# Value of column K for a row with a qualifying order (see module docstring).
_ORDERED_MARK = "是"


def _cell(row, idx):
    """Return ``row[idx]`` as a stripped string; "" for missing or falsy cells."""
    value = row[idx] if idx < len(row) else ""
    return str(value).strip() if value else ""


def _chunks(items, size):
    """Yield consecutive slices of ``items`` holding at most ``size`` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _load_sheet_rows(token):
    """Read A3:V10000 of every sales sheet and return one dict per non-empty row."""
    rows = []
    for sid, name in SALES_SHEETS.items():
        print(f"Reading {name}...")
        for offset, raw in enumerate(read_sheet(token, sid, "A3:V10000")):
            raw = raw or []
            b, e, h = _cell(raw, 1), _cell(raw, 4), _cell(raw, 7)
            # Rows with B, E and H all empty are treated as blank and skipped.
            if not (b or e or h):
                continue
            rows.append({
                # The range starts at sheet row 3, hence the offset.
                "sid": sid, "name": name, "row": offset + 3,
                "B": b,
                "C": _cell(raw, 2),
                "E": e,
                "H": h,
                "K": _cell(raw, 10),
                "L": _cell(raw, 11),
                "M": _cell(raw, 12),
                "N": _cell(raw, 13),
                "O": _cell(raw, 14),
                "P": _cell(raw, 15),
                "Q": _cell(raw, 16),
                "S": _cell(raw, 18),
                "U": _cell(raw, 20),
            })
    return rows


def _fetch_phones(cur, uids):
    """Return ``{uid: tel}`` (tel is "" when NULL) for the given account ids."""
    uid_tel = {}
    for chunk in _chunks(uids, 500):
        ph = ",".join(["%s"] * len(chunk))
        cur.execute(f"SELECT id, tel FROM bi_vala_app_account WHERE id IN ({ph})", chunk)
        for uid, tel in cur.fetchall():
            uid_tel[str(uid)] = tel or ""
        time.sleep(0.05)
    return uid_tel


def _fetch_orders(cur, uids):
    """Return ``{uid: [order, ...]}`` for paid orders with order_status 3 or 4."""
    uid_orders = {}
    for chunk in _chunks(uids, 100):
        ph = ",".join(["%s"] * len(chunk))
        cur.execute(f"""
            SELECT o.account_id, o.trade_no, o.goods_id, o.pay_amount_int,
                   o.pay_success_date, o.order_status, o.key_from
            FROM bi_vala_order o
            JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
            WHERE o.account_id IN ({ph})
              AND o.pay_success_date IS NOT NULL
              AND o.order_status IN (3,4)
            ORDER BY o.account_id, o.pay_success_date DESC
        """, chunk)
        for uid, trade_no, goods_id, amount, paid_at, status, key_from in cur.fetchall():
            uid_orders.setdefault(str(uid), []).append({
                "tn": trade_no, "gid": goods_id, "amt": amount,
                # Keep only the YYYY-MM-DD part of the payment timestamp.
                "pd": str(paid_at)[:10] if paid_at else "",
                "os": status, "kf": key_from or "",
            })
        time.sleep(0.05)
    return uid_orders


def _fetch_refunds(cur, uids):
    """Return ``{uid: refunded_sum}`` over order_status=4 orders with refund status=3."""
    uid_refunds = {}
    for chunk in _chunks(uids, 100):
        ph = ",".join(["%s"] * len(chunk))
        cur.execute(f"""
            SELECT o.account_id, COALESCE(SUM(r.refund_amount::numeric), 0)
            FROM bi_vala_order o
            JOIN bi_refund_order r ON o.trade_no = r.trade_no AND r.status = 3
            JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
            WHERE o.account_id IN ({ph})
              AND o.order_status = 4
              AND o.pay_success_date IS NOT NULL
            GROUP BY o.account_id
        """, chunk)
        for uid, refunded in cur.fetchall():
            uid_refunds[str(uid)] = float(refunded)
        time.sleep(0.05)
    return uid_refunds


def _fetch_learning(conn, cur, uids):
    """Return ``{uid: {"progress": str, "minutes": number}}`` per account.

    Progress is the chapter of the most recent completed play record across
    all characters of the account; minutes is the summed component play time
    of the characters that have at least one completed chapter.
    """
    # Account -> characters.
    cur.execute(
        "SELECT account_id, id FROM bi_vala_app_character WHERE account_id = ANY(%s) AND deleted_at IS NULL",
        ([int(u) for u in uids],),
    )
    account_chars = defaultdict(list)
    char_ids = []
    for account_id, char_id in cur.fetchall():
        account_chars[str(account_id)].append(char_id)
        char_ids.append(char_id)
    print(f" Characters: {len(char_ids)}")

    # Chapter id -> (level, season, unit, lesson).
    cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson")
    chapter_map = {
        chapter_id: (level or "", season or "", unit or "", lesson or "")
        for chapter_id, level, season, unit, lesson in cur.fetchall()
    }

    char_plays = defaultdict(lambda: {"latest_time": None, "latest_chapter": None, "total_ms": 0})

    # Completed chapters: the record tables are split into 8 shards.
    for shard in range(8):
        table = f"bi_user_chapter_play_record_{shard}"
        try:
            cur.execute(
                f"SELECT user_id, chapter_id, created_at FROM {table} WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                (char_ids,),
            )
            for char_id, chapter_id, created_at in cur.fetchall():
                chapter = chapter_map.get(chapter_id)
                if not chapter:
                    continue
                rec = char_plays[char_id]
                if rec["latest_time"] is None or created_at > rec["latest_time"]:
                    rec["latest_time"] = created_at
                    rec["latest_chapter"] = chapter
        except Exception as e:
            print(f" Warning {table}: {e}")
            # A failed statement leaves the transaction aborted; without a
            # rollback every following query on this connection fails too.
            conn.rollback()

    # Total play time, again across 8 shards.
    for shard in range(8):
        table = f"bi_user_component_play_record_{shard}"
        try:
            cur.execute(
                f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id",
                (char_ids,),
            )
            for char_id, total_ms in cur.fetchall():
                # Only characters that already have a completed chapter count.
                if char_id in char_plays:
                    char_plays[char_id]["total_ms"] += (total_ms or 0)
        except Exception as e:
            print(f" Warning {table}: {e}")
            conn.rollback()

    # Roll characters up to account level.
    uid_learning = {}
    for uid in uids:
        best_chapter = None
        best_time = None
        total_ms = 0
        for char_id in account_chars.get(uid, []):
            play = char_plays.get(char_id)
            if not play:
                continue
            if play["latest_chapter"]:
                if best_time is None or play["latest_time"] > best_time:
                    best_time = play["latest_time"]
                    best_chapter = play["latest_chapter"]
            total_ms += play["total_ms"]
        if not best_chapter:
            continue
        level, season, unit, lesson = best_chapter
        # float() keeps the value JSON-serialisable in case SUM() came back
        # as Decimal (presumably depends on the column type; verify).
        minutes = round(float(total_ms) / 60000, 1)
        if minutes == int(minutes):
            minutes = int(minutes)
        uid_learning[uid] = {
            "progress": f"{level}-{season}-{unit}-{lesson}",
            "minutes": minutes,
        }
    return uid_learning


def _query_db(all_uids):
    """Run all DB lookups and return ``(uid_tel, uid_orders, uid_refunds, uid_learning)``."""
    uids = list(all_uids)
    conn = psycopg2.connect(**PG_CONFIG)
    try:
        cur = conn.cursor()
        try:
            uid_tel = _fetch_phones(cur, uids)
            uid_orders = _fetch_orders(cur, uids)
            uid_refunds = _fetch_refunds(cur, uids)
            uid_learning = _fetch_learning(conn, cur, uids)
        finally:
            cur.close()
    finally:
        # Release the connection even when a query raises.
        conn.close()
    return uid_tel, uid_orders, uid_refunds, uid_learning


def _pick_best_rows(all_rows, uid_tel):
    """Choose, per UID, the single row that receives order data.

    With several rows for one UID the preference is: the first row whose
    phone matches the DB phone, then the first row with any phone, then the
    first row.
    """
    groups = defaultdict(list)
    for r in all_rows:
        if r["H"].isdigit():
            groups[r["H"]].append(r)
    best_by_uid = {}
    for uid, rows in groups.items():
        best = None
        if len(rows) > 1:
            db_tel = uid_tel.get(uid, "")
            best = next((r for r in rows if phone_match(r["E"], db_tel)), None)
            if best is None:
                best = next((r for r in rows if r["E"]), None)
        best_by_uid[uid] = best if best is not None else rows[0]
    return best_by_uid


def _queue_clear_order_cells(sheet_writes, r):
    """Queue a blank for every order column of row ``r`` that currently has a value."""
    for col in _ORDER_COLS:
        if r[col]:
            sheet_writes[col].append((r["row"], ""))


def _queue_learning(sheet_writes, r, learning):
    """Queue S/U updates for row ``r`` when the learning data differs from the sheet."""
    if not learning:
        return
    if learning["progress"] and r["S"] != learning["progress"]:
        sheet_writes["S"].append((r["row"], learning["progress"]))
    if learning["minutes"] and r["U"] != str(learning["minutes"]):
        sheet_writes["U"].append((r["row"], learning["minutes"]))


def _positive_or_blank(amount):
    """Return ``amount`` when it is positive, otherwise ""."""
    return amount if amount > 0 else ""


def _plan_writes(all_rows, uid_best_row, uid_orders, uid_refunds, uid_learning):
    """Compute the pending cell writes as ``{sheet_id: {col: [(row, value), ...]}}``."""
    writes = defaultdict(lambda: defaultdict(list))
    for r in all_rows:
        uid = r["H"] if r["H"].isdigit() else ""
        sheet_writes = writes[r["sid"]]

        # Learning data is independent of orders, so it is queued for every
        # row up front. Previously the L<C early exit below skipped it.
        _queue_learning(sheet_writes, r, uid_learning.get(uid))

        is_best = bool(uid) and uid_best_row.get(uid) is r
        orders = uid_orders.get(uid) if is_best else None
        if not orders:
            # No UID, no order, or not the chosen row: order columns stay empty.
            _queue_clear_order_cells(sheet_writes, r)
            continue

        gmv_cents = sum(int(o["amt"] or 0) for o in orders)
        # round() rather than int(): float products such as 19.99 * 100 are
        # slightly below the integer and would otherwise lose one cent.
        refund_cents = round(uid_refunds.get(uid, 0) * 100)
        latest = max(orders, key=lambda o: o["pd"])

        # Rule L>=C: the latest payment date must not precede the date in C.
        # When it does, the existing order cells are left untouched.
        if not date_le(parse_date(r["C"]), parse_date(latest["pd"])):
            continue

        if refund_cents > 0 and refund_cents >= gmv_cents:
            # Fully refunded: treated like "no order".
            _queue_clear_order_cells(sheet_writes, r)
            continue

        # Regular paid order. K is set to the "ordered" mark as specified in
        # the module docstring; the previous code blanked K in this branch.
        targets = {
            "K": _ORDERED_MARK,
            "L": latest["pd"],
            "M": latest["kf"],
            "N": GOODS_MAP.get(latest["gid"], f"商品{latest['gid']}"),
            "O": _positive_or_blank(gmv_cents // 100),
            "P": _positive_or_blank(refund_cents // 100),
            "Q": _positive_or_blank((gmv_cents - refund_cents) // 100),
        }
        for col, value in targets.items():
            # Only queue cells whose rendered value actually changes.
            if r[col] != str(value):
                sheet_writes[col].append((r["row"], value))
    return writes


def _apply_writes(token, writes):
    """Flush the planned writes sheet by sheet, column by column."""
    for sid, name in SALES_SHEETS.items():
        sheet_writes = writes.get(sid, {})
        total_writes = sum(len(cells) for cells in sheet_writes.values())
        if total_writes == 0:
            print(f"{name}: no changes")
            continue
        print(f"{name}: {total_writes} cell updates")
        for col in _WRITE_COLS:
            pending = sheet_writes.get(col, [])
            if pending:
                write_batch_col(token, sid, col, pending)


def main():
    """Run the Step2 write-back: read the sheets, query the DB, write K-Q and S/U."""
    print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] Step2 KQ + S/U 写回 启动")
    token = get_token()

    # Step 1: read the three sales sheets.
    all_rows = _load_sheet_rows(token)
    print(f"Total rows: {len(all_rows)}")

    # Step 2: collect the UIDs (column H, digits only).
    all_uids = {r["H"] for r in all_rows if r["H"].isdigit()}
    print(f"Unique UIDs: {len(all_uids)}")

    # Step 3: DB lookups (phones, orders, refunds, learning progress).
    uid_tel, uid_orders, uid_refunds, uid_learning = _query_db(all_uids)
    print(f" Orders: {len(uid_orders)}, Refunds: {len(uid_refunds)}, Learning: {len(uid_learning)}")

    # Step 4: resolve duplicate UIDs to a single target row.
    uid_best_row = _pick_best_rows(all_rows, uid_tel)

    # Step 5: compute the cell values to write.
    writes = _plan_writes(all_rows, uid_best_row, uid_orders, uid_refunds, uid_learning)

    # Step 6: write back.
    _apply_writes(token, writes)
    print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ✅ Step2 KQ + S/U 写回完成")


if __name__ == "__main__":
    main()