ai_member_xiaoxi/scripts/finance_orders_refresh.py
2026-06-19 08:00:01 +08:00

396 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
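Finance-view order refresh v3 — driven by the order-summary tab 2smjwA.
Logic:
1. Read the order-summary tab 2smjwA (A3:W).
2. Copy the A–U mirror columns row by row, then look up the phone number in column E in the DB → all orders of that user.
3. N orders = N rows: each row gets X = trade_no, and K–P hold that order's date, channel, product and amounts.
4. Y = 1 when the order is valid (GSV > 0 · not fully refunded · lead date ≤ order date), otherwise 0.
5. V = channel attribution; W and Z are left blank (filled in by Cursor formulas).
6. Clear A3:Z5000 first, then write everything back in full.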
"""
import json, re, time, sys, os, requests, psycopg2
from datetime import datetime
from collections import defaultdict
# Directory containing this script. It is pushed onto sys.path below so the
# helper modules imported right after can be resolved from it
# (presumably phone_encrypt.py / feishu_sheet_utils.py live here — confirm).
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
# Parent of the scripts directory; get_secret() reads "secrets.env" from here.
WORKSPACE = os.path.dirname(SCRIPTS_DIR)
# Directory holding "config.json", which get_fs_token() reads for the Feishu
# appId / appSecret.
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone
from feishu_sheet_utils import FeishuSheetWriter
# Spreadsheet token interpolated into the Feishu Sheets API URL in read_sheet()
# and handed to FeishuSheetWriter.
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
# Sheet id of the order-summary tab that parse_summary() reads.
SUMMARY_SHEET_ID = "2smjwA"
# Sheet id of the finance tab that write_finance_sheet() clears and rewrites.
FINANCE_SHEET_ID = "2hSLSg"
# Maps an order's goods_id to the product name written to the sheet.
# query_all_orders() falls back to f"商品{goods_id}" for ids missing here.
GOODS_NAMES = {
57: "瓦拉英语level1·单季", 60: "瓦拉英语level1", 63: "瓦拉英语level1·单季",
31: "瓦拉英语年包", 32: "瓦拉英语单季度包", 33: "瓦拉英语level2", 54: "瓦拉英语季度包",
61: "瓦拉英语level1+2",
}
def classify_channel(key_from):
    """Map an order's ``key_from`` source tag to a channel label.

    The tag is stripped of surrounding whitespace before matching. Three
    exact in-app tags map to "端内", the ``sales-adp-`` prefix maps to "销转",
    the ``newmedia-daren-`` prefix or the single shop tag
    ``newmedia-dianpu-wwxx-0-0`` maps to "达人". Everything else, including an
    empty or missing tag, is "直购".
    """
    in_app_tags = ("app-active-h5-0-0", "app-sales-bj-qhm-0", "app-sales-bj-wd-0")
    tag = key_from.strip() if key_from else ""
    if tag in in_app_tags:
        label = "端内"
    elif tag.startswith("sales-adp-"):
        label = "销转"
    elif tag == "newmedia-dianpu-wwxx-0-0" or tag.startswith("newmedia-daren-"):
        label = "达人"
    else:
        label = "直购"
    return label
LOG_FILE = "/var/log/xiaoxi_finance_orders.log"


def log(msg):
    """Print a timestamped message and append the same line to LOG_FILE.

    The file is written as UTF-8 explicitly, because the messages contain
    Chinese text and emoji and the platform default encoding is not
    guaranteed to handle them. Writing to the file is best-effort: if the
    file cannot be opened or written, a warning goes to stderr and the call
    returns normally, so a logging problem never masks the error being
    reported (log() is also called from main()'s exception handler).

    Args:
        msg: Text to log; it is interpolated into an f-string.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except OSError as exc:
        print(f"[{ts}] WARNING: cannot write {LOG_FILE}: {exc}", file=sys.stderr)
def get_secret(key):
    """Look up *key* in ``WORKSPACE/secrets.env``.

    The file is scanned line by line for the first line starting with
    ``"<key>="``. Everything after the first ``=`` is returned with
    surrounding whitespace and any leading/trailing single or double quote
    characters removed.

    Args:
        key: Name of the variable to look up.

    Returns:
        The value as a string, or ``None`` when no line defines *key*.

    Raises:
        OSError: If secrets.env cannot be opened.
    """
    prefix = f"{key}="
    # Read as UTF-8 explicitly so the result does not depend on the locale
    # of the process that runs the script.
    with open(os.path.join(WORKSPACE, "secrets.env"), encoding="utf-8") as f:
        for line in f:
            if line.startswith(prefix):
                return line.strip().split("=", 1)[1].strip("'\"")
    # Made explicit: callers receive None for an unknown key.
    return None
def get_fs_token():
    """Fetch a Feishu tenant access token for the first configured app.

    Reads ``appId`` / ``appSecret`` of ``apps[0]`` from
    ``CRED_DIR/config.json`` and posts them to the internal-app
    tenant_access_token endpoint.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: If the API answers with a non-zero ``code`` or without
            a token (same error style as read_sheet()).
        OSError / KeyError: If the credentials file is missing or malformed.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    token = data.get("tenant_access_token")
    if data.get("code") != 0 or not token:
        # Report only code/msg, so a failed call produces a readable error
        # instead of a bare KeyError on the missing token field.
        raise RuntimeError(
            f"获取 tenant_access_token 失败: code={data.get('code')} msg={data.get('msg')}"
        )
    return token
def read_sheet(token, sheet_id, range_str=None):
    """Read cell values from one tab of the spreadsheet via the Sheets v2 API.

    Args:
        token: Bearer token sent in the Authorization header.
        sheet_id: Id of the tab to read.
        range_str: Optional A1-style range appended as ``<sheet_id>!<range>``;
            when falsy only the sheet id is used.

    Returns:
        ``data.valueRange.values`` from the JSON response.

    Raises:
        RuntimeError: If the response ``code`` is not 0.
    """
    target = f"{sheet_id}!{range_str}" if range_str else f"{sheet_id}"
    url = (
        f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/"
        + target
    )
    auth_header = {"Authorization": f"Bearer {token}"}
    payload = requests.get(url, headers=auth_header, timeout=30).json()
    if payload.get("code") == 0:
        return payload["data"]["valueRange"]["values"]
    raise RuntimeError(f"读取失败 {sheet_id}: {payload}")
def safe_cell(row, idx):
    """Return cell *idx* of *row* as a stripped string.

    Missing cells (row too short) and ``None`` cells yield ``""``. Numeric
    cells holding a whole number are rendered without a fractional part
    (``13800000000.0`` -> ``"13800000000"``), so values such as phone numbers
    and ids survive being stored as floats. Every other value is rendered
    with ``str()`` and stripped.

    Args:
        row: Sequence of cell values.
        idx: Index of the wanted cell.

    Returns:
        The cell text, or ``""`` when the cell is absent or ``None``.
    """
    if len(row) <= idx or row[idx] is None:
        return ""
    value = row[idx]
    if isinstance(value, (int, float)):
        try:
            if value == int(value):
                return str(int(value))
        except (ValueError, OverflowError):
            # int(nan) raises ValueError and int(inf) raises OverflowError;
            # both fall through to the plain string rendering below.
            pass
    return str(value).strip()
def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in slices and concatenate all fetched rows.

    Args:
        cur: Cursor object providing ``execute(sql, args)`` and ``fetchall()``.
        sql_tpl: SQL text with a single ``%s`` where the comma-separated
            placeholder list is substituted (via the ``%`` operator).
        params: Sequence of values for the IN list.
        chunk: Maximum number of values sent per statement.

    Returns:
        A list with the rows of every slice, in slice order.
    """
    collected = []
    for start in range(0, len(params), chunk):
        window = params[start:start + chunk]
        placeholders = ",".join("%s" for _ in window)
        cur.execute(sql_tpl % placeholders, window)
        collected += cur.fetchall()
    return collected
def parse_date_str(s):
    """Normalise a sheet date string to ``YYYY-MM-DD[ HH:MM:SS]``.

    Examples: ``'6月7日'`` -> ``'2026-06-07'`` and
    ``'6月7日 9:05:03'`` -> ``'2026-06-07 09:05:03'`` (when run in 2026).
    Strings that already start with ``YYYY-MM-DD`` are returned stripped but
    otherwise unchanged; strings in neither form are returned stripped.

    The ``M月D日`` form carries no year. The current year is assumed, unless
    that would put the date more than one day in the future, in which case
    the previous year is used. This keeps e.g. December dates read in
    January in the past. The one-day slack tolerates a clock/timezone offset
    between the sheet and this host.

    Args:
        s: Raw cell text, possibly empty or ``None``.

    Returns:
        The normalised string, or ``""`` for empty input.
    """
    if not s:
        return ""
    s = s.strip()
    if re.match(r'^\d{4}-\d{2}-\d{2}', s):
        return s
    m = re.match(r'^(\d{1,2})月(\d{1,2})日(?:\s+(\d{1,2}:\d{2}:\d{2}))?', s)
    if not m:
        return s
    month, day = int(m.group(1)), int(m.group(2))
    today = datetime.now().date()
    year = today.year
    try:
        candidate = datetime(year, month, day).date()
    except ValueError:
        # Not a real calendar date in the current year (e.g. 2月30日);
        # keep the current-year rendering without a rollover check.
        candidate = None
    if candidate is not None and (candidate - today).days > 1:
        year -= 1
    date_part = f"{year}-{month:02d}-{day:02d}"
    if m.group(3):
        # Zero-pad the hour so the result compares correctly as a string
        # against '%Y-%m-%d %H:%M:%S' timestamps.
        hour, rest = m.group(3).split(":", 1)
        return f"{date_part} {int(hour):02d}:{rest}"
    return date_part
# ═══ Step 1: 读订单汇总 2smjwA ═══
def parse_summary(token):
    """Read the order-summary tab and return one entry per usable row.

    Rows from range ``A3:W2000`` are skipped when they are empty, contain
    only falsy cells, or have neither a phone (column E) nor a uid
    (column H).

    Args:
        token: Feishu bearer token passed through to read_sheet().

    Returns:
        A list of ``(a_to_u, phone, uid, clue_date)`` tuples, where
        ``a_to_u`` is the 21 cell strings of columns A..U and ``clue_date``
        is column C normalised by parse_date_str().
    """
    entries = []
    for raw in read_sheet(token, SUMMARY_SHEET_ID, "A3:W2000"):
        if not raw or not any(raw):
            continue
        phone = safe_cell(raw, 4)  # column E
        uid = safe_cell(raw, 7)  # column H
        if not (phone or uid):
            continue
        mirror = [safe_cell(raw, col) for col in range(21)]  # columns A..U
        lead_date = parse_date_str(safe_cell(raw, 2))  # column C
        entries.append((mirror, phone, uid, lead_date))
    log(f" 订单汇总: {len(entries)}")
    return entries
# ═══ Step 2: XXTEA 匹配 + 收集 UID ═══
def resolve_uids(entries):
    """Resolve account ids for the summary entries.

    Every 11-digit phone number is encrypted with encrypt_phone() and looked
    up in ``bi_vala_app_account`` (``status=1``, not soft-deleted). Entries
    whose phone did not resolve fall back to their own positive numeric uid.

    Args:
        entries: Tuples ``(a_to_u, phone, uid, clue_date)`` as produced by
            parse_summary().

    Returns:
        ``(phone_to_uid, uid_set)``: a dict mapping plain phone -> account id
        as a string, and the set of all resolved account ids as ints.
    """
    phone_set = {
        phone for _, phone, _, _ in entries if re.match(r'^\d{11}$', phone)
    }
    phone_to_uid = {}
    if phone_set:
        log(f" XXTEA 加密: {len(phone_set)} 个手机号")
        phone_enc_map = {}
        for phone in phone_set:
            try:
                phone_enc_map[encrypt_phone(phone)] = phone
            except Exception as ex:
                # NOTE(review): this writes the plain phone number to the log
                # file; consider masking it.
                log(f" 加密失败 {phone}: {ex}")
        if phone_enc_map:  # nothing to look up -> no need to connect
            conn = psycopg2.connect(
                host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
                user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
                dbname="vala_bi", connect_timeout=30
            )
            try:
                cur = conn.cursor()
                matches = batch_in(
                    cur,
                    "SELECT id, tel_encrypt FROM bi_vala_app_account "
                    "WHERE tel_encrypt IN (%s) AND status=1 AND deleted_at IS NULL",
                    list(phone_enc_map),
                )
                cur.close()
            finally:
                # Always release the connection, even when the query fails.
                conn.close()
            for uid, tel_enc in matches:
                plain = phone_enc_map.get(tel_enc)
                if plain:
                    phone_to_uid[plain] = str(uid)
        log(f" 匹配到 {len(phone_to_uid)} 个 UID")
    uid_set = set()
    for _, phone, uid, _ in entries:
        if re.match(r'^\d{11}$', phone) and phone in phone_to_uid:
            uid_set.add(int(phone_to_uid[phone]))
        elif uid and uid.isdigit() and int(uid) > 0:
            uid_set.add(int(uid))
    log(f" 有效 UID: {len(uid_set)}")
    return phone_to_uid, uid_set
# ═══ Step 3: 查全量订单 ═══
def query_all_orders(uid_set):
    """Load every paid order of the given accounts, with refund totals.

    Args:
        uid_set: Iterable of account ids (ints).

    Returns:
        ``{}`` when *uid_set* is empty. Otherwise a ``defaultdict(list)``
        mapping account id -> list of order dicts with the keys
        ``trade_no``, ``order_date``, ``order_date_raw``, ``key_from``,
        ``product``, ``gmv``, ``refund``, ``gsv``, ``order_status`` and
        ``is_refunded``. Each user's orders are newest first.
    """
    uid_list = list(uid_set)
    if not uid_list:
        return {}
    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
        user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
        dbname="vala_bi", connect_timeout=30
    )
    try:
        cur = conn.cursor()
        log(" 查询全量订单...")
        # ORDER BY applies per chunk of account ids. All orders of one
        # account come from the same chunk, so per-user order is preserved.
        orders = batch_in(cur,
            "SELECT account_id, trade_no, out_trade_no, pay_success_date, key_from, goods_id, "
            "pay_amount_int, order_status "
            "FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL "
            "ORDER BY pay_success_date DESC",
            uid_list
        )
        refund_map = _query_refund_totals(cur, orders)
        cur.close()
    finally:
        # Always release the connection, even when a query fails.
        conn.close()
    uid_orders = defaultdict(list)
    for o in orders:
        uid_orders[o[0]].append(_order_record(o, refund_map))
    log(f" 全量订单: {sum(len(v) for v in uid_orders.values())}")
    return uid_orders


def _query_refund_totals(cur, orders):
    """Return {order number: summed refund_amount_int} for status=3 refunds.

    Refund rows are matched on both ``trade_no`` and ``out_trade_no`` against
    every trade_no / out_trade_no of *orders*, and the sums are accumulated
    per number (rule confirmed by 李承龙, 2026-06-18).

    NOTE(review): a refund row carrying both the order's trade_no and its
    out_trade_no would be counted under both keys and thus twice once the
    caller adds them up — confirm against the bi_refund_order schema.
    """
    order_nos = list({no for o in orders for no in (o[1], o[2]) if no})
    refund_map = {}
    if not order_nos:
        return refund_map
    for column in ("trade_no", "out_trade_no"):
        rows = batch_in(cur,
            f"SELECT {column}, SUM(refund_amount_int) FROM bi_refund_order "
            f"WHERE {column} IN (%s) AND status=3 GROUP BY {column}",
            order_nos
        )
        for no, amt in rows:
            # SUM() may come back as Decimal (PostgreSQL returns numeric for
            # bigint input); int() keeps the later float division valid.
            refund_map[no] = refund_map.get(no, 0) + int(amt or 0)
    return refund_map


def _order_record(o, refund_map):
    """Convert one bi_vala_order row into the dict consumed by the sheet writer."""
    _aid, tn, otn, dt, key_from, goods_id, pay_amount_int, order_status = o
    # Amount columns are divided by 100 before use; a NULL amount counts as 0.
    gmv = (pay_amount_int or 0) / 100.0
    # Refund amount: matched by both trade_no and out_trade_no, summed.
    refund = (refund_map.get(tn, 0) + refund_map.get(otn, 0)) / 100.0
    # Refunded when order_status is 4, or when it is 3 and the refund table
    # has a record for the order.
    is_refunded = (order_status == 4) or (order_status == 3 and refund > 0)
    gsv = gmv - refund
    # Display form uses the same 'M月D日 HH:MM:SS' layout parse_date_str()
    # accepts; concatenating month/day/time without separators was ambiguous.
    order_date = f"{dt.month}月{dt.day}日 {dt.strftime('%H:%M:%S')}" if dt else ""
    order_date_raw = dt.strftime("%Y-%m-%d %H:%M:%S") if dt else ""
    return {
        "trade_no": tn or "",
        "order_date": order_date,
        "order_date_raw": order_date_raw,
        "key_from": key_from or "",
        "product": GOODS_NAMES.get(goods_id, f"商品{goods_id}"),
        # int() truncates any fractional part of the amounts.
        "gmv": int(gmv),
        "refund": int(refund),
        "gsv": int(gsv),
        "order_status": order_status,
        "is_refunded": is_refunded,
    }
# ═══ Step 4: 展开写入财务 tab ═══
def write_finance_sheet(token, entries, phone_to_uid, uid_orders):
    """Expand summary entries to one row per order and rewrite the finance tab.

    For each entry the account id is resolved (phone match first, then the
    entry's own positive numeric uid). A user with N orders yields N rows; a
    user without orders yields one row with only the A..U mirror. Range
    A3:Z5000 of the finance tab is cleared first, then all rows are written
    starting at row 3.

    Args:
        token: Feishu bearer token for FeishuSheetWriter.
        entries: Tuples ``(a_to_u, phone, uid, clue_date)`` from parse_summary().
        phone_to_uid: Plain phone -> account id (string) from resolve_uids().
        uid_orders: Account id -> list of order dicts from query_all_orders().
    """
    total_cols = 26  # columns A..Z
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    rows = []
    for a_to_u, phone, uid_str, clue_date in entries:
        # Resolve the account id.
        aid = 0
        if re.match(r'^\d{11}$', phone) and phone in phone_to_uid:
            aid = int(phone_to_uid[phone])
        elif uid_str and uid_str.isdigit() and int(uid_str) > 0:
            aid = int(uid_str)
        orders = uid_orders.get(aid, [])
        if not orders:
            # No orders: a single row with the A..U mirror, V..Z left blank.
            rows.append(_fit_row(a_to_u[:21], total_cols))
            continue
        for o in orders:
            rows.append(
                _fit_row(_finance_order_row(a_to_u, o, clue_date, now_str), total_cols)
            )
    log(f" 展开后共 {len(rows)}")
    # Clear A3:Z5000 completely before writing.
    log(" 清空 A3:Z5000...")
    writer = FeishuSheetWriter(SPREADSHEET_TOKEN, token)
    writer.clear(FINANCE_SHEET_ID, start_row=3, end_row=5000, cols=total_cols)
    writer.write(FINANCE_SHEET_ID, start_row=3, rows=rows, cols=total_cols)
    log(" 财务订单写入完成")


def _fit_row(row, width):
    """Return *row* as a new list padded with "" or cut to exactly *width* cells."""
    fitted = list(row[:width])
    fitted += [""] * (width - len(fitted))
    return fitted


def _finance_order_row(a_to_u, o, clue_date, now_str):
    """Build the A..Z row for one order of a summary entry."""
    row = _fit_row(a_to_u, 21)  # A..U mirror
    row[10] = o["order_date"]  # K: order date
    row[11] = o["key_from"]  # L: deal channel
    row[12] = o["product"]  # M: product
    row[13] = o["gmv"] if o["gmv"] > 0 else ""  # N: GMV
    row[14] = o["refund"] if o["refund"] > 0 else ""  # O: refund
    row[15] = o["gsv"] if o["gsv"] > 0 else ""  # P: GSV
    row[20] = now_str  # U: update time
    gmv_val = o["gmv"]
    refund_val = o["refund"]
    gsv_val = o["gsv"]
    # Fully refunded: refund equals a positive GMV, or status 4 with no
    # refund amount recorded.
    is_full_refund = (gmv_val > 0 and gmv_val == refund_val) or (
        o["order_status"] == 4 and refund_val == 0
    )
    order_valid = gsv_val > 0 and not is_full_refund
    # An order placed before the lead date does not count.
    # NOTE(review): this is a plain string comparison, so it is only
    # meaningful when clue_date is in 'YYYY-MM-DD[ HH:MM:SS]' form.
    if order_valid and clue_date and o["order_date_raw"]:
        if o["order_date_raw"] < clue_date:
            order_valid = False
    row += [
        classify_channel(o["key_from"]),  # V: channel attribution
        "",  # W: left blank (Cursor formula)
        o["trade_no"],  # X: order number
        1 if order_valid else 0,  # Y: 1 = valid order, 0 otherwise
        "",  # Z: left blank (filled in by Cursor)
    ]
    return row
# ═══ Main ═══
def main():
    """Run the four refresh steps in order and return a process exit code.

    Returns:
        0 when every step completed, 1 when any step raised. The error
        message is logged and the traceback is printed.
    """
    log("=" * 60)
    log("财务口径订单刷新 v3 启动")
    try:
        fs_token = get_fs_token()
        log("Step 1: 读订单汇总 2smjwA")
        summary_entries = parse_summary(fs_token)
        log("Step 2: XXTEA 匹配 UID")
        phone_map, account_ids = resolve_uids(summary_entries)
        log("Step 3: 查询全量订单")
        orders_by_account = query_all_orders(account_ids)
        log("Step 4: 展开写入财务 tab")
        write_finance_sheet(fs_token, summary_entries, phone_map, orders_by_account)
        log("✅ 财务订单刷新完成")
    except Exception as e:
        log(f"❌ ERROR: {e}")
        import traceback
        traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())