ai_member_xiaoxi/scripts/sales_leads_full_refresh.py
2026-06-17 08:00:01 +08:00

821 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
销售线索全量刷新脚本 — XXTEA 精确匹配版
功能:
1. 读取「小龙」「吴迪」「成都」三个 sheet 的 E 列手机号
2. XXTEA 加密 → bi_vala_app_account.tel_encrypt 精确匹配 → 获取 account_id
3. 查询 PostgreSQL 获取用户订单/学习数据
4. 填写 D/H/I/J/K~U/X/Y/Z 列(自动列, U 列为操作更新时间)
5. 将三个 sheet 中 Y=1(有效订单)的用户汇总到「订单汇总」sheet
规则(沿用 S2 规则):
① E→H: XXTEA 精确匹配, 查不到留空
② H→D/I/J: 只补空, 不覆盖已有值
③ Y=1: 仅当 K(下单日) >= C(线索日期)
④ 全额退清: 所有订单都退费 → N/O/P 全部清空
⑤ N/O/P 0 留空, O 整元
⑥ G 列不动
用法:
python3 scripts/sales_leads_full_refresh.py
"""
import json, re, time, sys, os, requests, psycopg2
from datetime import datetime
from collections import defaultdict
from feishu_sheet_utils import FeishuSheetWriter
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
WORKSPACE = os.path.dirname(SCRIPTS_DIR)
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone
# Feishu spreadsheet token; interpolated into every sheets/v2 URL built in this file.
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
# One tuple per sales sheet: (sheet_id, display name, A1-style range to read).
# Parsing assumes the range starts at row 3 (row numbers are computed as index + 3).
# NOTE(review): the end rows are hard-coded, so leads added below them are
# presumably not picked up until these ranges are extended.
SALES_SHEETS = [
("qJF4I", "小龙", "A3:Z2607"),
("f975f0", "吴迪", "A3:Z8149"),
("qJF4J", "成都", "A3:Z2500"),
]
# Sheet id of the order-summary sheet that is cleared and rewritten on each run.
SUMMARY_SHEET_ID = "2smjwA"
# Sales-owner lookup: a key found as a substring of column A selects the
# normalized owner name (the value). Rows matching no key are skipped.
CS_MAP = {"吴迪": "吴迪", "小龙": "小龙", "Tom": "Tom", "Bob": "Bob"}
# goods_id -> product display name (column M). Unknown ids fall back to
# "商品<id>" at the lookup site.
GOODS_NAMES = {
57: "瓦拉英语level1·单季", 60: "瓦拉英语level1", 63: "瓦拉英语level1·单季",
31: "瓦拉英语年包", 32: "瓦拉英语单季度包", 33: "瓦拉英语level2", 54: "瓦拉英语季度包",
61: "瓦拉英语level1+2",
}
# Raw download_channel value -> short label (column J). Values that are not
# listed here are written through unchanged.
CHANNEL_MAP = {
"Apple App Store": "苹果", "科大讯飞学习机": "讯飞", "学而思学习机": "学而思",
"华为应用市场": "华为", "小米应用市场": "小米", "应用宝应用市场": "应用宝",
"希沃学习机": "希沃", "荣耀应用市场": "荣耀", "小度学习机": "小度",
"oppo应用市场": "OPPO", "vivo应用市场": "VIVO", "京东方学习机": "京东方",
"步步高学习机": "步步高", "作业帮学习机": "作业帮", "魅族应用市场": "魅族",
"官网": "官网",
}
# Column Z channel-attribution rules [confirmed by 王虹茗, 2026-06-15]
def classify_channel(key_from):
    """Classify an order's ``key_from`` tag as 端内 / 销转 / 达人 / 直购.

    Args:
        key_from: Source tag of the order; may be ``None`` or empty.

    Returns:
        "端内" for the three known in-app tags, "销转" for ``sales-adp-*``,
        "达人" for ``newmedia-daren-*`` and the single
        ``newmedia-dianpu-wwxx-0-0`` tag, and "直购" for everything else
        (including an empty or missing tag).
    """
    tag = key_from.strip() if key_from else ""
    in_app_tags = ("app-active-h5-0-0", "app-sales-bj-qhm-0", "app-sales-bj-wd-0")

    if tag in in_app_tags:
        return "端内"
    if tag.startswith("sales-adp-"):
        return "销转"
    if tag == "newmedia-dianpu-wwxx-0-0" or tag.startswith("newmedia-daren-"):
        return "达人"
    # Everything else (other dianpu tags, partner/stream/miniprogram/jingxuan,
    # empty, shuadan, ...) counts as a direct purchase.
    return "直购"
LOG_FILE = "/var/log/xiaoxi_full_refresh.log"


def log(msg):
    """Print a timestamped message and append it to ``LOG_FILE``.

    The file is opened as UTF-8 explicitly: messages contain Chinese text and
    emoji, which would raise ``UnicodeEncodeError`` if the process locale
    (e.g. under cron) selected a narrower default encoding.

    Args:
        msg: Text to log; it is prefixed with ``[YYYY-MM-DD HH:MM:SS]``.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
def get_secret(key):
    """Look up ``key`` in ``<WORKSPACE>/secrets.env``.

    The file is scanned for the first line starting with ``key=``; the text
    after the first ``=`` is returned with surrounding whitespace and any
    leading/trailing quote characters removed.

    Args:
        key: Name of the variable to read.

    Returns:
        The value as a string, or ``None`` when no line defines ``key``.
    """
    prefix = f"{key}="
    env_path = os.path.join(WORKSPACE, "secrets.env")
    with open(env_path) as handle:
        hit = next((ln for ln in handle if ln.startswith(prefix)), None)
    if hit is None:
        return None
    _, _, raw_value = hit.strip().partition("=")
    return raw_value.strip("'\"")
def get_fs_token():
    """Fetch a Feishu ``tenant_access_token`` for the first configured app.

    Credentials (``appId`` / ``appSecret``) are read from
    ``<CRED_DIR>/config.json`` under ``apps[0]``.

    Returns:
        The tenant access token string.

    Raises:
        RuntimeError: If the API reports a non-zero ``code`` or the response
            carries no token. The response ``code`` and ``msg`` are included;
            the app secret is never echoed.
    """
    with open(os.path.join(CRED_DIR, "config.json")) as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    # Fail with the API's own diagnostics instead of an opaque KeyError, in the
    # same style as read_sheet.
    if data.get("code") != 0 or not token:
        raise RuntimeError(
            f"获取 tenant_access_token 失败: code={data.get('code')} msg={data.get('msg')}"
        )
    return token
def read_sheet(token, sheet_id, range_str=None):
    """Read cell values from one sheet of the spreadsheet.

    Args:
        token: Feishu bearer token.
        sheet_id: Id of the sheet to read.
        range_str: Optional A1-style range (e.g. ``"A3:Z100"``); when omitted
            the request addresses the sheet id alone.

    Returns:
        The ``data.valueRange.values`` payload of the response.

    Raises:
        RuntimeError: If the response ``code`` is not 0.
    """
    target = f"{sheet_id}!{range_str}" if range_str else sheet_id
    endpoint = (
        f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{target}"
    )
    auth = {"Authorization": f"Bearer {token}"}
    payload = requests.get(endpoint, headers=auth, timeout=30).json()
    if payload.get("code") == 0:
        return payload["data"]["valueRange"]["values"]
    raise RuntimeError(f"读取失败 {sheet_id}: {payload}")
def put_values(token, sheet_id, range_str, values):
    """Write ``values`` into a single range of a sheet.

    Args:
        token: Feishu bearer token.
        sheet_id: Id of the target sheet.
        range_str: A1-style range inside that sheet (e.g. ``"H3:H20"``).
        values: 2-D list of cell values (one inner list per row).

    Returns:
        ``True`` when the API reports ``code == 0``; otherwise the failure is
        logged and ``False`` is returned (no exception is raised for an API
        error code).
    """
    endpoint = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    payload = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    result = requests.put(endpoint, headers=headers, json=payload, timeout=30).json()
    succeeded = result.get("code") == 0
    if not succeeded:
        log(f"{range_str}: code={result.get('code')} msg={result.get('msg')}")
    return succeeded
def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in slices and concatenate the rows.

    Args:
        cur: DB-API cursor providing ``execute`` and ``fetchall``.
        sql_tpl: SQL text containing exactly one ``%s`` slot (filled via
            ``%`` formatting) where the placeholder list is inserted.
        params: Sequence of values for the IN list.
        chunk: Maximum number of values bound per statement.

    Returns:
        A list with the rows of every slice, in slice order. Empty ``params``
        executes nothing and returns ``[]``.
    """
    rows = []
    for start in range(0, len(params), chunk):
        part = params[start:start + chunk]
        placeholders = ",".join("%s" for _ in part)
        cur.execute(sql_tpl % placeholders, part)
        rows += cur.fetchall()
    return rows
def safe_cell(row, idx):
    """Return cell ``idx`` of ``row`` as a stripped string.

    Whole-valued numbers are rendered without a fractional part
    (``1.0`` -> ``"1"``), so numeric ids and phone numbers read back from the
    sheet compare equal to their textual form.

    Args:
        row: List of cell values; may be shorter than ``idx + 1``.
        idx: Zero-based column index.

    Returns:
        ``""`` when the cell is missing or ``None``; otherwise its string form.
    """
    if len(row) <= idx or row[idx] is None:
        return ""
    cell = row[idx]
    if isinstance(cell, (int, float)):
        try:
            if cell == int(cell):
                return str(int(cell))
        except (ValueError, OverflowError):
            # int() raises ValueError for NaN and OverflowError for +/-inf;
            # fall through and render those with str() like any other value.
            pass
    return str(cell).strip()
def parse_date_str(s):
    """Normalize a lead date to ``YYYY-MM-DD`` or ``YYYY-MM-DD HH:MM:SS``.

    ``'6月7日'`` -> ``'<year>-06-07'`` and ``'6月7日 9:23:48'`` ->
    ``'<year>-06-07 09:23:48'``, where ``<year>`` is the current calendar year
    (the sheet format carries no year). Text already starting with
    ``YYYY-MM-DD`` is returned as is (after stripping), and anything
    unrecognized is returned stripped but otherwise unchanged.

    The hour is zero-padded because callers compare these strings
    lexicographically against ``%Y-%m-%d %H:%M:%S`` timestamps; an unpadded
    ``' 9:..'`` would sort after ``' 10:..'``.

    Args:
        s: Raw cell text; may be empty or ``None``.

    Returns:
        The normalized string, or ``""`` for empty input.
    """
    if not s:
        return ""
    s = s.strip()
    if re.match(r'^\d{4}-\d{2}-\d{2}', s):
        return s
    # Date plus optional time: '6月7日 10:23:48' or '6月7日'
    m = re.match(r'^(\d{1,2})月(\d{1,2})日(?:\s+(\d{1,2}:\d{2}:\d{2}))?', s)
    if not m:
        return s
    year = datetime.now().year
    date_part = f"{year}-{int(m.group(1)):02d}-{int(m.group(2)):02d}"
    if not m.group(3):
        return date_part
    hour, rest = m.group(3).split(":", 1)
    return f"{date_part} {int(hour):02d}:{rest}"
# ═══ Step 1: 解析三个销售 sheet ═══
def parse_sales_sheets(token):
    """Read every sheet in ``SALES_SHEETS`` and turn its rows into entries.

    A row is kept only if it is non-empty and column A contains one of the
    ``CS_MAP`` keys. Column E is taken as the phone number: it is converted via
    float -> int -> str when possible, otherwise kept as stripped text.

    Args:
        token: Feishu bearer token.

    Returns:
        Dict mapping sheet id to a list of entry dicts with keys ``row``
        (1-based sheet row, counted from 3), ``sales``, ``nickname``,
        ``clue_date``, ``clue_date_parsed``, ``phone``, ``grade``, ``history``
        and ``existing`` (current text of columns D, H..U, X, Y, Z keyed by
        column letter).
    """
    phone_re = re.compile(r'^\d{11}$')
    tracked_cols = "DHIJKLMNOPQRSTUXYZ"
    parsed = {}

    for sid, sname, rng in SALES_SHEETS:
        entries = []
        for row_num, row in enumerate(read_sheet(token, sid, rng), start=3):
            if not row or not any(row):
                continue

            owner_cell = safe_cell(row, 0)
            sales = next((name for key, name in CS_MAP.items() if key in owner_cell), None)
            if not sales:
                continue

            phone = ""
            raw_phone = row[4] if len(row) > 4 else None
            if raw_phone:
                try:
                    phone = str(int(float(str(raw_phone))))
                except (ValueError, TypeError):
                    phone = str(raw_phone).strip()

            clue_date = safe_cell(row, 2)
            entries.append({
                "row": row_num,
                "sales": sales,
                "nickname": safe_cell(row, 1),
                "clue_date": clue_date,
                "clue_date_parsed": parse_date_str(clue_date),
                "phone": phone,
                "grade": safe_cell(row, 5),
                "history": safe_cell(row, 6),
                # Column letter -> zero-based index (A == 0).
                "existing": {
                    col: safe_cell(row, ord(col) - ord("A")) for col in tracked_cols
                },
            })

        parsed[sid] = entries
        phone_cnt = len([e for e in entries if phone_re.match(e["phone"])])
        uid_cnt = len([
            e for e in entries
            if e["existing"]["H"] and e["existing"]["H"].isdigit()
        ])
        log(f" [{sname}] {len(entries)}行, 手机号{phone_cnt}, 已有UID{uid_cnt}")

    return parsed
# ═══ Step 2: XXTEA 加密 → PG tel_encrypt 精确匹配 ═══
def phone_to_uid_xxtea(all_entries):
    """Resolve phone numbers to account ids via their encrypted form.

    Every distinct 11-digit phone in ``all_entries`` is passed through
    ``encrypt_phone`` and matched exactly against
    ``bi_vala_app_account.tel_encrypt`` (rows with ``status=1`` and no
    ``deleted_at``), 500 ciphertexts per query.

    Args:
        all_entries: Dict of sheet id -> list of entry dicts (each with a
            ``"phone"`` key), as produced by ``parse_sales_sheets``.

    Returns:
        Dict mapping plain phone -> account id (as a string). Phones that fail
        to encrypt or have no match are absent.
    """
    phone_set = set()
    for entries in all_entries.values():
        for e in entries:
            if re.match(r'^\d{11}$', e["phone"]):
                phone_set.add(e["phone"])
    if not phone_set:
        log(" 无有效手机号")
        return {}
    log(f" XXTEA 加密匹配: {len(phone_set)} 个唯一手机号")

    # ciphertext -> plain phone, so query results can be mapped back.
    phone_enc_map = {}
    for phone in phone_set:
        try:
            phone_enc_map[encrypt_phone(phone)] = phone
        except Exception as ex:
            log(f" 加密失败 {phone}: {ex}")
    log(f" 加密完成, 唯一密文: {len(phone_enc_map)}")

    enc_list = list(phone_enc_map.keys())
    phone_to_uid = {}
    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
        user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
        dbname="vala_bi", connect_timeout=30
    )
    # Release the connection even when a query raises; the original leaked it.
    try:
        cur = conn.cursor()
        try:
            for i in range(0, len(enc_list), 500):
                chunk = enc_list[i:i + 500]
                ph = ",".join(["%s"] * len(chunk))
                cur.execute(
                    f"SELECT id, tel_encrypt FROM bi_vala_app_account "
                    f"WHERE tel_encrypt IN ({ph}) AND status=1 AND deleted_at IS NULL",
                    chunk
                )
                for uid, tel_enc in cur.fetchall():
                    plain = phone_enc_map.get(tel_enc)
                    if plain:
                        phone_to_uid[plain] = str(uid)
                # Short pause between batches to go easy on the database.
                time.sleep(0.05)
        finally:
            cur.close()
    finally:
        conn.close()
    log(f" 精确匹配到 {len(phone_to_uid)} 个 UID (via XXTEA)")
    return phone_to_uid
# ═══ Step 3: PostgreSQL 批量查询 ═══
def query_all_pg(all_entries, phone_map):
    """Collect registration, order and learning data for every known account.

    Account ids come from two sources: phones resolved through ``phone_map``
    and numeric, positive values already present in column H.

    Args:
        all_entries: Dict of sheet id -> list of entry dicts, as produced by
            ``parse_sales_sheets``.
        phone_map: Dict of plain phone -> account id string.

    Returns:
        Dict mapping account id (int) -> info dict with keys ``reg_date``,
        ``download_channel``, ``trial_count``, ``has_order``, ``order_date``
        (display form ``M月D日 HH:MM:SS``), ``order_date_raw``
        (``YYYY-MM-DD HH:MM:SS``), ``order_channel``, ``product``, ``gmv``,
        ``refund``, ``gsv``, ``activation``, ``lesson_progress``,
        ``lesson_time``, ``lesson_minutes`` and ``trade_no``.
        Returns ``{}`` when there are no account ids.
    """
    uid_set = set()
    for entries in all_entries.values():
        for e in entries:
            if re.match(r'^\d{11}$', e["phone"]) and e["phone"] in phone_map:
                uid_set.add(int(phone_map[e["phone"]]))
            h_val = e["existing"]["H"]
            if h_val and h_val.isdigit() and int(h_val) > 0:
                uid_set.add(int(h_val))
    uid_list = list(uid_set)
    log(f" 有效 user_id: {len(uid_list)}")
    if not uid_list:
        return {}

    info = {uid: {
        "reg_date": "", "download_channel": "", "trial_count": 0,
        "has_order": False, "order_date": "", "order_date_raw": "",
        "order_channel": "", "product": "",
        "gmv": 0, "refund": 0, "gsv": 0,
        "activation": "", "lesson_progress": "", "lesson_time": "", "lesson_minutes": 0,
        "trade_no": "",
    } for uid in uid_set}
    account_chars = defaultdict(list)
    char_plays = defaultdict(lambda: {"latest_time": None, "latest_chapter": None, "total_ms": 0})

    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
        user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
        dbname="vala_bi", connect_timeout=30
    )
    # Close the connection even when a query raises; the original leaked it.
    try:
        cur = conn.cursor()

        # 3a. Registration info
        log(" 查询注册信息...")
        reg_info = batch_in(cur,
            "SELECT id, created_at, download_channel FROM bi_vala_app_account "
            "WHERE id IN (%s) AND status=1 AND deleted_at IS NULL",
            uid_list
        )
        for aid, created_at, dc in reg_info:
            if aid in info:
                info[aid]["reg_date"] = created_at.strftime("%Y-%m-%d") if created_at else ""
                raw_ch = dc or ""
                info[aid]["download_channel"] = CHANNEL_MAP.get(raw_ch, raw_ch)

        # 3b. Trial lesson count
        log(" 查询体验节数...")
        trial_info = batch_in(cur,
            "SELECT account_id, COUNT(*) FROM bi_user_course_detail "
            "WHERE account_id IN (%s) AND expire_time IS NULL AND deleted_at IS NULL "
            "GROUP BY account_id",
            uid_list
        )
        for aid, cnt in trial_info:
            if aid in info:
                info[aid]["trial_count"] = cnt

        # 3c. Orders (newest first) and their refunds
        log(" 查询订单信息...")
        orders = batch_in(cur,
            "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status "
            "FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL "
            "AND order_status IN (3,4) ORDER BY pay_success_date DESC",
            uid_list
        )
        user_orders = defaultdict(list)
        for o in orders:
            user_orders[o[0]].append(o)
        trade_nos = [o[1] for o in orders if o[1]]
        refund_map = {}
        if trade_nos:
            refunds = batch_in(cur,
                "SELECT trade_no, refund_amount_int FROM bi_refund_order "
                "WHERE trade_no IN (%s) AND status=3",
                trade_nos
            )
            # NOTE(review): when a trade_no has several refund rows only the
            # last one read is kept — confirm whether they should be summed.
            for tn, amt in refunds:
                refund_map[tn] = amt
        for aid, olist in user_orders.items():
            if aid not in info:
                continue
            info[aid]["has_order"] = True
            # Each batch is ordered newest-first, so with a single batch this
            # is the latest order. NOTE(review): with more than one batch the
            # per-account order across batches depends on where the account id
            # fell in uid_list.
            latest = olist[0]
            # Column K display format: "M月D日 HH:MM:SS"; the raw form is kept
            # separately for date comparison. The separators were missing from
            # the format string, producing e.g. "6710:23:48".
            if latest[2]:
                dt = latest[2]
                info[aid]["order_date"] = f"{dt.month}月{dt.day}日 {dt.strftime('%H:%M:%S')}"
                info[aid]["order_date_raw"] = dt.strftime("%Y-%m-%d %H:%M:%S")
            else:
                info[aid]["order_date"] = ""
                info[aid]["order_date_raw"] = ""
            info[aid]["order_channel"] = latest[3] or ""
            info[aid]["product"] = GOODS_NAMES.get(latest[4], f"商品{latest[4]}")
            info[aid]["trade_no"] = latest[1] or ""
            # Amounts are divided by 100 here; NULL amounts count as 0 instead
            # of crashing sum().
            total_gmv = sum((o[5] or 0) for o in olist) / 100.0
            total_refund = sum((refund_map.get(o[1]) or 0) for o in olist) / 100.0
            info[aid]["gmv"] = total_gmv
            info[aid]["refund"] = total_refund
            info[aid]["gsv"] = total_gmv - total_refund

        # 3d. Activated course level (best effort)
        log(" 查询激活课程...")
        try:
            activations = batch_in(cur,
                "SELECT account_id, season_package_level FROM bi_vala_seasonal_ticket "
                "WHERE account_id IN (%s) AND status=1 AND deleted_at IS NULL "
                "AND season_package_level IN ('A1','A2')",
                uid_list
            )
            for aid, lvl in activations:
                if aid in info:
                    info[aid]["activation"] = lvl
        except Exception as ex:
            log(f" 激活查询异常: {ex}")
            # A failed statement aborts the open PostgreSQL transaction; roll
            # back so the queries below can still run.
            conn.rollback()

        # 3e. Characters belonging to each account
        log(" 查询角色信息...")
        char_info = batch_in(cur,
            "SELECT account_id, id FROM bi_vala_app_character "
            "WHERE account_id IN (%s) AND deleted_at IS NULL",
            uid_list
        )
        char_to_account = {}
        for aid, cid in char_info:
            account_chars[aid].append(cid)
            char_to_account[cid] = aid
        char_ids = list(char_to_account.keys())
        log(f" 角色数: {len(char_ids)}")

        # 3f. Chapter id -> (level, season, unit, lesson)
        cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson")
        chapter_map = {}
        for ch_id, cl, cs, cu, cl2 in cur.fetchall():
            chapter_map[ch_id] = (cl or "", cs or "", cu or "", cl2 or "")

        # 3g. Latest completed chapter per character, across the 8 tables
        log(" 查询课时完成记录...")
        for tbl_idx in range(8):
            table = f"bi_user_chapter_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, chapter_id, created_at FROM {table} "
                    f"WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                    (char_ids,)
                )
                for uid, ch_id, created_at in cur.fetchall():
                    ch_data = chapter_map.get(ch_id)
                    if not ch_data:
                        continue
                    rec = char_plays[uid]
                    if rec["latest_time"] is None or created_at > rec["latest_time"]:
                        rec["latest_time"] = created_at
                        rec["latest_chapter"] = (ch_id, ch_data)
            except Exception as ex:
                log(f" 警告 {table}: {ex}")
                conn.rollback()  # clear the aborted transaction before the next table

        # 3h. Total play time; only characters that already have a completed
        # chapter recorded above are accumulated.
        log(" 查询学习耗时...")
        for tbl_idx in range(8):
            table = f"bi_user_component_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} "
                    f"WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id",
                    (char_ids,)
                )
                for uid, total_ms in cur.fetchall():
                    if uid in char_plays:
                        char_plays[uid]["total_ms"] += (total_ms or 0)
            except Exception as ex:
                log(f" 警告 {table}: {ex}")
                conn.rollback()  # clear the aborted transaction before the next table

        cur.close()
    finally:
        conn.close()

    # Roll character-level data up to the account level.
    for aid in uid_set:
        best_time = None
        best_ch = None
        total_ms = 0
        for cid in account_chars.get(aid, []):
            play = char_plays.get(cid)
            if not play:
                continue
            if play["latest_chapter"]:
                if best_time is None or play["latest_time"] > best_time:
                    best_time = play["latest_time"]
                    best_ch = play["latest_chapter"]
            total_ms += play["total_ms"]
        # total_ms / 60000 converts to minutes (interval_time presumably in
        # milliseconds — confirm). float() guards against SUM() coming back as
        # Decimal, which would not be JSON-serializable when written to a sheet.
        minutes = round(float(total_ms) / 60000, 1)
        info[aid]["lesson_minutes"] = int(minutes) if minutes == int(minutes) else minutes
        if best_ch:
            ch_id, (cl, cs, cu, cl2) = best_ch
            info[aid]["lesson_progress"] = f"{cl}-{cs}-{cu}-{cl2}"
            info[aid]["lesson_time"] = best_time.strftime("%Y-%m-%d") if best_time else ""
    log(f" 数据库查询完成")
    return info
# ═══ Step 4: 写入销售三表 D/H/I/J/K~U/X/Y/Z 列 ═══
def _sales_row_values(e, phone_map, db_info, now_str):
    """Compute the auto-filled cells of one lead row, keyed by column letter."""
    cols = ("D", "H", "I", "J", "K", "L", "M", "N", "O", "P",
            "Q", "R", "S", "T", "U", "X", "Y", "Z")
    phone = e["phone"]
    existing = e["existing"]
    clue_date = e["clue_date_parsed"]
    old_uid = existing["H"]
    phone_valid = bool(re.match(r'^\d{11}$', phone))
    phone_hit = phone_valid and phone in phone_map

    # Account id used for the data lookup: the phone match wins, otherwise a
    # positive numeric value already in column H.
    aid = 0
    if phone_hit:
        aid = int(phone_map[phone])
    elif old_uid and old_uid.isdigit() and int(old_uid) > 0:
        aid = int(old_uid)

    vals = dict.fromkeys(cols, "")
    vals["U"] = now_str

    # H: a valid phone decides the cell on its own (match -> id, no match ->
    # blank); the existing value is kept only when there is no valid phone.
    # NOTE(review): a valid but unmatched phone blanks H while the data columns
    # are still filled from the old H value — confirm this is intended.
    if phone_hit:
        vals["H"] = phone_map[phone]
    elif not phone_valid and old_uid and old_uid.isdigit():
        vals["H"] = old_uid

    if not (aid > 0 and aid in db_info):
        # No account data. D/I/J are "only fill blanks" columns, so whatever is
        # already there must be written back rather than blanked (the original
        # wrote "" here and wiped existing values). Order-derived columns are
        # recomputed on every run and therefore cleared.
        vals["D"] = existing["D"]
        vals["I"] = existing["I"]
        vals["J"] = existing["J"]
        return vals

    di = db_info[aid]

    # D / I / J: fill only when the cell is currently empty.
    if existing["D"]:
        vals["D"] = existing["D"]
    else:
        tc = di["trial_count"]
        vals["D"] = tc if tc > 0 else ""
    vals["I"] = existing["I"] if existing["I"] else di["reg_date"]
    vals["J"] = existing["J"] if existing["J"] else di["download_channel"]

    # Amounts are truncated to whole units before comparison and output.
    gmv_int = int(di["gmv"])
    refund_int = int(di["refund"])
    gsv_int = int(di["gsv"])
    is_full_refund = (gmv_int > 0 and gmv_int == refund_int)

    # Valid order: has an order, not fully refunded, and K (order time) >= C
    # (lead date). Both sides are compared as strings in the raw
    # "YYYY-MM-DD[ HH:MM:SS]" form.
    should_y_yes = di["has_order"] and not is_full_refund
    order_date_raw = di.get("order_date_raw", "")
    if should_y_yes and clue_date and order_date_raw:
        if order_date_raw < clue_date:
            should_y_yes = False

    # N / O / P: blank on full refund, and blank instead of 0.
    if not is_full_refund:
        vals["N"] = gmv_int if gmv_int > 0 else ""
        vals["O"] = refund_int if refund_int > 0 else ""
        vals["P"] = gsv_int if gsv_int > 0 else ""

    vals["K"] = di["order_date"]
    vals["L"] = di["order_channel"]
    vals["M"] = di["product"] if di["has_order"] else ""

    act = di["activation"]
    if act:
        vals["Q"] = f"{act}体验课" if act in ("A1", "A2") else act

    vals["R"] = di["lesson_progress"] if di["lesson_progress"] else ""
    vals["S"] = di["lesson_time"]
    lm = di["lesson_minutes"]
    vals["T"] = lm if lm > 0 else ""

    vals["X"] = di.get("trade_no", "")
    vals["Y"] = 1 if should_y_yes else ""
    # NOTE(review): accounts without any order also get "直购" here because an
    # empty channel classifies as 直购 — confirm that is wanted.
    vals["Z"] = classify_channel(di["order_channel"])
    return vals


def write_sales_sheets(token, all_entries, phone_map, db_info):
    """Write the auto-filled columns D/H/I/J/K~U/X/Y/Z back to the sales sheets.

    Entries are grouped into runs of consecutive sheet rows and each column of
    a run is written as its own range, so untouched columns (E, F, G, V, W) and
    skipped rows are never overwritten. Long runs are split so that no single
    request exceeds the per-request row limit.

    Args:
        token: Feishu bearer token.
        all_entries: Dict of sheet id -> list of entry dicts.
        phone_map: Dict of plain phone -> account id string.
        db_info: Dict of account id (int) -> info dict from ``query_all_pg``.
    """
    # Feishu's single-range write is documented as limited to 5000 rows per
    # request; one sheet here can exceed that in a single contiguous run.
    max_rows_per_put = 5000
    cols = ("D", "H", "I", "J", "K", "L", "M", "N", "O", "P",
            "Q", "R", "S", "T", "U", "X", "Y", "Z")
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    for sid, sname, _ in SALES_SHEETS:
        entries = all_entries[sid]
        log(f" 写入 {sname} ({sid})...")

        # Group entries into runs of consecutive row numbers.
        groups = []
        for e in entries:
            if groups and e["row"] == groups[-1][-1]["row"] + 1:
                groups[-1].append(e)
            else:
                groups.append([e])

        failed = 0
        for g in groups:
            rows = [_sales_row_values(e, phone_map, db_info, now_str) for e in g]
            for col_letter in cols:
                for off in range(0, len(rows), max_rows_per_put):
                    part = rows[off:off + max_rows_per_put]
                    first_row = g[off]["row"]
                    last_row = first_row + len(part) - 1
                    ok = put_values(
                        token, sid,
                        f"{col_letter}{first_row}:{col_letter}{last_row}",
                        [[r[col_letter]] for r in part],
                    )
                    if not ok:
                        failed += 1
                    time.sleep(0.1)

        # put_values only logs per-range errors; surface the total so a
        # partially failed run is not reported as a clean success.
        if failed:
            log(f" {sname}: {failed} 个范围写入失败")
        log(f" {sname}: {len(entries)} 行写入完成")
# ═══ Step 5: 汇总到「订单汇总」sheet ═══
def clear_summary_sheet(token):
    """Clear old rows (columns A~X, from row 3 down) of the order-summary sheet.

    Column A of rows 3..5000 is scanned to find the last non-empty row, and the
    block A3:X<last> is cleared through ``FeishuSheetWriter.clear``. This is
    best effort: any exception is logged and swallowed so the caller can still
    write fresh data.

    Args:
        token: Feishu bearer token.
    """
    log(" 检查订单汇总 sheet 现有数据...")
    try:
        # Treat a null "values" payload as an empty sheet.
        rows = read_sheet(token, SUMMARY_SHEET_ID, "A3:A5000") or []
        last_data_row = 2
        for i, row in enumerate(rows):
            if row and any(row):
                last_data_row = 3 + i
        if last_data_row < 3:
            log(" 订单汇总 sheet 无旧数据,跳过清空")
            return
        # The message used to fuse the row number and the row count (with an
        # unbalanced parenthesis); keep them apart.
        log(f" 清空 A3:X{last_data_row} ({last_data_row - 2} 行旧数据)...")
        writer = FeishuSheetWriter(SPREADSHEET_TOKEN, token)
        writer.clear(SUMMARY_SHEET_ID, start_row=3, end_row=last_data_row, cols=24)
        log(" 清空完成")
    except Exception as e:
        log(f" 清空异常: {e}")
def write_summary_sheet(token, all_entries, phone_map, db_info):
    """Rebuild the order-summary sheet from all rows that hold a valid order.

    A row qualifies when its account has an order, GMV > 0, GSV > 0, is not
    fully refunded, and the order time is not earlier than the lead date. Rows
    sharing a trade number are de-duplicated (first occurrence wins). Old data
    is cleared, then the new rows are written from row 3.

    Summary layout (A~X, 24 columns): A~U mirror the sales sheets, V is the
    channel attribution (column Z of the sales sheets), W is left blank, X is
    the trade number.

    Args:
        token: Feishu bearer token.
        all_entries: Dict of sheet id -> list of entry dicts.
        phone_map: Dict of plain phone -> account id string.
        db_info: Dict of account id (int) -> info dict from ``query_all_pg``.
    """
    log(" 汇总订单数据...")
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Build every row before touching the sheet, so an error while assembling
    # data cannot leave the summary cleared but unwritten.
    summary_rows = []
    for sid, sname, _ in SALES_SHEETS:
        for e in all_entries[sid]:
            phone = e["phone"]
            existing = e["existing"]

            # Resolve the account id: phone match first, else existing column H.
            aid = 0
            if re.match(r'^\d{11}$', phone) and phone in phone_map:
                aid = int(phone_map[phone])
            elif existing["H"] and existing["H"].isdigit() and int(existing["H"]) > 0:
                aid = int(existing["H"])
            di = db_info.get(aid, {}) if aid > 0 else {}

            # Amounts are truncated to whole units; missing/None counts as 0.
            gmv_int = int(di.get("gmv") or 0)
            refund_int = int(di.get("refund") or 0)
            gsv_int = int(di.get("gsv") or 0)
            is_full_refund = (gmv_int > 0 and gmv_int == refund_int)
            has_order = (di.get("has_order", False) and gmv_int > 0 and gsv_int > 0
                         and not is_full_refund)

            # The lead must not be later than the order (string comparison on
            # the raw "YYYY-MM-DD[ HH:MM:SS]" forms).
            order_date_raw = di.get("order_date_raw", "")
            clue_date = e["clue_date_parsed"]
            if has_order and clue_date and order_date_raw:
                if order_date_raw < clue_date:
                    has_order = False
            if not has_order:
                continue

            trade_no = di.get("trade_no", "")
            activation = di.get("activation", "")
            row_data = [
                e["sales"],                                   # A: sales owner
                e["nickname"],                                # B: WeChat nickname
                e["clue_date"],                               # C: lead date
                di.get("trial_count", 0) or "",               # D: trial lessons
                phone,                                        # E: phone
                e["grade"],                                   # F: grade
                e["history"],                                 # G: history / follow-up
                str(aid) if aid > 0 else "",                  # H: account id
                di.get("reg_date", ""),                       # I: registration date
                di.get("download_channel", ""),               # J: download channel
                di.get("order_date", ""),                     # K: order date
                di.get("order_channel", ""),                  # L: order channel
                di.get("product", ""),                        # M: product
                gmv_int if gmv_int > 0 else "",               # N: GMV
                refund_int if refund_int > 0 else "",         # O: refund amount
                gsv_int if gsv_int > 0 else "",               # P: GSV
                f"{activation}体验课" if activation in ("A1", "A2") else activation,  # Q: activated course
                di.get("lesson_progress", ""),                # R: lesson progress
                di.get("lesson_time", ""),                    # S: latest lesson date
                di.get("lesson_minutes", 0) or "",            # T: study minutes
                now_str,                                      # U: updated at
                classify_channel(di.get("order_channel", "")),  # V: channel attribution
                "",                                           # W: blank
                trade_no,                                     # X: trade number
            ]
            summary_rows.append((trade_no, row_data))

    # De-duplicate by trade number, keeping the first occurrence; rows without
    # a trade number are always kept.
    seen_trade = set()
    deduped = []
    for trade_no, row_data in summary_rows:
        if trade_no and trade_no in seen_trade:
            continue
        if trade_no:
            seen_trade.add(trade_no)
        deduped.append(row_data)
    log(f"{len(summary_rows)} 条有效订单, 去重后 {len(deduped)} 条, 唯一订单号 {len(seen_trade)}")

    # Clear old data first, then write everything fresh.
    clear_summary_sheet(token)
    if not deduped:
        log(" 无有效订单,跳过汇总")
        return

    # Force every row to exactly 24 cells (A~X).
    values = [(list(row_data) + [""] * 24)[:24] for row_data in deduped]
    writer = FeishuSheetWriter(SPREADSHEET_TOKEN, token)
    writer.write(SUMMARY_SHEET_ID, start_row=3, rows=values, cols=24)
    # Report the number of rows actually written (post-dedup); the original
    # logged the pre-dedup count.
    log(f" 订单汇总写入完成, 共 {len(deduped)}")
# ═══ Main ═══
def main():
    """Run the full refresh pipeline (steps 1-5) and return a process exit code.

    Returns:
        0 on success; 1 if any step raised. The error and its traceback are
        written through ``log`` so they also land in the log file.
    """
    log("=" * 60)
    log("销售线索全量刷新 (XXTEA精确匹配版) 启动")
    try:
        token = get_fs_token()
        log("Step 1: 解析销售三表")
        all_entries = parse_sales_sheets(token)
        log("Step 2: XXTEA 加密 → PG tel_encrypt 精确匹配")
        phone_map = phone_to_uid_xxtea(all_entries)
        log("Step 3: PostgreSQL 批量查询")
        db_info = query_all_pg(all_entries, phone_map)
        # The label used to read "H~V", which no longer matched the columns
        # that write_sales_sheets actually writes.
        log("Step 4: 写入销售三表 D/H/I/J/K~U/X/Y/Z 列")
        write_sales_sheets(token, all_entries, phone_map, db_info)
        log("Step 5: 汇总到「订单汇总」sheet")
        write_summary_sheet(token, all_entries, phone_map, db_info)
        log("✅ 全量刷新完成")
        return 0
    except Exception as e:
        import traceback
        log(f"❌ ERROR: {e}")
        # Route the traceback through log() so it is kept in the log file, not
        # only on the console.
        log(traceback.format_exc())
        return 1


if __name__ == "__main__":
    sys.exit(main())