#!/usr/bin/env python3
"""
Full refresh of the direct-purchase sheet (Xiaohongshu shop: dianpu-xhs + stream-xhs).

Trigger: during a full_refresh, run this script first, then refresh_order_summary.py.
Owner: 小溪 (xiaoxi)

Direct-purchase users do not depend on matching phone numbers against the sales
sheet; their orders are queried straight from the DB, selecting rows whose
key_from contains dianpu-xhs / stream-xhs.
"""
|
||
import json, time, os, requests, psycopg2
|
||
from datetime import datetime
|
||
|
||
# ── Configuration ──
# Feishu app credentials sent by get_token(). Each can be overridden through the
# FEISHU_APP_ID / FEISHU_APP_SECRET environment variables; the literals are only
# fallbacks kept so existing deployments keep working unchanged.
# SECURITY(review): an app secret committed to source control should be rotated
# and supplied via the environment (or secrets.env); then drop the fallback.
APP_ID = os.environ.get("FEISHU_APP_ID") or "cli_a929ae22e0b8dcc8"
APP_SECRET = os.environ.get("FEISHU_APP_SECRET") or "OtFjMy7p3qE3VvLbMdcWidwgHOnGD4FJ"
# Spreadsheet token (interpolated into the Sheets API URL) and the id of the
# sheet that receives the direct-purchase rows (prefix of every written range).
SPREADSHEET_TOKEN = "NoZqsFi47hIOHEt9j8WcfRtbnug"
DIRECT_SHEET = "1sosYE"
|
||
|
||
# Column titles written to row 1 of the sheet (range A1:X1, 24 columns).
# The data rows built in main() follow exactly this column order.
HEADER = [
    "销售归属",      # A: sales owner
    "微信昵称",      # B: WeChat nickname
    "进线日期",      # C: lead-in date
    "体验节数",      # D: number of trial lessons
    "手机号",        # E: phone number
    "用户年级",      # F: user grade
    "课史/跟进",     # G: course history / follow-up
    "用户ID",        # H: user ID
    "注册日期",      # I: registration date
    "下载渠道",      # J: download channel
    "是否下单",      # K: has ordered
    "下单日期",      # L: order date
    "成交渠道",      # M: deal channel
    "产品",          # N: product
    "下单金额",      # O: order amount
    "退款金额",      # P: refund amount
    "实际收入",      # Q: actual revenue
    "激活课程",      # R: activated course
    "当前行课进度",  # S: current course progress
    "最近行课时间",  # T: last study time
    "累计学习时长",  # U: total study duration
    "更新时间",      # V: updated at
    "渠道归属",      # W: channel attribution
    "有效成单",      # X: valid order flag
]
|
||
|
||
|
||
def _get_pg_password(secrets_path=None):
    """Return the PostgreSQL password stored in a ``secrets.env`` file.

    The file is scanned line by line for the first entry of the form
    ``PG_ONLINE_PASSWORD=<value>``. Everything after the first ``=`` is the
    value; surrounding double and single quotes are stripped.

    Args:
        secrets_path: Optional path of the env file to read. When omitted, the
            file ``secrets.env`` in the parent directory of this script's
            directory is used.

    Returns:
        The password as a string.

    Raises:
        OSError: If the file cannot be opened.
        RuntimeError: If no ``PG_ONLINE_PASSWORD=`` line is present.
    """
    if secrets_path is None:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        secrets_path = os.path.join(script_dir, "..", "secrets.env")

    prefix = "PG_ONLINE_PASSWORD="
    # Explicit encoding so the result does not depend on the host locale.
    with open(secrets_path, encoding="utf-8") as f:
        for raw_line in f:
            # Strip first so an indented entry is still recognised.
            line = raw_line.strip()
            if line.startswith(prefix):
                return line.split("=", 1)[1].strip('"').strip("'")
    raise RuntimeError("PG_ONLINE_PASSWORD not found in secrets.env")
|
||
|
||
|
||
# Keyword arguments passed to psycopg2.connect() in main().
# The password is read from secrets.env at import time, so importing this
# module raises if that file or its PG_ONLINE_PASSWORD entry is missing.
PG_CONFIG = dict(
    host="bj-postgres-16pob4sg.sql.tencentcdb.com",
    port=28591,
    user="ai_member",
    password=_get_pg_password(),
    database="vala_bi",
)
|
||
|
||
|
||
def get_token():
    """Request a Feishu ``tenant_access_token`` for the configured app.

    Posts ``APP_ID`` / ``APP_SECRET`` to the internal-app token endpoint with
    a 15-second timeout.

    Returns:
        The ``tenant_access_token`` string from the response body.

    Raises:
        requests.RequestException: On network failure or timeout.
        RuntimeError: If the response is not JSON or carries no token
            (for example wrong credentials); the message includes the
            ``code`` / ``msg`` fields returned by the API.
    """
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": APP_ID, "app_secret": APP_SECRET},
        timeout=15,
    )
    try:
        payload = resp.json()
    except ValueError as exc:
        raise RuntimeError(
            f"Feishu token request returned a non-JSON response (HTTP {resp.status_code})"
        ) from exc

    token = payload.get("tenant_access_token") if isinstance(payload, dict) else None
    if not token:
        # Surface the API's own error instead of a bare KeyError.
        code = payload.get("code") if isinstance(payload, dict) else None
        msg = payload.get("msg") if isinstance(payload, dict) else payload
        raise RuntimeError(f"Feishu token request failed: code={code} msg={msg}")
    return token
|
||
|
||
|
||
def put_values(token, sheet_id, range_str, values, retries=3):
    """Write ``values`` into one range of the spreadsheet, retrying on failure.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        sheet_id: Sheet id; combined with ``range_str`` as ``<sheet_id>!<range>``.
        range_str: A1-style range, e.g. ``"A2:X21"``.
        values: List of rows (each a list of cell values) to write.
        retries: Maximum number of attempts.

    Returns:
        True as soon as a response with ``code == 0`` is received; False if
        every attempt was answered with a non-zero ``code``.

    Raises:
        requests.RequestException, ValueError: If the final attempt fails at
            the network level or returns a non-JSON body. Earlier attempts
            that fail this way are retried instead of aborting immediately.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}

    for attempt in range(1, retries + 1):
        is_last = attempt >= retries
        try:
            resp = requests.put(url, headers=headers, json=body, timeout=30)
            result = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # Transient transport/decoding problems are worth a retry; only
            # the last failure is propagated to the caller.
            if is_last:
                raise
            print(f"[put_values] {sheet_id}!{range_str} 第 {attempt}/{retries} 次请求异常: {exc}")
        else:
            if result.get("code") == 0:
                return True
            print(
                f"[put_values] {sheet_id}!{range_str} 第 {attempt}/{retries} 次写入失败: "
                f"code={result.get('code')} msg={result.get('msg')}"
            )
        if not is_last:
            # Back off between attempts, but do not delay after the final one.
            time.sleep(1)
    return False
|
||
|
||
|
||
def _fetch_direct_orders(cur):
    """Return paid direct-purchase order rows, newest payment first.

    Each row is ``(account_id, name, tel, created_at, download_channel,
    pay_success_date, key_from, goods_name, pay_amount_int, order_status,
    refund_amount)``, restricted to orders whose ``key_from`` contains
    ``dianpu-xhs`` or ``stream-xhs``.
    """
    # No parameters are passed, so the literal '%' characters in the LIKE
    # patterns are sent to the server unchanged.
    cur.execute("""
        SELECT
            o.account_id,
            a.name,
            a.tel,
            a.created_at,
            a.download_channel,
            o.pay_success_date,
            o.key_from,
            o.goods_name,
            o.pay_amount_int,
            o.order_status,
            COALESCE(r.refund_amount_int, 0) as refund_amount
        FROM bi_vala_order o
        JOIN bi_vala_app_account a ON o.account_id = a.id AND a.status = 1
        LEFT JOIN bi_refund_order r ON o.trade_no = r.trade_no AND r.status = 3
        WHERE o.pay_success_date IS NOT NULL
          AND o.order_status IN (3,4)
          AND (o.key_from LIKE '%dianpu-xhs%' OR o.key_from LIKE '%stream-xhs%')
        ORDER BY o.pay_success_date DESC
    """)
    return cur.fetchall()


def _dedup_by_account(orders):
    """Keep only the first row seen for each account_id (column 0)."""
    seen = set()
    unique = []
    for row in orders:
        account_id = row[0]
        if account_id not in seen:
            seen.add(account_id)
            unique.append(row)
    return unique


def _fetch_usage_maps(cur, uids):
    """Query per-account course/study data for ``uids``.

    Returns a dict with five account_id-keyed maps: ``exp`` (row count with
    no expire_time), ``course`` (course_level of the row with the latest
    expire_time), ``study`` (latest "level-season-unit-lesson" string),
    ``study_time`` (timestamp of that record, cut to 19 chars) and ``time``
    (summed interval_time).
    """
    maps = {"exp": {}, "course": {}, "study": {}, "study_time": {}, "time": {}}
    if not uids:
        # Nothing to look up; skip four round-trips that would return no rows.
        return maps

    # Trial lesson count
    cur.execute("""
        SELECT account_id, COUNT(*) FROM bi_user_course_detail
        WHERE account_id = ANY(%s) AND expire_time IS NULL AND deleted_at IS NULL
        GROUP BY account_id
    """, (uids,))
    maps["exp"] = {r[0]: r[1] for r in cur.fetchall()}

    # Activated course
    cur.execute("""
        SELECT DISTINCT ON (account_id) account_id, course_level
        FROM bi_user_course_detail
        WHERE account_id = ANY(%s) AND expire_time IS NOT NULL AND deleted_at IS NULL
        ORDER BY account_id, expire_time DESC
    """, (uids,))
    maps["course"] = {r[0]: r[1] for r in cur.fetchall()}

    # Study progress
    cur.execute("""
        SELECT DISTINCT ON (c.user_id) a.id, blul.course_level, blul.course_season,
               blul.course_unit, blul.course_lesson, c.created_at
        FROM bi_user_chapter_play_record_0 c
        JOIN bi_vala_app_character ch ON c.user_id = ch.id
        JOIN bi_vala_app_account a ON ch.account_id = a.id
        JOIN bi_level_unit_lesson blul ON c.chapter_id = blul.id
        WHERE a.id = ANY(%s) AND c.play_status = 1
        ORDER BY c.user_id, c.created_at DESC
    """, (uids,))
    # The query yields one row per c.user_id (joined to a character), so an
    # account with several characters appears several times, ordered by user_id
    # rather than by time. Keep the most recent record per account instead of
    # whichever character happens to sort first.
    latest = {}
    for aid, level, season, unit, lesson, played_at in cur.fetchall():
        previous = latest.get(aid)
        is_newer = previous is None or (
            played_at is not None and (previous[0] is None or played_at > previous[0])
        )
        if is_newer:
            latest[aid] = (played_at, f"{level}-{season}-{unit}-{lesson}")
    for aid, (played_at, progress) in latest.items():
        maps["study"][aid] = progress
        maps["study_time"][aid] = str(played_at)[:19]

    # Study duration
    cur.execute("""
        SELECT a.id, SUM(comp.interval_time)
        FROM bi_user_chapter_play_record_0 c
        JOIN bi_vala_app_character ch ON c.user_id = ch.id
        JOIN bi_vala_app_account a ON ch.account_id = a.id
        JOIN bi_user_component_play_record_0 comp ON c.chapter_unique_id = comp.chapter_unique_id
        WHERE a.id = ANY(%s) AND c.play_status = 1
        GROUP BY a.id
    """, (uids,))
    maps["time"] = {r[0]: r[1] for r in cur.fetchall()}
    return maps


def _build_rows(unique_orders, maps, now_str):
    """Turn order rows into 24-column sheet rows, newest order date first.

    Orders whose refund is positive and covers the whole amount are skipped.
    """
    rows = []
    for order in unique_orders:
        (uid, name, tel, created_at, channel, paid_at,
         key_from, goods, pay_cents, _status, refund_cents) = order

        # Amounts are divided by 100 before being written; a NULL pay amount is
        # treated as 0 rather than crashing the whole refresh.
        amount = (pay_cents or 0) / 100
        refund = refund_cents / 100 if refund_cents else 0

        # Skip full refunds
        if refund > 0 and refund >= amount:
            continue

        total_ms = maps["time"].get(uid)
        # float(): SUM() may come back as Decimal depending on the column type,
        # which the JSON encoder cannot serialize -- TODO confirm column type.
        # interval_time / 60000 -> presumably milliseconds to minutes; verify.
        total_min = round(float(total_ms) / 60000, 1) if total_ms else ""

        rows.append([
            "直购",
            name or "",
            "",
            maps["exp"].get(uid, 0),
            tel or "",
            "",
            "",
            uid,
            str(created_at)[:10] if created_at else "",
            channel or "",
            "是",
            str(paid_at)[:10] if paid_at else "",
            key_from,
            goods.strip() if goods else "",
            amount,
            refund if refund > 0 else "",
            amount - refund,
            maps["course"].get(uid, ""),
            maps["study"].get(uid, ""),
            maps["study_time"].get(uid, ""),
            total_min,
            now_str,
            "直购",
            1,
        ])

    # Sort by order date (column index 11), descending
    rows.sort(key=lambda r: r[11], reverse=True)
    return rows


def _write_sheet(token, rows, batch_size=20, clear_rows=50):
    """Write header, data rows and trailing blank rows; return failed ranges."""
    failed = []

    def put(range_str, values):
        if not put_values(token, DIRECT_SHEET, range_str, values):
            failed.append(range_str)

    # Header
    put("A1:X1", [HEADER])

    # Data, in batches, starting at sheet row 2
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        first_row = 2 + start
        last_row = first_row + len(batch) - 1
        put(f"A{first_row}:X{last_row}", batch)
        time.sleep(0.3)

    # Blank out stale rows left below the new data.
    # NOTE(review): only `clear_rows` rows are cleared; if the previous refresh
    # wrote more than that many extra rows, some old rows remain.
    clear_start = 2 + len(rows)
    blank = [[""] * 24 for _ in range(clear_rows)]
    put(f"A{clear_start}:X{clear_start + clear_rows - 1}", blank)
    return failed


def main():
    """Rebuild the direct-purchase sheet from the database.

    Steps: query orders and per-account study data, build one row per
    account, then overwrite the Feishu sheet (header, data, trailing blanks).

    Raises:
        RuntimeError: If any range could not be written, so a partial refresh
            is never reported as a success.
    """
    print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] 直购表刷新 启动")

    # ── Step 1: DB queries ──
    conn = psycopg2.connect(**PG_CONFIG, connect_timeout=30)
    try:
        with conn.cursor() as cur:
            orders = _fetch_direct_orders(cur)
            # NOTE(review): dedup happens before the full-refund filter, so an
            # account whose newest order was fully refunded is dropped even if
            # it has an older valid order -- confirm this is intended.
            unique_orders = _dedup_by_account(orders)
            maps = _fetch_usage_maps(cur, [o[0] for o in unique_orders])
    finally:
        # Always release the connection, including when a query fails.
        conn.close()

    # ── Step 2: build rows ──
    now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    new_rows = _build_rows(unique_orders, maps, now_str)
    print(f"有效直购订单: {len(new_rows)}")

    # ── Step 3: write to Feishu ──
    token = get_token()
    failed = _write_sheet(token, new_rows)
    if failed:
        raise RuntimeError(
            f"Feishu write failed for {len(failed)} range(s): {', '.join(failed)}"
        )

    total = len(new_rows)
    print(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ✅ 直购表刷新完成 ({total} 行)")


if __name__ == "__main__":
    main()
|