ai_member_xiaoxi/scripts/wechat_leads_analysis.py
2026-06-17 08:00:01 +08:00

668 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
企微线索分析·补数脚本
从「线索与企微用户对应」表读取数据通过用户ID/手机号匹配DB
补充学情深度字段,写入同一表格的新 sheet。
需要字段:
1. 付费日期
2. U0完成日期、U1完成日期、付费后N天完成
3. 退费日期、退费金额
4. 退费前体验节数、退费前最高单元/课程进度
5. 注册日期、首课/首单元激活日期
"""
import json, re, sys, os, time, requests, psycopg2
from datetime import datetime, date
from collections import defaultdict
# Directory containing this script; it is put on sys.path below so that the
# sibling helper modules (phone_encrypt, feishu_sheet_utils) can be imported.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
# Parent of the scripts directory; get_secret() reads "secrets.env" from here.
WORKSPACE = os.path.dirname(SCRIPTS_DIR)
# Directory holding "config.json" with the Feishu app credentials that
# get_fs_token() loads.
CRED_DIR = "/root/.openclaw/credentials/xiaoxi"
sys.path.insert(0, SCRIPTS_DIR)
from phone_encrypt import encrypt_phone
# NOTE(review): FeishuSheetWriter is not referenced in the visible part of
# this file.
from feishu_sheet_utils import FeishuSheetWriter
# Token of the Feishu spreadsheet used in every sheets API URL below.
SPREADSHEET_TOKEN = "RSlMsdRWqhaRrftlnhUcpVTjnOd"
# Sheet id that main() reads from and also writes the new columns into.
SOURCE_SHEET = "9c7ffe"
# NOTE(review): OUTPUT_SHEET is not referenced in the visible part of this file.
OUTPUT_SHEET = "学情补数"
# File that log() appends every message to.
LOG_FILE = "/var/log/xiaoxi_wechat_leads.log"
def log(msg):
    """Print a timestamped message and append it to LOG_FILE.

    The file is opened as UTF-8 explicitly: the messages contain Chinese text
    and emoji, and the locale default encoding is not guaranteed to be able
    to encode them.

    Args:
        msg: Text to log; it is interpolated into the line with an f-string.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line)
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
def get_secret(key):
    """Look up ``key`` in ``WORKSPACE/secrets.env``.

    The file is scanned line by line for the first line that starts with
    ``key=``. Everything after the first ``=`` is returned with surrounding
    whitespace and any leading/trailing single or double quotes removed.

    Args:
        key: Name of the secret to look up.

    Returns:
        The secret value as a string, or ``None`` when no line starts with
        ``key=``.
    """
    prefix = f"{key}="
    # Read as UTF-8 explicitly so the result does not depend on the locale.
    with open(os.path.join(WORKSPACE, "secrets.env"), encoding="utf-8") as f:
        for line in f:
            if line.startswith(prefix):
                return line.strip().split("=", 1)[1].strip("'\"")
    # Make the "not found" result explicit instead of falling off the end.
    return None
def get_fs_token():
    """Fetch a Feishu tenant access token for the first configured app.

    Reads ``CRED_DIR/config.json`` and posts the ``appId`` / ``appSecret`` of
    ``apps[0]`` to the Feishu internal tenant_access_token endpoint.

    Returns:
        The ``tenant_access_token`` string from the response.

    Raises:
        RuntimeError: If the response ``code`` is not 0. This mirrors
            read_col() so an auth failure is reported with the API's own
            payload instead of a bare ``KeyError``.
    """
    with open(os.path.join(CRED_DIR, "config.json"), encoding="utf-8") as f:
        cfg = json.load(f)
    app = cfg["apps"][0]
    resp = requests.post(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        json={"app_id": app["appId"], "app_secret": app["appSecret"]},
        timeout=15,
    )
    data = resp.json()
    if data.get("code") != 0:
        raise RuntimeError(f"获取 tenant_access_token 失败: {data}")
    return data["tenant_access_token"]
def read_col(token, sheet_id, col_letter, max_rows=5000):
    """Read one column of the spreadsheet, starting at row 2.

    Args:
        token: Feishu bearer token.
        sheet_id: Id of the sheet inside SPREADSHEET_TOKEN.
        col_letter: Column letter(s), e.g. ``"A"`` or ``"AF"``.
        max_rows: Last row number of the requested range.

    Returns:
        A list with one entry per returned row: the first cell of that row,
        or ``""`` when the row is empty or missing.

    Raises:
        RuntimeError: If the API response ``code`` is not 0.
    """
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}!{col_letter}2:{col_letter}{max_rows}"
    resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30)
    data = resp.json()
    if data.get("code") != 0:
        raise RuntimeError(f"读取失败 {col_letter}: {data}")
    # A missing or null "values" entry is treated as an empty column rather
    # than raising KeyError/TypeError.
    value_range = (data.get("data") or {}).get("valueRange") or {}
    vals = value_range.get("values") or []
    return [v[0] if v else "" for v in vals]
def safe_str(v):
    """Convert a spreadsheet cell value to a clean string.

    Args:
        v: Cell value of any type.

    Returns:
        ``""`` for ``None``; the integer form for ints and whole-number
        floats (``12.0`` -> ``"12"``); ``str(v)`` for other floats, including
        ``nan`` and ``inf``; the stripped ``str(v)`` for everything else.
    """
    if v is None:
        return ""
    if isinstance(v, int):
        # Covers bool as well (True -> "1"), matching int(v) semantics.
        return str(int(v))
    if isinstance(v, float):
        # is_integer() is False for nan/inf, so int() is never called on
        # them (int(nan) raises ValueError, int(inf) raises OverflowError).
        if v.is_integer():
            return str(int(v))
        return str(v)
    return str(v).strip()
def batch_in(cur, sql_tpl, params, chunk=500):
    """Run an ``IN (...)`` query in slices and collect every fetched row.

    Args:
        cur: Cursor object providing ``execute(sql, params)`` and ``fetchall()``.
        sql_tpl: SQL text with a single ``%s`` where the placeholder list goes;
            it is filled in with the ``%`` operator.
        params: Sliceable sequence of values for the ``IN`` list.
        chunk: Maximum number of values sent per statement.

    Returns:
        A list with the rows of all executed statements, in execution order.
    """
    collected = []
    for offset in range(0, len(params), chunk):
        piece = params[offset:offset + chunk]
        # One "%s" marker per value in this slice.
        markers = ",".join("%s" for _ in piece)
        statement = sql_tpl % markers
        cur.execute(statement, piece)
        collected += cur.fetchall()
    return collected
def parse_sheet_date(v, default_year=2026):
    """Parse a date value read from the Feishu sheet.

    Three forms are recognised:
      * a serial day number strictly between 45000 and 50000, counted from
        1899-12-30 (the fractional part is dropped);
      * ``"6月12日"`` (month/day only; the year comes from ``default_year``);
      * ``"2026-01-07"`` (anything after the date part is ignored).

    Args:
        v: Raw cell value (number or string).
        default_year: Year used for the month/day-only form.

    Returns:
        A ``datetime`` at midnight, or ``None`` when ``v`` is empty, is not in
        a recognised form, or names an impossible calendar date.
    """
    from datetime import timedelta

    if not v:
        return None
    s = str(v).strip()

    # Serial day number.
    try:
        n = float(s)
    except ValueError:
        n = None
    if n is not None and 45000 < n < 50000:
        return datetime(1899, 12, 30) + timedelta(days=int(n))

    # "6月12日" format, then "2026-01-07" format.
    m = re.match(r'^(\d{1,2})月(\d{1,2})日', s)
    if m:
        parts = (default_year, int(m.group(1)), int(m.group(2)))
    else:
        m2 = re.match(r'^(\d{4})-(\d{2})-(\d{2})', s)
        if not m2:
            return None
        parts = (int(m2.group(1)), int(m2.group(2)), int(m2.group(3)))

    try:
        return datetime(*parts)
    except ValueError:
        # e.g. "13月40日" or "2026-02-30": matches the pattern but is not a
        # real date.
        return None
# ═══ Main ═══
def _col_letter(n):
    """Convert a 0-indexed column number to letters: 0->A, 25->Z, 26->AA, 52->BA."""
    result = ""
    n += 1
    while n > 0:
        n -= 1
        result = chr(65 + n % 26) + result
        n //= 26
    return result


def _days_between(start, end):
    """Return whole days from ``start`` to ``end`` (both 'YYYY-MM-DD'), or "" if unavailable."""
    if not (start and end):
        return ""
    try:
        d1 = datetime.strptime(start, "%Y-%m-%d")
        d2 = datetime.strptime(end, "%Y-%m-%d")
    except ValueError:
        return ""
    return (d2 - d1).days


def _read_source_rows(token):
    """Step 1: read the needed source-sheet columns and build one dict per data row."""
    log("Step 1: 读取源表数据")
    cols = {
        "A": "进线日期", "C": "微伴客户名", "D": "小红书昵称",
        "F": "所属客服", "M": "笔记标题", "N": "笔记作者",
        "O": "笔记类型", "Q": "笔记ID", "R": "匹配状态",
        "S": "流量类型", "T": "归属账号",
        "AC": "手机号", "AD": "用户年级", "AF": "用户ID",
        "AG": "注册日期", "AH": "下载渠道",
        "AI": "下单日期", "AJ": "成交渠道", "AK": "产品",
        "AL": "下单金额(GMV)", "AM": "退款金额", "AN": "实际收入(GSV)",
        "AO": "激活课程", "AP": "当前行课进度", "AQ": "最近行课时间",
        "AR": "累计学习时长(min)", "AV": "订单号",
        "AW": "有效订单", "AX": "渠道归属", "AY": "匹配方式",
    }
    col_data = {}
    for letter in cols:
        col_data[letter] = read_col(token, SOURCE_SHEET, letter, 4700)
    # Column A determines how many data rows are processed.
    n_rows = len(col_data["A"])
    log(f" 读取 {n_rows}")

    def cell(letter, i):
        # Columns may come back shorter than column A; pad with "".
        values = col_data[letter]
        return safe_str(values[i]) if i < len(values) else ""

    # Row-dict key -> source column, for values copied through unchanged.
    passthrough = (
        ("note_id", "Q"), ("note_title", "M"), ("note_author", "N"),
        ("note_type", "O"), ("match_status", "R"), ("traffic_type", "S"),
        ("account", "T"), ("customer_name", "C"), ("xhs_nickname", "D"),
        ("cs", "F"), ("grade", "AD"), ("reg_date", "AG"),
        ("download_channel", "AH"), ("order_date", "AI"),
        ("order_channel", "AJ"), ("product", "AK"), ("gmv", "AL"),
        ("refund", "AM"), ("gsv", "AN"), ("activation", "AO"),
        ("lesson_progress", "AP"), ("lesson_time", "AQ"),
        ("lesson_minutes", "AR"), ("valid_order", "AW"),
        ("channel_attr", "AX"), ("match_method", "AY"),
    )

    rows = []
    for i in range(n_rows):
        uid_raw = cell("AF", i)
        phone_raw = cell("AC", i)
        # isdecimal() accepts exactly the characters int() can parse;
        # isdigit() also accepts e.g. "²", which would crash int() later.
        uid = uid_raw if uid_raw.isdecimal() else ""
        phone = ""
        if re.match(r'^\d{11}$', phone_raw):
            phone = phone_raw
        elif phone_raw:
            # The cell may hold the number in float/scientific notation.
            try:
                p = str(int(float(phone_raw)))
            except (ValueError, OverflowError):
                p = ""
            if re.match(r'^\d{11}$', p):
                phone = p
        row = {
            "row": i + 2,  # sheet row number; data starts at row 2
            "uid": uid,
            "phone": phone,
            "order_no": cell("AV", i),
        }
        for key, letter in passthrough:
            row[key] = cell(letter, i)
        rows.append(row)
    return rows


def _first_completions(cur, conn, char_ids, chapter_ids=None):
    """Return {user_id: {chapter_id: earliest created_at}} over the 8 play-record tables.

    When ``chapter_ids`` is given, only those chapters are queried. A failing
    table is logged and skipped after a rollback, so results are best-effort.
    """
    first_seen = defaultdict(dict)
    for shard in range(8):
        table = f"bi_user_chapter_play_record_{shard}"
        sql = (
            f"SELECT user_id, chapter_id, MIN(created_at) FROM {table} "
            f"WHERE user_id = ANY(%s) AND play_status=1 AND deleted_at IS NULL "
        )
        if chapter_ids is not None:
            sql += "AND chapter_id = ANY(%s) "
        sql += "GROUP BY user_id, chapter_id"
        try:
            for start in range(0, len(char_ids), 200):
                chunk_users = char_ids[start:start + 200]
                if chapter_ids is None:
                    cur.execute(sql, (chunk_users,))
                else:
                    cur.execute(sql, (chunk_users, chapter_ids))
                for user_id, chapter_id, first_dt in cur.fetchall():
                    if first_dt is None:
                        # Nothing to compare or format later; skip.
                        continue
                    seen = first_seen[user_id]
                    prev = seen.get(chapter_id)
                    # Keep the earliest timestamp if a pair shows up in
                    # more than one table.
                    if prev is None or first_dt < prev:
                        seen[chapter_id] = first_dt
        except Exception as e:
            log(f" {table}: {e}")
            conn.rollback()
    return first_seen


def _max_progress(char_ids, completions, chapter_reverse, cutoff=None):
    """Return 'level-season-unit-lesson' of the highest (unit, lesson) completed.

    Only chapters whose unit code starts with "U" are considered. When
    ``cutoff`` ('YYYY-MM-DD') is given, completions dated after it are ignored.
    Returns "" when nothing qualifies.
    """
    best = (-1, -1)
    label = ""
    for cid in char_ids:
        for ch_id, dt in completions.get(cid, {}).items():
            if ch_id not in chapter_reverse:
                continue
            if cutoff is not None and dt.strftime("%Y-%m-%d") > cutoff:
                continue
            cl, cs, cu, clesson = chapter_reverse[ch_id]
            if not (isinstance(cu, str) and cu.startswith("U")):
                continue
            try:
                ui = int(cu[1:])
                li = int(clesson[1:]) if clesson.startswith("L") else 0
            except (ValueError, TypeError, AttributeError):
                # Unit/lesson code not in the expected "U<n>" / "L<n>" form.
                continue
            # NOTE(review): level and season are not part of the ordering.
            if (ui, li) > best:
                best = (ui, li)
                label = f"{cl}-{cs}-{cu}-{clesson}"
    return label


def _query_db(cur, conn, rows, phone_enc_map):
    """Fill missing UIDs in ``rows`` from phone matches, then run Step 3 and return {uid: facts}."""
    phone_to_uid = {}
    if phone_enc_map:
        matches = batch_in(
            cur,
            "SELECT id, tel_encrypt FROM bi_vala_app_account WHERE tel_encrypt IN (%s) AND status=1 AND deleted_at IS NULL",
            list(phone_enc_map),
        )
        for uid, tel_enc in matches:
            plain = phone_enc_map.get(tel_enc)
            if plain:
                phone_to_uid[plain] = str(uid)
    # Fill missing UIDs
    filled = 0
    for r in rows:
        if not r["uid"] and r["phone"] in phone_to_uid:
            r["uid"] = phone_to_uid[r["phone"]]
            filled += 1
    log(f" 手机号匹配补充 UID: {filled}")
    # Collect all UIDs
    uid_set = {int(r["uid"]) for r in rows if r["uid"]}
    uid_list = list(uid_set)
    log(f" 唯一用户ID: {len(uid_list)}")

    # Step 3: Query all DB data
    log("Step 3: 数据库批量查询")
    db = {uid: {} for uid in uid_set}

    # 3a. Registration info
    log(" 查询注册信息...")
    reg_info = batch_in(
        cur,
        "SELECT id, created_at, download_channel FROM bi_vala_app_account WHERE id IN (%s) AND status=1 AND deleted_at IS NULL",
        uid_list,
    )
    for aid, created_at, dc in reg_info:
        db[aid]["reg_date"] = created_at.strftime("%Y-%m-%d") if created_at else ""
        db[aid]["download_channel"] = dc or ""

    # 3b. Orders
    # Row layout: (account_id, trade_no, pay_success_date, key_from,
    #              goods_id, pay_amount_int, order_status)
    log(" 查询订单信息...")
    orders = batch_in(
        cur,
        "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status "
        "FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL AND order_status IN (3,4) "
        "ORDER BY pay_success_date",
        uid_list,
    )
    user_orders = defaultdict(list)
    for o in orders:
        user_orders[o[0]].append(o)
    trade_nos = [o[1] for o in orders if o[1]]
    refund_map = {}
    if trade_nos:
        refunds = batch_in(
            cur,
            "SELECT trade_no, refund_amount_int, created_at FROM bi_refund_order WHERE trade_no IN (%s) AND status=3",
            trade_nos,
        )
        # NOTE(review): only one refund record is kept per trade_no (the
        # last one returned); confirm a trade cannot have several.
        for tn, amt, created_at in refunds:
            refund_map[tn] = (amt, created_at)
    for aid, olist in user_orders.items():
        # First order date
        first_order = min(olist, key=lambda x: x[2])
        db[aid]["first_pay_date"] = first_order[2].strftime("%Y-%m-%d") if first_order[2] else ""
        db[aid]["first_pay_dt"] = first_order[2]
        # All pay dates
        db[aid]["pay_dates"] = sorted({o[2].strftime("%Y-%m-%d") for o in olist if o[2]})
        # Refund info
        total_refund = 0
        refund_dates = set()
        for o in olist:
            if o[1] in refund_map:
                amt, rdate = refund_map[o[1]]
                total_refund += amt or 0
                if rdate:
                    refund_dates.add(rdate.strftime("%Y-%m-%d"))
        refund_days = sorted(refund_dates)
        # Amount columns are divided by 100 before being reported.
        db[aid]["total_refund"] = total_refund / 100.0
        db[aid]["refund_dates"] = refund_days
        # Earliest refund date. Orders are iterated in pay-date order, so the
        # first refund encountered is not necessarily the earliest one.
        db[aid]["first_refund_date"] = refund_days[0] if refund_days else ""
        # Total GMV
        db[aid]["total_gmv"] = sum((o[5] or 0) for o in olist) / 100.0

    # 3c. Trial courses
    log(" 查询体验课...")
    trial_info = batch_in(
        cur,
        "SELECT account_id, COUNT(*) FROM bi_user_course_detail WHERE account_id IN (%s) AND expire_time IS NULL AND deleted_at IS NULL GROUP BY account_id",
        uid_list,
    )
    for aid, cnt in trial_info:
        db[aid]["trial_count"] = cnt

    # 3d. Activation (seasonal tickets); the first two tickets per account
    # in created_at order are kept.
    log(" 查询激活课程...")
    tickets = batch_in(
        cur,
        "SELECT account_id, season_package_level, created_at FROM bi_vala_seasonal_ticket "
        "WHERE account_id IN (%s) AND status=1 AND deleted_at IS NULL ORDER BY created_at",
        uid_list,
    )
    for aid, level, created_at in tickets:
        if "activation" not in db[aid]:
            db[aid]["activation"] = level
            db[aid]["activation_date"] = created_at.strftime("%Y-%m-%d") if created_at else ""
        elif "activation2" not in db[aid]:
            db[aid]["activation2"] = level
            db[aid]["activation_date2"] = created_at.strftime("%Y-%m-%d") if created_at else ""

    # 3e. Chapter structure for U0/U1
    log(" 查询课程结构...")
    cur.execute(
        "SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson"
    )
    chapter_map = {}  # (level, season, unit, lesson) -> chapter_id
    unit_chapters = defaultdict(list)  # (level, season, unit) -> [chapter_ids]
    for cid, cl, cs, cu, clesson in cur.fetchall():
        chapter_map[(cl, cs, cu, clesson)] = cid
        unit_chapters[(cl, cs, cu)].append(cid)
    # U0 = S0 U00 and U1 = S0 U01, taken from both the L1 and L2 levels.
    all_u0 = set(unit_chapters.get(("L1", "S0", "U00"), [])
                 + unit_chapters.get(("L2", "S0", "U00"), []))
    all_u1 = set(unit_chapters.get(("L1", "S0", "U01"), [])
                 + unit_chapters.get(("L2", "S0", "U01"), []))

    # 3f. Character IDs (play records are keyed by character id, not account id)
    log(" 查询角色信息...")
    chars = batch_in(
        cur,
        "SELECT account_id, id FROM bi_vala_app_character WHERE account_id IN (%s) AND deleted_at IS NULL",
        uid_list,
    )
    uid_to_chars = defaultdict(list)
    for aid, cid in chars:
        uid_to_chars[aid].append(cid)
    all_char_ids = [c for chars_list in uid_to_chars.values() for c in chars_list]
    log(f" 角色数: {len(all_char_ids)}")

    # 3g. Chapter play records for U0/U1 completion
    log(" 查询U0/U1完成记录...")
    u0u1_first = _first_completions(cur, conn, all_char_ids, list(all_u0 | all_u1))
    for aid in uid_set:
        u0_dates = []
        u1_dates = []
        for cid in uid_to_chars.get(aid, []):
            for ch_id, dt in u0u1_first.get(cid, {}).items():
                if ch_id in all_u0:
                    u0_dates.append(dt)
                if ch_id in all_u1:
                    u1_dates.append(dt)
        # The latest first-completion among the unit's chapters is used as
        # the unit completion date.
        # NOTE(review): this does not check that every chapter of the unit
        # was completed; a partially finished unit also gets a date.
        if u0_dates:
            db[aid]["u0_complete_date"] = max(u0_dates).strftime("%Y-%m-%d")
        if u1_dates:
            db[aid]["u1_complete_date"] = max(u1_dates).strftime("%Y-%m-%d")

    # 3h. All chapter completions for max progress
    log(" 查询全部课时完成记录...")
    all_chapter_first = _first_completions(cur, conn, all_char_ids)
    # chapter_id -> (level, season, unit, lesson)
    chapter_reverse = {cid: key for key, cid in chapter_map.items()}
    for aid in uid_set:
        db[aid]["max_progress"] = _max_progress(
            uid_to_chars.get(aid, []), all_chapter_first, chapter_reverse
        )

    # 3i. Trial count before refund
    log(" 查询退费前体验节数...")
    for aid in uid_set:
        refund_dt = db[aid].get("first_refund_date")
        if refund_dt:
            # Count trial courses with created_at < refund_date
            cur.execute(
                "SELECT COUNT(*) FROM bi_user_course_detail WHERE account_id=%s AND expire_time IS NULL AND deleted_at IS NULL AND created_at < %s",
                (aid, refund_dt),
            )
            db[aid]["trial_before_refund"] = cur.fetchone()[0]
        else:
            db[aid]["trial_before_refund"] = 0

    # 3j. Max progress before refund (completions dated on or before the
    # first refund date)
    log(" 查询退费前最高进度...")
    for aid in uid_set:
        refund_dt = db[aid].get("first_refund_date")
        if refund_dt:
            db[aid]["max_progress_before_refund"] = _max_progress(
                uid_to_chars.get(aid, []), all_chapter_first, chapter_reverse,
                cutoff=refund_dt,
            )
        else:
            db[aid]["max_progress_before_refund"] = ""
    return db


def _load_db_facts(rows):
    """Steps 2-3: encrypt phones, open the DB, collect per-UID facts, always close the DB."""
    # Step 2: Phone -> UID matching for rows without UID
    log("Step 2: 手机号 XXTEA 匹配")
    phone_set = {r["phone"] for r in rows if r["phone"] and not r["uid"]}
    log(f" 待匹配手机号: {len(phone_set)}")
    phone_enc_map = {}  # encrypted phone -> plain phone
    for p in phone_set:
        try:
            phone_enc_map[encrypt_phone(p)] = p
        except Exception as ex:
            log(f" 加密失败 {p}: {ex}")
    conn = psycopg2.connect(
        host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591,
        user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"),
        dbname="vala_bi", connect_timeout=30
    )
    # try/finally so the cursor and connection are released even when a
    # query raises.
    try:
        cur = conn.cursor()
        try:
            return _query_db(cur, conn, rows, phone_enc_map)
        finally:
            cur.close()
    finally:
        conn.close()


def _build_output_rows(rows, db):
    """Step 4: merge each sheet row with its DB facts into a flat output list.

    Column order (0-based): 0 行号, 1 笔记ID, 2 笔记标题, 3 笔记作者, 4 笔记类型,
    5 匹配状态, 6 流量类型, 7 归属账号, 8 微伴客户名, 9 小红书昵称, 10 所属客服,
    11 用户年级, 12 用户ID, 13 手机号, 14 订单号, 15 注册日期, 16 下载渠道,
    17 首单付费日期, 18 全部付费日期, 19 下单金额(GMV), 20 退款金额,
    21 实际收入(GSV), 22 退费日期, 23 退费金额, 24 U0完成日期, 25 U0付费后N天,
    26 U1完成日期, 27 U1付费后N天, 28 退费前体验节数, 29 退费前最高进度,
    30 当前最高进度, 31 激活课程, 32 激活日期, 33 当前行课进度, 34 最近行课时间,
    35 累计学习时长(min), 36 成交渠道, 37 产品, 38 渠道归属, 39 有效订单,
    40 匹配方式.
    """
    log("Step 4: 构建输出数据")
    output_rows = []
    for r in rows:
        aid = int(r["uid"]) if r["uid"] else 0
        di = db.get(aid, {})
        first_pay = di.get("first_pay_date", "")
        pay_dates = ", ".join(di.get("pay_dates", []))
        u0_date = di.get("u0_complete_date", "")
        u1_date = di.get("u1_complete_date", "")
        # Days after payment
        u0_days = _days_between(first_pay, u0_date)
        u1_days = _days_between(first_pay, u1_date)
        output_rows.append([
            r["row"],
            r["note_id"], r["note_title"], r["note_author"], r["note_type"],
            r["match_status"], r["traffic_type"], r["account"],
            r["customer_name"], r["xhs_nickname"], r["cs"], r["grade"],
            r["uid"], r["phone"], r["order_no"],
            di.get("reg_date", r["reg_date"]), di.get("download_channel", r["download_channel"]),
            first_pay, pay_dates,
            di.get("total_gmv", r["gmv"]),
            di.get("total_refund", r["refund"]),
            di.get("total_gmv", 0) - di.get("total_refund", 0),
            di.get("first_refund_date", ""),
            di.get("total_refund", 0),
            u0_date, u0_days,
            u1_date, u1_days,
            di.get("trial_before_refund", 0),
            di.get("max_progress_before_refund", ""),
            di.get("max_progress", ""),
            di.get("activation", r["activation"]),
            di.get("activation_date", ""),
            r["lesson_progress"], r["lesson_time"], r["lesson_minutes"],
            r["order_channel"], r["product"], r["channel_attr"],
            r["valid_order"], r["match_method"],
        ])
    log(f" 输出 {len(output_rows)}")
    return output_rows


def _write_enrichment_columns(token, output_rows):
    """Step 5: write the derived columns into the source sheet, starting at column BA."""
    log("Step 5: 写入 Sheet1 右侧列 (BA起)")
    # (header text, index into an output row, keep a numeric 0 as-is)
    # For the "days after payment" columns 0 is a real value (completed on
    # the payment day) and must not be blanked like other falsy cells.
    written = [
        ("首单付费日期", 17, False),
        ("全部付费日期", 18, False),
        ("退费日期", 22, False),
        ("退费金额(元)", 23, False),
        ("U0完成日期", 24, False),
        ("U0付费后N天", 25, True),
        ("U1完成日期", 26, False),
        ("U1付费后N天", 27, True),
        ("退费前体验节数", 28, False),
        ("退费前最高进度", 29, False),
        ("当前最高进度", 30, False),
        ("激活日期", 32, False),
    ]
    start_col_idx = 52  # BA = col 52 (0-indexed)
    n_cols = len(written)
    url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }

    # Write headers to row 1, one cell per request.
    for i, (title, _idx, _keep_zero) in enumerate(written):
        col = _col_letter(start_col_idx + i)
        body = {"valueRange": {"range": f"{SOURCE_SHEET}!{col}1:{col}1", "values": [[title]]}}
        resp = requests.put(url, headers=headers, json=body, timeout=15)
        result = resp.json()
        if result.get("code") != 0:
            log(f" ⚠️ 写入表头 {col} 失败: {result}")
    log(f" 表头写入完成: {_col_letter(start_col_idx)}-{_col_letter(start_col_idx + n_cols - 1)}")

    # Write each column (batch by 4000 rows). Output rows are in sheet order,
    # so list position p corresponds to sheet row p + 2.
    for i, (_title, idx, keep_zero) in enumerate(written):
        col = _col_letter(start_col_idx + i)
        vals = []
        for row_data in output_rows:
            value = row_data[idx]
            if keep_zero and isinstance(value, int):
                vals.append([value])
            else:
                vals.append([value if value else ""])
        for batch_start in range(0, len(vals), 4000):
            batch = vals[batch_start:batch_start + 4000]
            start_row = batch_start + 2
            end_row = start_row + len(batch) - 1
            body = {"valueRange": {"range": f"{SOURCE_SHEET}!{col}{start_row}:{col}{end_row}", "values": batch}}
            resp = requests.put(url, headers=headers, json=body, timeout=30)
            result = resp.json()
            if result.get("code") != 0:
                log(f"{col}{start_row}:{col}{end_row}: {result}")
            # Short pause between write requests.
            time.sleep(0.05)
        log(f" {col} 列写入完成 ({len(vals)} 行)")
    log(f"✅ 写入完成: {len(output_rows)} 行, {n_cols}")
    log(f" 表格链接: https://makee-interactive.feishu.cn/sheets/{SPREADSHEET_TOKEN}")


def main():
    """Run the back-fill: read the sheet, enrich rows from the DB, write the new columns.

    Steps:
      1. Read the source-sheet columns into row dicts.
      2. Resolve missing user IDs by matching encrypted phone numbers.
      3. Query registration, order, refund, trial, activation and chapter
         completion data for every user ID.
      4. Merge the sheet rows with the DB facts.
      5. Write the derived columns into the source sheet from column BA on.
    """
    log("=" * 60)
    log("企微线索分析·补数 启动")
    token = get_fs_token()
    rows = _read_source_rows(token)
    db = _load_db_facts(rows)
    output_rows = _build_output_rows(rows, db)
    _write_enrichment_columns(token, output_rows)


if __name__ == "__main__":
    main()