464 lines
15 KiB
Python
464 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
大麦 v2_fill — 细水新架构版 线索UID + 订单全量回填
|
||
|
||
表: CP7BsOjYdhtcmft5iz2csIaHnKe
|
||
① 线索明细 (7fdb4b): F列手机号 → XXTEA加密 → PG tel_encrypt精确匹配 → K列UID
|
||
② 订单明细 (vrYbiX): clear后全量写 A-P (勿填 Q/R)
|
||
· G 挂主进线 · B=线索C主归属 · N≠重复*
|
||
|
||
用法:
|
||
python3 scripts/damai_v2_fill.py
|
||
|
||
契约: docs/damai-v2-fill-skill.md (与 xiaoxi 版同契约)
|
||
"""
|
||
|
||
import json, re, time, sys, os, requests, psycopg2
|
||
from datetime import datetime
|
||
from collections import defaultdict
|
||
|
||
# Absolute directory holding this script. It is pushed to the front of
# sys.path so sibling modules (e.g. phone_encrypt) can be imported.
SCRIPTS_DIR = os.path.split(os.path.abspath(__file__))[0]
# Parent of the scripts directory; secrets.env is looked up here.
WORKSPACE = os.path.split(SCRIPTS_DIR)[0]
sys.path.insert(0, SCRIPTS_DIR)
|
||
from phone_encrypt import encrypt_phone
|
||
|
||
# ── Configuration ──
SPREADSHEET_TOKEN = "CP7BsOjYdhtcmft5iz2csIaHnKe"
LEADS_SHEET_ID = "7fdb4b"    # leads detail sheet
ORDERS_SHEET_ID = "vrYbiX"   # orders detail sheet

# Feishu app credentials with Sheets read/write permission.
# NOTE(review): a live app secret is committed in source. Consider loading it
# from secrets.env (like PG_ONLINE_PASSWORD) and rotating the secret.
FS_APP_ID = "cli_aa898f32d4799bea"
FS_APP_SECRET = "wGBjexDxHsHBsx9lk8O2gdbkMFzaJ3kd"

# Store / device channel name -> short label.
# NOTE(review): not referenced elsewhere in this file as shown — confirm it is still needed.
CHANNEL_MAP = {
    "Apple App Store": "苹果",
    "科大讯飞学习机": "讯飞",
    "学而思学习机": "学而思",
    "华为应用市场": "华为",
    "小米应用市场": "小米",
    "应用宝应用市场": "应用宝",
    "希沃学习机": "希沃",
    "荣耀应用市场": "荣耀",
    "小度学习机": "小度",
    "oppo应用市场": "OPPO",
    "vivo应用市场": "VIVO",
    "京东方学习机": "京东方",
    "步步高学习机": "步步高",
    "作业帮学习机": "作业帮",
    "魅族应用市场": "魅族",
    "官网": "官网",
}

# Every log() call is printed and also appended here.
LOG_FILE = "/var/log/damai_v2_fill.log"
|
||
|
||
|
||
def log(msg):
    """Print `msg` with a timestamp and append the same line to LOG_FILE.

    The log file is opened with an explicit UTF-8 encoding. Most messages
    contain Chinese text, so relying on the locale's default encoding could
    raise UnicodeEncodeError under a non-UTF-8 locale (e.g. a bare cron env).
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
|
||
|
||
|
||
def get_secret(key):
    """Look up `key` in WORKSPACE/secrets.env and return its value.

    Lines have the form ``KEY=value``. Leading whitespace and an optional
    ``export `` prefix are tolerated. Surrounding single/double quotes are
    stripped from the value, and only the first ``=`` separates key from
    value, so values may contain ``=``.

    Returns:
        The value string, or None when the key is not present (unchanged
        contract; callers such as psycopg2.connect then receive None).
    """
    prefix = f"{key}="
    with open(os.path.join(WORKSPACE, "secrets.env"), encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            if line.startswith(prefix):
                return line[len(prefix):].strip("'\"")
    return None
|
||
|
||
|
||
def get_fs_token():
    """Exchange FS_APP_ID / FS_APP_SECRET for a Feishu tenant_access_token.

    Raises:
        RuntimeError: when the auth endpoint answers with a non-zero code.
    """
    auth_url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    credentials = {"app_id": FS_APP_ID, "app_secret": FS_APP_SECRET}
    answer = requests.post(auth_url, json=credentials, timeout=15).json()
    if answer.get("code") == 0:
        return answer["tenant_access_token"]
    raise RuntimeError(f"获取飞书token失败: {answer}")
|
||
|
||
|
||
def read_sheet(token, sheet_id, range_str=None):
    """Fetch cell values of `sheet_id`, optionally limited to `range_str`.

    Returns the ``data.valueRange.values`` list from the Feishu response.

    Raises:
        RuntimeError: when the API answers with a non-zero code.
    """
    target = f"{sheet_id}!{range_str}" if range_str else sheet_id
    url = (
        "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/"
        f"{SPREADSHEET_TOKEN}/values/{target}"
    )
    auth = {"Authorization": f"Bearer {token}"}
    payload = requests.get(url, headers=auth, timeout=30).json()
    if payload.get("code") != 0:
        raise RuntimeError(f"读取失败 {sheet_id}: {payload}")
    return payload["data"]["valueRange"]["values"]
|
||
|
||
|
||
def put_values(token, sheet_id, range_str, values, timeout=30):
    """Overwrite `range_str` on `sheet_id` with `values` in a single PUT request.

    This does NOT split the payload into batches, despite what the previous
    docstring said. Callers must keep each call within Feishu's per-request
    size limits; fill_order_details, for example, slices its rows itself.

    Args:
        token: Feishu tenant_access_token.
        sheet_id: sheet id inside SPREADSHEET_TOKEN.
        range_str: A1-style range without the sheet prefix, e.g. "K5:K5".
        values: 2-D list of rows whose shape matches `range_str`.
        timeout: HTTP timeout in seconds (default keeps the old 30 s).

    Returns:
        True on success. On failure the API code/msg is logged and False is
        returned.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
    resp = requests.put(url, headers={
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }, json=body, timeout=timeout)
    r = resp.json()
    if r.get("code") != 0:
        log(f" ❌ {range_str}: {r.get('code')} {r.get('msg')}")
        return False
    return True
|
||
|
||
|
||
def clear_range(token, sheet_id, range_str, batch_rows=500):
    """Blank an A1-style range (e.g. "A3:P5000") by overwriting it with "".

    The rows are written in chunks of `batch_rows`, pausing 0.2 s between
    requests.

    Args:
        token: Feishu tenant_access_token.
        sheet_id: sheet id inside SPREADSHEET_TOKEN.
        range_str: "<COL><row>:<COL><row>" with uppercase column letters.
            Multi-letter columns such as "AA" are supported.
        batch_rows: maximum number of rows per PUT request.

    Returns:
        True if every chunk was written. False if the range cannot be parsed
        or a request fails; the failure is logged and remaining chunks are
        skipped.
    """
    m = re.match(r'([A-Z]+)(\d+):([A-Z]+)(\d+)', range_str)
    if not m:
        log(f" ❌ 无法解析范围: {range_str}")
        return False
    sc, sr, ec, er = m.group(1), int(m.group(2)), m.group(3), int(m.group(4))

    def col_number(letters):
        # Spreadsheet columns are bijective base-26: A=1 ... Z=26, AA=27, ...
        # The previous ord(ec) - ord(sc) raised TypeError for multi-letter columns.
        n = 0
        for ch in letters:
            n = n * 26 + (ord(ch) - ord("A") + 1)
        return n

    ncols = col_number(ec) - col_number(sc) + 1
    if ncols < 1:
        # Reversed columns (e.g. "P3:A9") would otherwise send empty rows.
        log(f" ❌ 无法解析范围: {range_str}")
        return False

    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    for start in range(sr, er + 1, batch_rows):
        end = min(start + batch_rows - 1, er)
        rng = f"{sc}{start}:{ec}{end}"
        values = [[""] * ncols for _ in range(end - start + 1)]
        body = {"valueRange": {"range": f"{sheet_id}!{rng}", "values": values}}
        r = requests.put(url, headers=headers, json=body, timeout=60).json()
        if r.get("code") != 0:
            log(f" ❌ clear {rng}: {r.get('code')} {r.get('msg')}")
            return False
        time.sleep(0.2)
    return True
|
||
|
||
|
||
def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query over `params` in slices and concatenate the rows.

    `sql_tpl` must contain exactly one ``%s`` where the placeholder list goes.
    Each slice of at most `chunk` params becomes ``%s,%s,...`` and is bound
    through the cursor, so parameter values are never interpolated into SQL.
    """
    rows = []
    for offset in range(0, len(params), chunk):
        window = params[offset:offset + chunk]
        placeholders = ",".join("%s" for _ in window)
        cur.execute(sql_tpl % placeholders, window)
        rows += cur.fetchall()
    return rows
|
||
|
||
|
||
def classify_channel(key_from):
    """Map an order's key_from tag to one of 端内 / 销转 / 达人 / 直购.

    Rules, applied to the stripped tag:
      * one of three fixed app tags           -> 端内
      * prefix "sales-adp-"                   -> 销转
      * prefix "newmedia-daren-" or the fixed
        "newmedia-dianpu-wwxx-0-0" tag        -> 达人
      * anything else, including empty/None   -> 直购
    """
    tag = key_from.strip() if key_from else ""
    in_app_tags = ("app-active-h5-0-0", "app-sales-bj-qhm-0", "app-sales-bj-wd-0")
    if tag in in_app_tags:
        return "端内"
    if tag.startswith("sales-adp-"):
        return "销转"
    if tag.startswith("newmedia-daren-") or tag == "newmedia-dianpu-wwxx-0-0":
        return "达人"
    return "直购"
|
||
|
||
|
||
def parse_date(date_str, year=2026):
    """Parse '6月14日 19:09:12' or '6月14日' into an ISO date like '2026-06-14'.

    The input carries no year, so it is taken from `year`. The default of
    2026 preserves the previous hardcoded behaviour.

    Returns "" in three cases:
      * the input is empty;
      * the stripped input does not start with "<month>月<day>日";
      * the month/day pair is not a real calendar date (e.g. '2月30日').
    The last case previously produced an invalid string such as '2026-02-30'.
    """
    if not date_str:
        return ""
    m = re.match(r'(\d+)月(\d+)日', str(date_str).strip())
    if not m:
        return ""
    try:
        parsed = datetime(year, int(m.group(1)), int(m.group(2)))
    except (ValueError, OverflowError):
        return ""
    return parsed.date().isoformat()
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# ① 线索明细: F→K UID
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def _lead_cell_text(row, col):
    """Return row[col] as text, or "" when the cell is missing or empty.

    Values that parse as numbers go through float -> int, so a phone or UID
    that arrives as a number (e.g. 13800138000.0) becomes a plain digit
    string. Anything non-numeric is returned stripped.
    """
    if len(row) <= col or not row[col]:
        return ""
    value = row[col]
    try:
        return str(int(float(value)))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric cell: keep its text form (was a bare `except:` before).
        return str(value).strip()


def _is_positive_uid(uid):
    """True when `uid` is a non-empty digit string whose value is > 0."""
    return bool(uid) and uid.isdigit() and int(uid) > 0


def fill_lead_uids(token):
    """Fill column K (UID) of the leads sheet from the phone numbers in column F.

    Rows A3:K200 are read. A row needs a UID when its F cell is an 11-digit
    phone and its K cell is not a positive integer. Those phones are
    XXTEA-encrypted with encrypt_phone and matched exactly against
    bi_vala_app_account.tel_encrypt (status=1, not deleted). Each matched UID
    is written back to its K cell with one request per cell.

    Args:
        token: Feishu tenant_access_token.

    Returns:
        One tuple per row read: (row_idx, phone, uid, master_cs, nickname,
        entry_time). row_idx is the 1-based sheet row number. uid is the newly
        matched UID where one was found, otherwise the value already in K.
    """
    log("=" * 60)
    log("① 线索明细 F→K UID")
    log("=" * 60)

    # Data starts at row 3 (two header rows).
    # NOTE(review): rows after 200 are never read — confirm the sheet stays below that.
    rows = read_sheet(token, LEADS_SHEET_ID, "A3:K200")
    log(f" 读取 {len(rows)} 行线索数据")

    leads = []
    for row_idx, row in enumerate(rows, start=3):
        phone = _lead_cell_text(row, 5)          # F: phone
        existing_uid = _lead_cell_text(row, 10)  # K: UID
        master_cs = str(row[2]).strip() if len(row) > 2 and row[2] else ""   # C: master owner
        nickname = str(row[4]).strip() if len(row) > 4 and row[4] else ""    # E: nickname
        entry_time = str(row[1]).strip() if len(row) > 1 and row[1] else ""  # B: entry time
        leads.append((row_idx, phone, existing_uid, master_cs, nickname, entry_time))

    # Rows with a valid phone but no usable UID yet.
    need_uid = [(idx, phone) for idx, phone, uid, *_ in leads
                if re.match(r'^\d{11}$', phone) and not _is_positive_uid(uid)]
    log(f" 需要查UID: {len(need_uid)} 个手机号")

    if not need_uid:
        log(" 无需查询,跳过")
        return leads

    # ciphertext -> plaintext phone (duplicate phones collapse to one entry)
    phone_enc_map = {}
    for _, phone in need_uid:
        try:
            phone_enc_map[encrypt_phone(phone)] = phone
        except Exception as e:
            # Best effort: one phone that fails to encrypt must not abort the run.
            log(f" 加密失败 {phone}: {e}")

    log(f" 加密完成, 唯一密文: {len(phone_enc_map)}")

    phone_to_uid = {}
    enc_list = list(phone_enc_map)
    if enc_list:
        conn = psycopg2.connect(
            host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
            user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
            dbname="vala_bi", connect_timeout=30
        )
        try:
            with conn.cursor() as cur:
                for i in range(0, len(enc_list), 500):
                    chunk = enc_list[i:i + 500]
                    ph = ",".join(["%s"] * len(chunk))
                    cur.execute(
                        f"SELECT id, tel_encrypt FROM bi_vala_app_account WHERE tel_encrypt IN ({ph}) AND status=1 AND deleted_at IS NULL",
                        chunk
                    )
                    # NOTE(review): no ORDER BY, so if several accounts share a
                    # tel_encrypt, which UID wins is unspecified.
                    for uid, tel_enc in cur.fetchall():
                        plain = phone_enc_map.get(tel_enc)
                        if plain:
                            phone_to_uid[plain] = str(uid)
                    time.sleep(0.05)
        finally:
            # Always release the connection, even if a query fails.
            conn.close()

    log(f" 精确匹配到 {len(phone_to_uid)} 个 UID")

    # sheet row -> matched UID. Unmatched rows are not written and keep their old K value.
    uid_map = {idx: phone_to_uid[phone] for idx, phone in need_uid if phone in phone_to_uid}

    write_count = 0
    for idx, uid in uid_map.items():
        if put_values(token, LEADS_SHEET_ID, f"K{idx}:K{idx}", [[uid]]):
            write_count += 1
        time.sleep(0.1)

    log(f" 写入 {write_count} 个 UID 到 K 列")

    return [
        (idx, phone, uid_map.get(idx, uid), cs, nick, et)
        for idx, phone, uid, cs, nick, et in leads
    ]
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# ② 订单明细: clear → 全量回填 A-P
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def _order_detail_row(order, lead, refund_cents):
    """Build one 16-column (A-P) row of the orders sheet.

    Args:
        order: (account_id, trade_no, pay_date, key_from, goods_id,
            pay_amount, order_status), as selected in fill_order_details.
            Amounts are in cents (fen) and are converted to yuan here.
        lead: (row_idx, phone, master_cs, nickname, entry_time) of the lead
            row the order is attached to.
        refund_cents: summed refund amount for this trade_no, in cents.
    """
    account_id, trade_no, pay_date, key_from, _goods_id, pay_amount, _status = order
    lead_idx, phone, lead_cs, nickname, entry_time = lead

    gmv = round(pay_amount / 100, 2) if pay_amount else 0
    refund = round(refund_cents / 100, 2)
    return [
        f"{pay_date.month}月" if pay_date else "",           # A order month
        lead_cs,                                               # B master owner (lead column C)
        trade_no or "",                                        # C order number
        str(account_id),                                       # D user id
        phone,                                                 # E phone
        nickname,                                              # F nickname
        str(lead_idx),                                         # G lead id = lead sheet row number
        entry_time,                                            # H lead entry time
        pay_date.strftime("%Y-%m-%d") if pay_date else "",    # I order date
        gmv,                                                   # J GMV (yuan)
        refund,                                                # K refund (yuan)
        round(gmv - refund, 2),                                # L GSV
        key_from or "",                                        # M keyfrom
        classify_channel(key_from),                            # N channel attribution
        "",                                                    # O timing-valid: left blank for later rules
        "",                                                    # P remark
    ]


def fill_order_details(token, leads):
    """Rebuild the orders sheet (columns A-P) from the DB for every lead UID.

    Steps:
      1. Collect the positive UIDs from `leads`. The first lead row seen for
         a UID is the one its orders are attached to.
      2. Query orders with pay_success_date set and order_status 3/4, plus
         refunds with status=3.
      3. Clear A3:P5000 and write the new rows from row 3 in 250-row batches.
    Columns Q/R are never written.

    Args:
        token: Feishu tenant_access_token.
        leads: (row_idx, phone, uid, master_cs, nickname, entry_time) tuples,
            as returned by fill_lead_uids.

    Returns:
        A "YYYY-mm-dd HH:MM:SS" timestamp taken after writing. Returns None
        when there are no UIDs; in that case the sheet is left untouched.
    """
    log("=" * 60)
    log("② 订单明细 DB 全量回填")
    log("=" * 60)

    lead_by_uid = {}  # uid -> (row_idx, phone, master_cs, nickname, entry_time)
    for idx, phone, uid, cs, nick, et in leads:
        if uid and uid.isdigit() and int(uid) > 0:
            lead_by_uid.setdefault(int(uid), (idx, phone, cs, nick, et))

    # Sorted so the 500-UID query batches, each ORDER BY account_id, also come
    # back in a deterministic global order.
    uid_list = sorted(lead_by_uid)
    log(f" 有效 UID: {len(uid_list)}")

    if not uid_list:
        log(" 无 UID,跳过订单查询")
        return

    refund_map = defaultdict(int)  # trade_no -> refunded cents
    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
        user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
        dbname="vala_bi", connect_timeout=30
    )
    try:
        with conn.cursor() as cur:
            log(" 查询订单...")
            orders = batch_in(cur,
                "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status "
                "FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL AND order_status IN (3,4) "
                "ORDER BY account_id, pay_success_date",
                uid_list
            )
            log(f" 订单数: {len(orders)}")

            trade_nos = [o[1] for o in orders if o[1]]
            if trade_nos:
                refunds = batch_in(cur,
                    "SELECT trade_no, refund_amount_int FROM bi_refund_order WHERE trade_no IN (%s) AND status=3",
                    trade_nos
                )
                for tno, amt in refunds:
                    # A NULL refund amount counts as 0 instead of raising TypeError.
                    refund_map[tno] += amt or 0
    finally:
        # Always release the connection, even if a query fails.
        conn.close()

    order_rows = []
    for order in orders:
        lead = lead_by_uid.get(order[0])
        if lead is None:
            continue
        order_rows.append(_order_detail_row(order, lead, refund_map.get(order[1], 0)))

    log(f" 生成 {len(order_rows)} 行订单数据")

    # NOTE(review): only rows 3-5000 are cleared. If a run ever writes more
    # than 4998 rows, a later smaller run will leave the rows past 5000 stale.
    log(" 清空订单明细...")
    if not clear_range(token, ORDERS_SHEET_ID, "A3:P5000"):
        log(" ⚠️ 清空未完成, 表中可能残留旧数据")
    time.sleep(0.5)

    # 250 rows x 16 columns per request leaves headroom under the per-request limit.
    BATCH_ROWS = 250
    total_written = 0
    for offset in range(0, len(order_rows), BATCH_ROWS):
        batch = order_rows[offset:offset + BATCH_ROWS]
        start_row = offset + 3  # data starts on sheet row 3
        end_row = start_row + len(batch) - 1
        if put_values(token, ORDERS_SHEET_ID, f"A{start_row}:P{end_row}", batch):
            total_written += len(batch)
        time.sleep(0.3)

    log(f" 写入 {total_written} 行订单数据")
    if total_written != len(order_rows):
        log(f" ⚠️ 部分批次写入失败: {len(order_rows) - total_written} 行未写入")

    db_info = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log(f" db_info: {db_info}")

    return db_info
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# Main
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
def main():
    """Run step ① (lead UIDs) then step ② (order rebuild) and print a summary.

    Any exception is written to the log file before being re-raised. Without
    this, failures only reach stderr and never appear in LOG_FILE; the
    re-raise keeps the non-zero exit status.
    """
    log("大麦 v2_fill 开始")
    t0 = time.time()

    try:
        token = get_fs_token()
        log("飞书 token 获取成功")

        # ① leads sheet: F -> K UID
        leads = fill_lead_uids(token)

        # ② orders sheet: full rebuild
        db_info = fill_order_details(token, leads)
    except Exception as e:
        log(f"v2_fill 失败: {type(e).__name__}: {e}")
        raise

    elapsed = time.time() - t0
    log(f"v2_fill 完成 · db_info: {db_info} · 耗时 {elapsed:.1f}s")

    uid_count = sum(1 for _, _, uid, *_ in leads if uid and uid.isdigit() and int(uid) > 0)
    phone_count = sum(1 for _, phone, *_ in leads if phone and re.match(r'^\d{11}$', phone))
    bar = "=" * 60
    print(f"\n{bar}")
    print("v2_fill 完成")
    print(f" 线索明细: {len(leads)} 行, {phone_count} 手机号, {uid_count} UID")
    print(f" db_info: {db_info}")
    print(bar)


if __name__ == "__main__":
    main()
|