904 lines
33 KiB
Python
904 lines
33 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
销售线索全量刷新脚本 — XXTEA 精确匹配版 · UID 口径 v2
|
||
|
||
功能:
|
||
1. 读取「小龙」「吴迪」「成都」三个 sheet 的 E 列手机号
|
||
2. XXTEA 加密 → bi_vala_app_account.tel_encrypt 精确匹配 → 获取 account_id
|
||
3. 查询 PostgreSQL 获取用户订单/学习数据
|
||
4. 填写 D/H/I/J/K~U/X/Y 列(A-Y 共25列),U 列为操作更新时间
|
||
5. 将三个 sheet 中 X=1(有效订单)的用户按 UID 聚合汇总到「订单汇总」sheet
|
||
|
||
规则(2026-06-18 列契约 v2):
|
||
① E→H: XXTEA 精确匹配, 查不到留空
|
||
② H→D/I/J: 只补空, 不覆盖已有值
|
||
③ X=1: GSV>0 且非全额退且 K≥C
|
||
④ 全额退清: 所有订单都退费 → X=0, K-P 清空
|
||
⑤ N/O/P 0 留空, O 整元
|
||
⑥ G 列不动
|
||
⑦ V/W 列不覆盖(Cursor 维护)
|
||
⑧ X=有效0/1, Y=渠道归属, 不写订单号
|
||
|
||
用法:
|
||
python3 scripts/sales_leads_full_refresh.py
|
||
"""
|
||
|
||
import json, re, time, sys, os, requests, psycopg2
|
||
from datetime import datetime
|
||
from collections import defaultdict
|
||
from feishu_sheet_utils import FeishuSheetWriter
|
||
|
||
# Absolute directory that contains this script.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
# Parent of the scripts directory; get_secret() reads "secrets.env" from here.
WORKSPACE = os.path.dirname(SCRIPTS_DIR)
# Directory holding "config.json" with the Feishu app credentials (read by get_fs_token()).
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"

# Put the script directory first on the import path so sibling modules resolve.
sys.path.insert(0, SCRIPTS_DIR)
|
||
from phone_encrypt import encrypt_phone
|
||
|
||
# Token of the Feishu spreadsheet used in every read/write URL built by this script.
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"

# One tuple per sales sheet: (sheet id, display name used in logs, A1 range to read).
# Every range starts at row 3; parse_sales_sheets() derives row numbers from that.
SALES_SHEETS = [
    ("qJF4I", "小龙", "A3:Y2607"),
    ("f975f0", "吴迪", "A3:Y8149"),
    ("qJF4J", "成都", "A3:Y2500"),
]

# Sheet id of the order-summary sheet rebuilt by write_summary_sheet().
SUMMARY_SHEET_ID = "2smjwA"

# Substring looked for in column A -> sales owner recorded for the row.
# Rows whose column A contains none of these keys are skipped by parse_sales_sheets().
CS_MAP = {"吴迪": "吴迪", "小龙": "小龙", "Tom": "Tom", "Bob": "Bob"}

# goods_id -> product display name; ids missing here are rendered as "商品<id>" by query_all_pg().
GOODS_NAMES = {
    57: "瓦拉英语level1·单季", 60: "瓦拉英语level1", 63: "瓦拉英语level1·单季",
    31: "瓦拉英语年包", 32: "瓦拉英语单季度包", 33: "瓦拉英语level2", 54: "瓦拉英语季度包",
    61: "瓦拉英语level1+2",
}

# Raw download_channel value -> short label; values missing here are passed through unchanged.
CHANNEL_MAP = {
    "Apple App Store": "苹果", "科大讯飞学习机": "讯飞", "学而思学习机": "学而思",
    "华为应用市场": "华为", "小米应用市场": "小米", "应用宝应用市场": "应用宝",
    "希沃学习机": "希沃", "荣耀应用市场": "荣耀", "小度学习机": "小度",
    "oppo应用市场": "OPPO", "vivo应用市场": "VIVO", "京东方学习机": "京东方",
    "步步高学习机": "步步高", "作业帮学习机": "作业帮", "魅族应用市场": "魅族",
    "官网": "官网",
}
|
||
|
||
|
||
# Channel-attribution rules [confirmed by 王虹茗, 2026-06-15].
# The result is written to column Y of the sales sheets and column V of the summary sheet.
def classify_channel(key_from):
    """Classify an order's ``key_from`` as one of 端内 / 销转 / 达人 / 直购.

    Args:
        key_from: Raw ``key_from`` value of an order; may be empty or None.

    Returns:
        "端内" for the three in-app keys, "销转" for keys starting with
        ``sales-adp-``, "达人" for ``newmedia-daren-*`` keys and the single
        ``newmedia-dianpu-wwxx-0-0`` key, and "直购" for everything else
        (including empty input).
    """
    if not key_from:
        return "直购"

    source = key_from.strip()
    in_app_keys = {"app-active-h5-0-0", "app-sales-bj-qhm-0", "app-sales-bj-wd-0"}

    if source in in_app_keys:
        label = "端内"
    elif source.startswith("sales-adp-"):
        label = "销转"
    elif source == "newmedia-dianpu-wwxx-0-0" or source.startswith("newmedia-daren-"):
        label = "达人"
    else:
        # Everything else (other dianpu keys, partner/stream/miniprogram/
        # jingxuan/shuadan, blank after stripping, ...) counts as a direct purchase.
        label = "直购"
    return label
|
||
|
||
# File that every log() call appends to.
LOG_FILE = "/var/log/xiaoxi_full_refresh.log"


def log(msg):
    """Print a timestamped message and append the same line to LOG_FILE.

    Args:
        msg: Text to log; it is prefixed with ``[YYYY-MM-DD HH:MM:SS]``.

    Raises:
        OSError: If LOG_FILE cannot be opened for appending.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    # Messages contain Chinese text and emoji, so the encoding must not depend
    # on the process locale (a C/POSIX locale would otherwise fail to encode).
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
|
||
|
||
|
||
def get_secret(key):
    """Look up ``key`` in ``<WORKSPACE>/secrets.env``.

    The file is scanned line by line for a line starting with ``key=``; the
    text after the first ``=`` is returned with surrounding whitespace and
    quote characters removed.

    Args:
        key: Name of the secret.

    Returns:
        The secret value, or None when no line starts with ``key=``.

    Raises:
        OSError: If secrets.env cannot be opened.
    """
    prefix = f"{key}="
    # Read as UTF-8 explicitly so the result does not depend on the locale.
    with open(os.path.join(WORKSPACE, "secrets.env"), encoding="utf-8") as f:
        for line in f:
            if line.startswith(prefix):
                return line.strip().split("=", 1)[1].strip("'\"")
    # Missing key: keep the historical "return None" contract, but explicitly.
    return None
|
||
|
||
|
||
def get_fs_token():
    """Request a Feishu tenant access token for the first app in config.json.

    Reads ``<CRED_DIR>/config.json`` and posts the first app's ``appId`` and
    ``appSecret`` to the tenant_access_token endpoint.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: If the response reports a non-zero ``code`` or carries
            no token (same failure style as read_sheet()).
        OSError, KeyError, IndexError: If config.json is missing or malformed.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15
    )
    data = resp.json()
    # Fail with the API's own code/msg instead of a bare KeyError on the token.
    # Only code and msg are reported so no credential ends up in the log.
    if data.get("code") != 0 or not data.get("tenant_access_token"):
        raise RuntimeError(
            f"获取 tenant_access_token 失败: code={data.get('code')} msg={data.get('msg')}"
        )
    return data["tenant_access_token"]
|
||
|
||
|
||
def read_sheet(token, sheet_id, range_str=None):
    """Read cell values from one sheet of the spreadsheet.

    Args:
        token: Bearer token sent in the Authorization header.
        sheet_id: Id of the sheet to read.
        range_str: Optional A1 range (e.g. "A3:Y2607"); when given it is
            appended to the sheet id as ``<sheet_id>!<range>``.

    Returns:
        The ``values`` entry of the response's ``valueRange``.

    Raises:
        RuntimeError: If the response ``code`` is not 0.
    """
    target = f"{sheet_id}!{range_str}" if range_str else sheet_id
    endpoint = (
        f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{target}"
    )
    auth_header = {"Authorization": f"Bearer {token}"}
    data = requests.get(endpoint, headers=auth_header, timeout=30).json()
    if data.get("code") == 0:
        return data["data"]["valueRange"]["values"]
    raise RuntimeError(f"读取失败 {sheet_id}: {data}")
|
||
|
||
|
||
def put_values(token, sheet_id, range_str, values):
    """Write ``values`` into one range of a sheet.

    Args:
        token: Bearer token sent in the Authorization header.
        sheet_id: Id of the target sheet.
        range_str: A1 range inside the sheet (e.g. "D3:D20").
        values: List of rows, each row a list of cell values.

    Returns:
        True when the response ``code`` is 0; otherwise the code and message
        are logged and False is returned (no exception is raised for an API
        level failure).
    """
    endpoint = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }
    payload = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    r = requests.put(endpoint, headers=headers, json=payload, timeout=30).json()
    succeeded = r.get("code") == 0
    if not succeeded:
        log(f" ❌ {range_str}: code={r.get('code')} msg={r.get('msg')}")
    return succeeded
|
||
|
||
|
||
def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in slices and concatenate the fetched rows.

    Args:
        cur: DB-API cursor providing ``execute(sql, args)`` and ``fetchall()``.
        sql_tpl: SQL text containing exactly one ``%s`` marking where the
            comma-separated placeholder list is substituted.
        params: Sequence of values for the IN list.
        chunk: Maximum number of values bound per statement.

    Returns:
        A list with the rows of every slice, in slice order. Empty ``params``
        executes nothing and returns an empty list.
    """
    collected = []
    for offset in range(0, len(params), chunk):
        piece = params[offset:offset + chunk]
        # One "%s" per bound value; the values themselves go through execute().
        placeholders = ",".join("%s" for _ in piece)
        cur.execute(sql_tpl % placeholders, piece)
        collected += cur.fetchall()
    return collected
|
||
|
||
|
||
def safe_cell(row, idx):
    """Return cell ``idx`` of ``row`` as a stripped string.

    Numbers that equal their integer value are rendered without a fractional
    part (``3.0`` -> ``"3"``), so ids and phone numbers read back as floats
    keep their digits only.

    Args:
        row: Sequence of cell values for one sheet row.
        idx: Zero-based column index.

    Returns:
        "" when the row is too short or the cell is None, otherwise the
        string form of the cell.
    """
    if idx >= len(row) or row[idx] is None:
        return ""
    value = row[idx]
    if isinstance(value, (int, float)):
        try:
            if value == int(value):
                return str(int(value))
        except (ValueError, OverflowError):
            # int() raises ValueError for NaN and OverflowError for +/-inf;
            # such cells fall through to their plain string form.
            pass
    return str(value).strip()
|
||
|
||
|
||
def parse_date_str(s, year=None):
    """Normalise a sheet date such as '6月7日' or '6月7日 9:05:00'.

    Args:
        s: Raw date text from the sheet.
        year: Year to use for month/day-only input; defaults to the current
            year at call time.

    Returns:
        * "" for empty input;
        * the stripped input unchanged when it already starts with
          ``YYYY-MM-DD``;
        * ``YYYY-MM-DD`` or ``YYYY-MM-DD HH:MM:SS`` for 'M月D日[ H:MM:SS]'
          input, with month, day and hour zero-padded;
        * the stripped input unchanged for anything else.
    """
    if not s:
        return ""
    s = s.strip()
    if re.match(r'^\d{4}-\d{2}-\d{2}', s):
        return s
    # Date plus optional time: '6月7日 10:23:48' or '6月7日'.
    m = re.match(r'^(\d{1,2})月(\d{1,2})日(?:\s+(\d{1,2}):(\d{2}):(\d{2}))?', s)
    if not m:
        return s
    if year is None:
        year = datetime.now().year
    date_part = f"{year}-{int(m.group(1)):02d}-{int(m.group(2)):02d}"
    if m.group(3) is None:
        return date_part
    # Zero-pad the hour: the result is later compared as a string against
    # strftime("%Y-%m-%d %H:%M:%S") output, and ' 9:..' would sort after ' 10:..'.
    return f"{date_part} {int(m.group(3)):02d}:{m.group(4)}:{m.group(5)}"
|
||
|
||
|
||
# ═══ Step 1: 解析三个销售 sheet ═══
|
||
|
||
def parse_sales_sheets(token):
    """Read the three sales sheets and turn every usable row into an entry dict.

    A row is kept only when it has at least one non-empty cell and its column
    A contains one of the CS_MAP keys.

    Args:
        token: Feishu bearer token passed to read_sheet().

    Returns:
        Dict mapping sheet id -> list of entries. Each entry holds the sheet
        row number, sales owner, nickname, raw and parsed clue date, phone,
        grade, history and an ``existing`` dict with the current string value
        of columns D, H..U, X and Y.
    """
    # Columns whose current value is captured in entry["existing"]; the
    # zero-based index of a column is its distance from "A".
    tracked_columns = "DHIJKLMNOPQRSTUXY"
    parsed = {}

    for sid, sname, rng in SALES_SHEETS:
        entries = []
        for offset, row in enumerate(read_sheet(token, sid, rng)):
            # Skip missing rows and rows made only of empty cells.
            if not row or not any(row):
                continue

            owner_cell = safe_cell(row, 0)
            sales = next((v for k, v in CS_MAP.items() if k in owner_cell), None)
            if not sales:
                continue

            phone = ""
            raw_phone = row[4] if len(row) > 4 else None
            if raw_phone:
                try:
                    # Numeric cells may arrive as floats; keep the digits only.
                    phone = str(int(float(str(raw_phone))))
                except (ValueError, TypeError):
                    phone = str(raw_phone).strip()

            clue_date = safe_cell(row, 2)
            entries.append({
                # Ranges in SALES_SHEETS start at sheet row 3.
                "row": offset + 3,
                "sales": sales,
                "nickname": safe_cell(row, 1),
                "clue_date": clue_date,
                "clue_date_parsed": parse_date_str(clue_date),
                "phone": phone,
                "grade": safe_cell(row, 5),
                "history": safe_cell(row, 6),
                "existing": {
                    col: safe_cell(row, ord(col) - ord("A")) for col in tracked_columns
                },
            })

        parsed[sid] = entries
        phone_cnt = len([e for e in entries if re.match(r'^\d{11}$', e["phone"])])
        uid_cnt = len([e for e in entries if e["existing"]["H"] and e["existing"]["H"].isdigit()])
        log(f" [{sname}] {len(entries)}行, 手机号{phone_cnt}, 已有UID{uid_cnt}")

    return parsed
|
||
|
||
|
||
# ═══ Step 2: XXTEA 加密 → PG tel_encrypt 精确匹配 ═══
|
||
|
||
def phone_to_uid_xxtea(all_entries):
    """Resolve phone numbers to account ids via their encrypted form.

    Every distinct 11-digit phone in ``all_entries`` is passed through
    encrypt_phone() and matched exactly against
    ``bi_vala_app_account.tel_encrypt`` (rows with status=1 and
    deleted_at IS NULL only).

    Args:
        all_entries: Dict of sheet id -> list of entries as produced by
            parse_sales_sheets().

    Returns:
        Dict mapping plaintext phone -> account id as a string. Phones with
        no match are absent. If several accounts share one ciphertext, the
        one fetched last wins.
    """
    phone_set = {
        e["phone"]
        for entries in all_entries.values()
        for e in entries
        if re.match(r'^\d{11}$', e["phone"])
    }

    if not phone_set:
        log(" 无有效手机号")
        return {}

    log(f" XXTEA 加密匹配: {len(phone_set)} 个唯一手机号")

    phone_enc_map = {}
    for phone in phone_set:
        try:
            phone_enc_map[encrypt_phone(phone)] = phone
        except Exception as ex:
            # Mask the middle digits: the log file must not hold plaintext
            # phone numbers that the database only stores encrypted.
            log(f" 加密失败 {phone[:3]}****{phone[-4:]}: {ex}")

    log(f" 加密完成, 唯一密文: {len(phone_enc_map)}")

    enc_list = list(phone_enc_map)
    phone_to_uid = {}

    # Nothing to look up when every encryption failed; skip the connection.
    if enc_list:
        conn = psycopg2.connect(
            host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
            user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
            dbname="vala_bi", connect_timeout=30
        )
        try:
            cur = conn.cursor()
            for i in range(0, len(enc_list), 500):
                chunk = enc_list[i:i + 500]
                ph = ",".join(["%s"] * len(chunk))
                cur.execute(
                    f"SELECT id, tel_encrypt FROM bi_vala_app_account "
                    f"WHERE tel_encrypt IN ({ph}) AND status=1 AND deleted_at IS NULL",
                    chunk
                )
                for uid, tel_enc in cur.fetchall():
                    plain = phone_enc_map.get(tel_enc)
                    if plain:
                        phone_to_uid[plain] = str(uid)
                # Short pause between chunks, as in the original pacing.
                time.sleep(0.05)
            cur.close()
        finally:
            # Always release the connection, even when a query raises.
            conn.close()

    log(f" 精确匹配到 {len(phone_to_uid)} 个 UID (via XXTEA)")
    return phone_to_uid
|
||
|
||
|
||
# ═══ Step 3: PostgreSQL 批量查询 ═══
|
||
|
||
def query_all_pg(all_entries, phone_map):
    """Collect registration, trial, order, activation and learning data per UID.

    The UID set is the union of (a) ids resolved from phones via
    ``phone_map`` and (b) positive numeric ids already present in column H.

    Args:
        all_entries: Dict of sheet id -> entries from parse_sales_sheets().
        phone_map: Dict of plaintext phone -> account id string.

    Returns:
        Dict mapping integer account id -> dict with keys ``reg_date``,
        ``download_channel``, ``trial_count``, ``has_order``, ``orders``
        (newest first, amounts in yuan), ``activation``, ``lesson_progress``,
        ``lesson_time`` and ``lesson_minutes``. Empty dict when no UID is
        known.
    """
    uid_set = set()
    for entries in all_entries.values():
        for e in entries:
            if re.match(r'^\d{11}$', e["phone"]) and e["phone"] in phone_map:
                uid_set.add(int(phone_map[e["phone"]]))
            h_val = e["existing"]["H"]
            if h_val and h_val.isdigit() and int(h_val) > 0:
                uid_set.add(int(h_val))

    uid_list = list(uid_set)
    log(f" 有效 user_id: {len(uid_list)}")

    if not uid_list:
        return {}

    info = {uid: {
        "reg_date": "", "download_channel": "", "trial_count": 0,
        "has_order": False, "orders": [],
        "activation": "", "lesson_progress": "", "lesson_time": "", "lesson_minutes": 0,
    } for uid in uid_set}
    account_chars = defaultdict(list)
    char_plays = defaultdict(lambda: {"latest_time": None, "latest_chapter": None, "total_ms": 0})

    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
        user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
        dbname="vala_bi", connect_timeout=30
    )
    try:
        cur = conn.cursor()

        # 3a. Registration info
        log(" 查询注册信息...")
        reg_info = batch_in(cur,
            "SELECT id, created_at, download_channel FROM bi_vala_app_account "
            "WHERE id IN (%s) AND status=1 AND deleted_at IS NULL",
            uid_list
        )
        for aid, created_at, dc in reg_info:
            if aid in info:
                info[aid]["reg_date"] = created_at.strftime("%Y-%m-%d") if created_at else ""
                raw_ch = dc or ""
                info[aid]["download_channel"] = CHANNEL_MAP.get(raw_ch, raw_ch)

        # 3b. Trial lesson count
        log(" 查询体验节数...")
        trial_info = batch_in(cur,
            "SELECT account_id, COUNT(*) FROM bi_user_course_detail "
            "WHERE account_id IN (%s) AND expire_time IS NULL AND deleted_at IS NULL "
            "GROUP BY account_id",
            uid_list
        )
        for aid, cnt in trial_info:
            if aid in info:
                info[aid]["trial_count"] = cnt

        # 3c. Orders and refunds
        log(" 查询订单信息...")
        orders = batch_in(cur,
            "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status "
            "FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL "
            "AND order_status IN (3,4) ORDER BY pay_success_date DESC",
            uid_list
        )
        user_orders = defaultdict(list)
        for o in orders:
            user_orders[o[0]].append(o)

        trade_nos = [o[1] for o in orders if o[1]]
        refund_map = {}
        if trade_nos:
            refunds = batch_in(cur,
                "SELECT trade_no, refund_amount_int FROM bi_refund_order "
                "WHERE trade_no IN (%s) AND status=3",
                trade_nos
            )
            # NOTE(review): if one trade_no can have several status=3 refund
            # rows, only the last fetched amount is kept here instead of the
            # sum — confirm against the table's semantics.
            for tn, amt in refunds:
                refund_map[tn] = amt

        for aid, olist in user_orders.items():
            if aid not in info:
                continue
            info[aid]["has_order"] = True
            # Per-order data (pay_success_date DESC within each query chunk),
            # used later to pick the valid orders.
            orders_data = []
            for o in olist:
                trade_no = o[1] or ""
                pay_dt = o[2]
                key_from = o[3] or ""
                goods_id = o[4]
                # Amounts are divided by 100 to obtain yuan; a NULL amount is
                # treated as 0 instead of crashing the whole refresh.
                pay_amount = (o[5] or 0) / 100.0
                refund_amount = (refund_map.get(trade_no) or 0) / 100.0
                gsv = pay_amount - refund_amount
                orders_data.append({
                    "trade_no": trade_no,
                    "pay_dt": pay_dt,
                    "pay_dt_raw": pay_dt.strftime("%Y-%m-%d %H:%M:%S") if pay_dt else "",
                    "key_from": key_from,
                    "goods_id": goods_id,
                    "product": GOODS_NAMES.get(goods_id, f"商品{goods_id}"),
                    "pay_amount": pay_amount,
                    "refund_amount": refund_amount,
                    "gsv": gsv,
                })
            info[aid]["orders"] = orders_data

        # 3d. Activated course
        log(" 查询激活课程...")
        try:
            activations = batch_in(cur,
                "SELECT account_id, season_package_level FROM bi_vala_seasonal_ticket "
                "WHERE account_id IN (%s) AND status=1 AND deleted_at IS NULL "
                "AND season_package_level IN ('A1','A2')",
                uid_list
            )
            # NOTE(review): an account with both an A1 and an A2 ticket keeps
            # whichever row is fetched last — confirm that is acceptable.
            for aid, lvl in activations:
                if aid in info:
                    info[aid]["activation"] = lvl
        except Exception as ex:
            log(f" 激活查询异常: {ex}")
            # A failed statement leaves the transaction aborted; without a
            # rollback every later query on this connection would fail too.
            conn.rollback()

        # 3e. Characters per account
        log(" 查询角色信息...")
        char_info = batch_in(cur,
            "SELECT account_id, id FROM bi_vala_app_character "
            "WHERE account_id IN (%s) AND deleted_at IS NULL",
            uid_list
        )
        char_to_account = {}
        for aid, cid in char_info:
            account_chars[aid].append(cid)
            char_to_account[cid] = aid
        char_ids = list(char_to_account.keys())
        log(f" 角色数: {len(char_ids)}")

        # 3f. Chapter id -> (level, season, unit, lesson)
        cur.execute("SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson")
        chapter_map = {}
        for ch_id, cl, cs, cu, cl2 in cur.fetchall():
            chapter_map[ch_id] = (cl or "", cs or "", cu or "", cl2 or "")

        # 3g. Completed-chapter records (8 numbered tables, _0 .. _7)
        log(" 查询课时完成记录...")
        for tbl_idx in range(8):
            table = f"bi_user_chapter_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, chapter_id, created_at FROM {table} "
                    f"WHERE play_status=1 AND deleted_at IS NULL AND user_id = ANY(%s)",
                    (char_ids,)
                )
                for uid, ch_id, created_at in cur.fetchall():
                    ch_data = chapter_map.get(ch_id)
                    if not ch_data:
                        continue
                    rec = char_plays[uid]
                    if rec["latest_time"] is None or created_at > rec["latest_time"]:
                        rec["latest_time"] = created_at
                        rec["latest_chapter"] = (ch_id, ch_data)
            except Exception as ex:
                log(f" 警告 {table}: {ex}")
                conn.rollback()  # clear the aborted transaction before the next table

        # 3h. Total learning time
        log(" 查询学习耗时...")
        for tbl_idx in range(8):
            table = f"bi_user_component_play_record_{tbl_idx}"
            try:
                cur.execute(
                    f"SELECT user_id, SUM(COALESCE(interval_time,0)) FROM {table} "
                    f"WHERE user_id = ANY(%s) AND deleted_at IS NULL GROUP BY user_id",
                    (char_ids,)
                )
                for uid, total_ms in cur.fetchall():
                    # NOTE(review): time is only accumulated for characters
                    # that already have a completed-chapter record from step
                    # 3g; characters with play time but no completed chapter
                    # contribute nothing — confirm this is intended.
                    if uid in char_plays:
                        char_plays[uid]["total_ms"] += (total_ms or 0)
            except Exception as ex:
                log(f" 警告 {table}: {ex}")
                conn.rollback()  # clear the aborted transaction before the next table

        cur.close()
    finally:
        # Release the connection on success and on any failure above.
        conn.close()

    # Roll character-level data up to the account level.
    for aid in uid_set:
        chars = account_chars.get(aid, [])
        best_time = None
        best_ch = None
        total_ms = 0
        for cid in chars:
            play = char_plays.get(cid)
            if not play:
                continue
            if play["latest_chapter"]:
                if best_time is None or play["latest_time"] > best_time:
                    best_time = play["latest_time"]
                    best_ch = play["latest_chapter"]
            total_ms += play["total_ms"]

        # total_ms / 60000 -> minutes, one decimal; whole minutes become ints.
        info[aid]["lesson_minutes"] = round(total_ms / 60000, 1)
        if info[aid]["lesson_minutes"] == int(info[aid]["lesson_minutes"]):
            info[aid]["lesson_minutes"] = int(info[aid]["lesson_minutes"])

        if best_ch:
            ch_id, (cl, cs, cu, cl2) = best_ch
            info[aid]["lesson_progress"] = f"{cl}-{cs}-{cu}-{cl2}"
            info[aid]["lesson_time"] = best_time.strftime("%Y-%m-%d") if best_time else ""

    log(f" 数据库查询完成")
    return info
|
||
|
||
|
||
# ═══ Step 4: write columns D/H/I/J/K~U/X/Y of the three sales sheets ═══
|
||
|
||
def _normalize_clue_date(clue_date):
    """Zero-pad the hour of a 'YYYY-MM-DD H:MM:SS' clue timestamp.

    Anything that does not have that exact shape (date-only strings, already
    padded strings, free text) is returned unchanged.
    """
    m = re.match(r'^(\d{4}-\d{2}-\d{2})[ T]+(\d{1,2}):(\d{2}):(\d{2})$', clue_date.strip())
    if not m:
        return clue_date
    return f"{m.group(1)} {int(m.group(2)):02d}:{m.group(3)}:{m.group(4)}"


def _select_valid_orders(orders, clue_date):
    """Return the orders passing the validity gate, newest first.

    An order is valid when its GSV is positive, it is not fully refunded and
    it was not paid before the clue date (K >= C). The date gate is skipped
    when either the pay time or the clue date is missing.
    """
    # The gate is a string comparison against strftime output, so the clue
    # timestamp must use the same zero-padded layout to order correctly.
    cutoff = _normalize_clue_date(clue_date) if clue_date else clue_date
    valid = []
    for o in orders:
        pay_amount = o["pay_amount"]
        is_full_refund = pay_amount > 0 and pay_amount == o["refund_amount"]
        if o["gsv"] <= 0 or is_full_refund:
            continue
        pay_dt = o["pay_dt"]
        if pay_dt and cutoff and pay_dt.strftime("%Y-%m-%d %H:%M:%S") < cutoff:
            continue
        valid.append(o)
    # Orders without a pay time sort last.
    valid.sort(key=lambda o: o["pay_dt"] or datetime.min, reverse=True)
    return valid


def pick_valid_order(orders, clue_date):
    """Pick the most recent valid order.

    Rule: GSV > 0, not fully refunded, paid on or after the clue date (K >= C).

    Args:
        orders: List of order dicts as built by query_all_pg().
        clue_date: Normalised clue date string, or "" to skip the date gate.

    Returns:
        ``(order_dict, True)`` for the newest valid order, or
        ``(None, False)`` when there is none.
    """
    if not orders:
        return None, False
    valid = _select_valid_orders(orders, clue_date)
    if not valid:
        return None, False
    return valid[0], True


def aggregate_valid_orders(orders, clue_date):
    """Aggregate every valid order of one user.

    Rule: GSV > 0, not fully refunded, paid on or after the clue date (K >= C).

    Args:
        orders: List of order dicts as built by query_all_pg().
        clue_date: Normalised clue date string, or "" to skip the date gate.

    Returns:
        ``(summary, True)`` where ``summary`` has ``aggregated_gsv``,
        ``aggregated_gmv``, ``aggregated_refund`` (sums over valid orders),
        ``best_trade_no``, ``latest_order``, ``products`` (distinct names
        joined with "+") and ``valid_count``; or ``(None, False)`` when no
        order is valid.
    """
    if not orders:
        return None, False
    valid = _select_valid_orders(orders, clue_date)
    if not valid:
        return None, False

    # Trade number: prefer the newest order without any refund; when every
    # valid order has a refund, take the one with the smallest refund.
    no_refund = [o for o in valid if o["refund_amount"] == 0]
    if no_refund:
        best_trade_no = no_refund[0]["trade_no"]
    else:
        best_trade_no = min(valid, key=lambda o: o["refund_amount"])["trade_no"]

    # The newest valid order supplies the K/L columns.
    latest = valid[0]

    # Distinct product names, in newest-first order of first appearance.
    products = []
    for o in valid:
        p = o.get("product", "")
        if p and p not in products:
            products.append(p)

    return {
        "aggregated_gsv": sum(o["gsv"] for o in valid),
        "aggregated_gmv": sum(o["pay_amount"] for o in valid),
        "aggregated_refund": sum(o["refund_amount"] for o in valid),
        "best_trade_no": best_trade_no,
        "latest_order": latest,
        "products": "+".join(products) if products else latest.get("product", ""),
        "valid_count": len(valid),
    }, True
|
||
|
||
|
||
# Columns written back to each sales sheet, in write order. G, V and W are
# deliberately absent: they are maintained elsewhere and must not be touched.
_SALES_WRITE_COLS = (
    "D", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "X", "Y",
)

# Upper bound on rows per write request; longer contiguous runs are split.
_MAX_ROWS_PER_WRITE = 5000


def _sales_row_cells(e, phone_map, db_info, now_str):
    """Compute the value of every written column for one sheet entry.

    Returns a dict keyed by column letter (see _SALES_WRITE_COLS).
    """
    phone = e["phone"]
    existing = e["existing"]
    phone_is_valid = bool(re.match(r'^\d{11}$', phone))
    phone_matched = phone_is_valid and phone in phone_map

    # UID used for the data lookup: phone match first, else existing column H.
    aid = 0
    if phone_matched:
        aid = int(phone_map[phone])
    elif existing["H"] and existing["H"].isdigit() and int(existing["H"]) > 0:
        aid = int(existing["H"])

    # H: written when the phone matched; a valid phone without a match is
    # blanked; rows without a valid phone keep a numeric existing H.
    # NOTE(review): a valid-but-unmatched phone blanks H while the data
    # columns are still filled from the old H value — confirm this is wanted.
    if phone_matched:
        h_value = phone_map[phone]
    elif phone_is_valid:
        h_value = ""
    elif existing["H"] and existing["H"].isdigit():
        h_value = existing["H"]
    else:
        h_value = ""

    cells = dict.fromkeys(_SALES_WRITE_COLS, "")
    cells["H"] = h_value
    cells["U"] = now_str  # U: time of this refresh
    # D/I/J are fill-only columns: start from what is already in the sheet so
    # an existing value is never replaced by a blank, UID or not.
    cells["D"] = existing["D"]
    cells["I"] = existing["I"]
    cells["J"] = existing["J"]

    if aid <= 0 or aid not in db_info:
        return cells
    di = db_info[aid]

    if not cells["D"]:
        tc = di["trial_count"]
        cells["D"] = tc if tc > 0 else ""
    if not cells["I"]:
        cells["I"] = di["reg_date"]
    if not cells["J"]:
        cells["J"] = di["download_channel"]

    valid_order, is_valid = pick_valid_order(di.get("orders", []), e["clue_date_parsed"])
    if is_valid and valid_order:
        # X=1: K..P carry the values of the chosen valid order.
        pay_dt = valid_order["pay_dt"]
        cells["K"] = f"{pay_dt.month}月{pay_dt.day}日 {pay_dt.strftime('%H:%M:%S')}" if pay_dt else ""
        cells["L"] = valid_order["key_from"]
        cells["M"] = valid_order["product"]
        # N/O/P are written as whole numbers; zero is left blank.
        cells["N"] = int(valid_order["pay_amount"]) if valid_order["pay_amount"] > 0 else ""
        cells["O"] = int(valid_order["refund_amount"]) if valid_order["refund_amount"] > 0 else ""
        cells["P"] = int(valid_order["gsv"]) if valid_order["gsv"] > 0 else ""
        cells["X"] = 1
        cells["Y"] = classify_channel(valid_order["key_from"])
    elif di["has_order"]:
        # Orders exist but none is valid: X=0 and K..P stay blank.
        cells["X"] = 0

    # Q: activated course
    act = di["activation"]
    if act:
        cells["Q"] = f"{act}体验课" if act in ("A1", "A2") else act

    # R: lesson progress, S: latest lesson date, T: learning minutes
    cells["R"] = di["lesson_progress"] or ""
    cells["S"] = di["lesson_time"]
    lm = di["lesson_minutes"]
    cells["T"] = lm if lm > 0 else ""
    return cells


def write_sales_sheets(token, all_entries, phone_map, db_info):
    """Write the refreshed columns back to the three sales sheets.

    Entries are grouped into runs of consecutive sheet rows (capped at
    _MAX_ROWS_PER_WRITE rows) and each column in _SALES_WRITE_COLS is written
    with one request per run.

    Args:
        token: Feishu bearer token.
        all_entries: Dict of sheet id -> entries from parse_sales_sheets().
        phone_map: Dict of plaintext phone -> account id string.
        db_info: Dict of account id -> data from query_all_pg().
    """
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    for sid, sname, _ in SALES_SHEETS:
        entries = all_entries[sid]
        log(f" 写入 {sname} ({sid})...")

        # Group consecutive rows; start a new group on a gap or when the
        # current one would exceed the per-request row limit.
        groups = []
        for e in entries:
            if (groups
                    and e["row"] == groups[-1][-1]["row"] + 1
                    and len(groups[-1]) < _MAX_ROWS_PER_WRITE):
                groups[-1].append(e)
            else:
                groups.append([e])

        failed_writes = 0
        for g in groups:
            sr, er = g[0]["row"], g[-1]["row"]
            rows = [_sales_row_cells(e, phone_map, db_info, now_str) for e in g]
            for col_letter in _SALES_WRITE_COLS:
                vals = [[cells[col_letter]] for cells in rows]
                if not put_values(token, sid, f"{col_letter}{sr}:{col_letter}{er}", vals):
                    failed_writes += 1
                time.sleep(0.1)

        log(f" {sname}: {len(entries)} 行写入完成")
        if failed_writes:
            # put_values() already logged each failure; surface the total so
            # the completion line above is not mistaken for full success.
            log(f" {sname}: {failed_writes} 次写入失败")
|
||
|
||
|
||
# ═══ Step 5: 汇总到「订单汇总」sheet ═══
|
||
|
||
def clear_summary_sheet(token):
    """Clear old rows of the summary sheet (columns A..W, from row 3 down).

    Column A of rows 3..5000 is read to find the last row holding data; the
    block from row 3 to that row is then cleared across 23 columns. Any
    exception is logged and swallowed, so a failed clear does not stop the
    refresh.

    Args:
        token: Feishu bearer token.
    """
    log(" 检查订单汇总 sheet 现有数据...")
    try:
        column_a = read_sheet(token, SUMMARY_SHEET_ID, "A3:A5000")
        # Sheet row numbers (offset 0 is row 3) of every non-empty cell row.
        used_rows = [3 + offset for offset, row in enumerate(column_a) if row and any(row)]
        last_data_row = used_rows[-1] if used_rows else 2

        if last_data_row < 3:
            log(" 订单汇总 sheet 无旧数据,跳过清空")
            return

        log(f" 清空 A3:W{last_data_row}({last_data_row - 2} 行旧数据)...")
        FeishuSheetWriter(SPREADSHEET_TOKEN, token).clear(
            SUMMARY_SHEET_ID, start_row=3, end_row=last_data_row, cols=23
        )
        log(" 清空完成")
    except Exception as e:
        log(f" 清空异常: {e}")
|
||
|
||
|
||
def write_summary_sheet(token, all_entries, phone_map, db_info):
    """Rebuild the order-summary sheet from the valid-order rows of the sales sheets.

    The summary sheet is cleared first. Every sales-sheet entry whose UID has
    at least one valid order (see aggregate_valid_orders()) yields a 23-column
    row: A..U mirror the sales row with N/O/P aggregated over the UID's valid
    orders, V is the channel attribution and W is left blank. When one UID
    appears in several entries only the entry with the smallest row number is
    kept.

    Args:
        token: Feishu bearer token.
        all_entries: Dict of sheet id -> entries from parse_sales_sheets().
        phone_map: Dict of plaintext phone -> account id string.
        db_info: Dict of account id -> data from query_all_pg().
    """
    clear_summary_sheet(token)

    log(" 汇总订单数据(UID gate 全量重建)...")

    # One timestamp for the whole rebuild, so every row carries the same U value.
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Collect the rows that pass the valid-order gate from all three sheets.
    gate_rows = []
    for sid, sname, _ in SALES_SHEETS:
        for e in all_entries[sid]:
            phone = e["phone"]
            existing = e["existing"]

            aid = 0
            if re.match(r'^\d{11}$', phone) and phone in phone_map:
                aid = int(phone_map[phone])
            elif existing["H"] and existing["H"].isdigit() and int(existing["H"]) > 0:
                aid = int(existing["H"])

            di = db_info.get(aid, {}) if aid > 0 else {}
            orders = di.get("orders", [])
            if not orders:
                continue

            agg_result, is_valid = aggregate_valid_orders(orders, e["clue_date_parsed"])
            if not is_valid or not agg_result:
                continue

            latest = agg_result["latest_order"]
            pay_dt = latest["pay_dt"]
            order_date = f"{pay_dt.month}月{pay_dt.day}日 {pay_dt.strftime('%H:%M:%S')}" if pay_dt else ""

            row_data = [
                e["sales"],                                     # A: sales owner
                e["nickname"],                                  # B: WeChat nickname
                e["clue_date"],                                 # C: clue date
                di.get("trial_count", 0) or "",                 # D: trial lesson count
                phone,                                          # E: phone
                e["grade"],                                     # F: grade
                e["history"],                                   # G: history / follow-up
                str(aid) if aid > 0 else "",                    # H: user id
                di.get("reg_date", ""),                         # I: registration date
                di.get("download_channel", ""),                 # J: download channel
                order_date,                                     # K: order date (newest valid order)
                latest["key_from"],                             # L: deal channel
                agg_result["products"],                         # M: products, joined with "+"
                int(agg_result["aggregated_gmv"]) if agg_result["aggregated_gmv"] > 0 else "",        # N: GMV (sum)
                int(agg_result["aggregated_refund"]) if agg_result["aggregated_refund"] > 0 else "",  # O: refund (sum)
                int(agg_result["aggregated_gsv"]) if agg_result["aggregated_gsv"] > 0 else "",        # P: GSV (sum)
                (f"{di['activation']}体验课" if di.get("activation") in ("A1", "A2") else di.get("activation", "")),  # Q: activated course
                di.get("lesson_progress", ""),                  # R: lesson progress
                di.get("lesson_time", ""),                      # S: latest lesson date
                di.get("lesson_minutes", 0) or "",              # T: learning minutes
                now_str,                                        # U: refresh time
                classify_channel(latest["key_from"]),           # V: channel attribution
                "",                                             # W: left blank (trade_no is no longer written)
            ]
            gate_rows.append((aid, e["row"], row_data))

    # Same UID in several entries -> keep the one with the smallest row number.
    seen_uid = set()
    deduped = []
    for uid, _row_num, row_data in sorted(gate_rows, key=lambda x: x[1]):
        if uid in seen_uid:
            continue
        seen_uid.add(uid)
        deduped.append(row_data)

    log(f" 共 {len(gate_rows)} 条 gate 行, 去重后 {len(deduped)} 行, 唯一 UID {len(seen_uid)}")

    if not deduped:
        log(" 无有效订单,跳过汇总")
        return

    # Write from row 3, columns A..W; each row is forced to exactly 23 cells.
    writer = FeishuSheetWriter(SPREADSHEET_TOKEN, token)
    values = [(row_data + [""] * 23)[:23] for row_data in deduped]
    writer.write(SUMMARY_SHEET_ID, start_row=3, rows=values, cols=23)

    log(f" 订单汇总写入完成, 共 {len(deduped)} 行")
|
||
|
||
|
||
# ═══ Main ═══
|
||
|
||
def main():
    """Run the five refresh steps in order.

    Steps: parse the sales sheets, resolve phones to UIDs, query the
    database, write the sales sheets, rebuild the summary sheet.

    Returns:
        0 on success, 1 when any step raised (the error and its traceback
        are logged).
    """
    log("=" * 60)
    log("销售线索全量刷新 (XXTEA精确匹配版) 启动")

    try:
        token = get_fs_token()

        log("Step 1: 解析销售三表")
        all_entries = parse_sales_sheets(token)

        log("Step 2: XXTEA 加密 → PG tel_encrypt 精确匹配")
        phone_map = phone_to_uid_xxtea(all_entries)

        log("Step 3: PostgreSQL 批量查询")
        db_info = query_all_pg(all_entries, phone_map)

        log("Step 4: 写入销售三表 H~V 列")
        write_sales_sheets(token, all_entries, phone_map, db_info)

        log("Step 5: 汇总到「订单汇总」sheet")
        write_summary_sheet(token, all_entries, phone_map, db_info)

        log("✅ 全量刷新完成")
        return 0
    except Exception as e:
        import traceback

        log(f"❌ ERROR: {e}")
        # Send the traceback through log() so it is kept in the log file
        # next to the error line instead of going to stderr only.
        log(traceback.format_exc().rstrip())
        return 1
|
||
|
||
|
||
# Script entry point: main() returns 0 on success and 1 on error, and that
# value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|