#!/usr/bin/env python3 """ 企微线索分析·补数脚本 从「线索与企微用户对应」表读取数据,通过用户ID/手机号匹配DB, 补充学情深度字段,写入同一表格的新 sheet。 需要字段: 1. 付费日期 2. U0完成日期、U1完成日期、付费后N天完成 3. 退费日期、退费金额 4. 退费前体验节数、退费前最高单元/课程进度 5. 注册日期、首课/首单元激活日期 """ import json, re, sys, os, time, requests, psycopg2 from datetime import datetime, date from collections import defaultdict SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) WORKSPACE = os.path.dirname(SCRIPTS_DIR) CRED_DIR = "/root/.openclaw/credentials/xiaoxi" sys.path.insert(0, SCRIPTS_DIR) from phone_encrypt import encrypt_phone from feishu_sheet_utils import FeishuSheetWriter SPREADSHEET_TOKEN = "RSlMsdRWqhaRrftlnhUcpVTjnOd" SOURCE_SHEET = "9c7ffe" OUTPUT_SHEET = "学情补数" LOG_FILE = "/var/log/xiaoxi_wechat_leads.log" def log(msg): ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") line = f"[{ts}] {msg}" print(line) with open(LOG_FILE, "a") as f: f.write(line + "\n") def get_secret(key): with open(os.path.join(WORKSPACE, "secrets.env")) as f: for line in f: if line.startswith(f"{key}="): return line.strip().split("=", 1)[1].strip("'\"") def get_fs_token(): with open(os.path.join(CRED_DIR, "config.json")) as f: cfg = json.load(f) resp = requests.post( "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal", json={"app_id": cfg["apps"][0]["appId"], "app_secret": cfg["apps"][0]["appSecret"]}, timeout=15 ) return resp.json()["tenant_access_token"] def read_col(token, sheet_id, col_letter, max_rows=5000): url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values/{sheet_id}!{col_letter}2:{col_letter}{max_rows}" resp = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=30) data = resp.json() if data.get("code") != 0: raise RuntimeError(f"读取失败 {col_letter}: {data}") vals = data["data"]["valueRange"]["values"] return [v[0] if v else "" for v in vals] def safe_str(v): if v is None: return "" if isinstance(v, (int, float)): if v == int(v): return str(int(v)) return str(v) return str(v).strip() def batch_in(cur, sql_tpl, params, chunk=500): results = [] for i in range(0, len(params), chunk): batch = params[i:i + chunk] ph = ",".join(["%s"] * len(batch)) cur.execute(sql_tpl % ph, batch) results.extend(cur.fetchall()) return results def parse_sheet_date(v): """解析飞书表格中的日期: 数字(Excel序列号) 或 '6月12日' 格式""" if not v: return None s = safe_str(v) # Excel serial number try: n = float(s) if 45000 < n < 50000: from datetime import timedelta return datetime(1899, 12, 30) + timedelta(days=int(n)) except: pass # "6月12日" format m = re.match(r'^(\d{1,2})月(\d{1,2})日', s) if m: return datetime(2026, int(m.group(1)), int(m.group(2))) # "2026-01-07" format m2 = re.match(r'^(\d{4})-(\d{2})-(\d{2})', s) if m2: return datetime(int(m2.group(1)), int(m2.group(2)), int(m2.group(3))) return None # ═══ Main ═══ def main(): log("=" * 60) log("企微线索分析·补数 启动") token = get_fs_token() # Step 1: Read source sheet log("Step 1: 读取源表数据") cols = { "A": "进线日期", "C": "微伴客户名", "D": "小红书昵称", "F": "所属客服", "M": "笔记标题", "N": "笔记作者", "O": "笔记类型", "Q": "笔记ID", "R": "匹配状态", "S": "流量类型", "T": "归属账号", "AC": "手机号", "AD": "用户年级", "AF": "用户ID", "AG": "注册日期", "AH": "下载渠道", "AI": "下单日期", "AJ": "成交渠道", "AK": "产品", "AL": "下单金额(GMV)", "AM": "退款金额", "AN": "实际收入(GSV)", "AO": "激活课程", "AP": "当前行课进度", "AQ": "最近行课时间", "AR": "累计学习时长(min)", "AV": "订单号", "AW": "有效订单", "AX": "渠道归属", "AY": "匹配方式", } col_data = {} for col_letter, col_name in cols.items(): col_data[col_letter] = read_col(token, SOURCE_SHEET, col_letter, 4700) n_rows = len(col_data["A"]) log(f" 读取 {n_rows} 行") # Build row list rows = [] for i in range(n_rows): uid_raw = safe_str(col_data["AF"][i]) if i < len(col_data["AF"]) else "" phone_raw = safe_str(col_data["AC"][i]) if i < len(col_data["AC"]) else "" order_raw = safe_str(col_data["AV"][i]) if i < len(col_data["AV"]) else "" uid = uid_raw if uid_raw.isdigit() else "" phone = "" if re.match(r'^\d{11}$', phone_raw): phone = phone_raw elif phone_raw: try: p = str(int(float(phone_raw))) if re.match(r'^\d{11}$', p): phone = p except: pass rows.append({ "row": i + 2, "uid": uid, "phone": phone, "order_no": order_raw, "note_id": safe_str(col_data["Q"][i]) if i < len(col_data["Q"]) else "", "note_title": safe_str(col_data["M"][i]) if i < len(col_data["M"]) else "", "note_author": safe_str(col_data["N"][i]) if i < len(col_data["N"]) else "", "note_type": safe_str(col_data["O"][i]) if i < len(col_data["O"]) else "", "match_status": safe_str(col_data["R"][i]) if i < len(col_data["R"]) else "", "traffic_type": safe_str(col_data["S"][i]) if i < len(col_data["S"]) else "", "account": safe_str(col_data["T"][i]) if i < len(col_data["T"]) else "", "customer_name": safe_str(col_data["C"][i]) if i < len(col_data["C"]) else "", "xhs_nickname": safe_str(col_data["D"][i]) if i < len(col_data["D"]) else "", "cs": safe_str(col_data["F"][i]) if i < len(col_data["F"]) else "", "grade": safe_str(col_data["AD"][i]) if i < len(col_data["AD"]) else "", "reg_date": safe_str(col_data["AG"][i]) if i < len(col_data["AG"]) else "", "download_channel": safe_str(col_data["AH"][i]) if i < len(col_data["AH"]) else "", "order_date": safe_str(col_data["AI"][i]) if i < len(col_data["AI"]) else "", "order_channel": safe_str(col_data["AJ"][i]) if i < len(col_data["AJ"]) else "", "product": safe_str(col_data["AK"][i]) if i < len(col_data["AK"]) else "", "gmv": safe_str(col_data["AL"][i]) if i < len(col_data["AL"]) else "", "refund": safe_str(col_data["AM"][i]) if i < len(col_data["AM"]) else "", "gsv": safe_str(col_data["AN"][i]) if i < len(col_data["AN"]) else "", "activation": safe_str(col_data["AO"][i]) if i < len(col_data["AO"]) else "", "lesson_progress": safe_str(col_data["AP"][i]) if i < len(col_data["AP"]) else "", "lesson_time": safe_str(col_data["AQ"][i]) if i < len(col_data["AQ"]) else "", "lesson_minutes": safe_str(col_data["AR"][i]) if i < len(col_data["AR"]) else "", "valid_order": safe_str(col_data["AW"][i]) if i < len(col_data["AW"]) else "", "channel_attr": safe_str(col_data["AX"][i]) if i < len(col_data["AX"]) else "", "match_method": safe_str(col_data["AY"][i]) if i < len(col_data["AY"]) else "", }) # Step 2: Phone → UID matching for rows without UID log("Step 2: 手机号 XXTEA 匹配") phone_set = set(r["phone"] for r in rows if r["phone"] and not r["uid"]) log(f" 待匹配手机号: {len(phone_set)}") phone_enc_map = {} for p in phone_set: try: phone_enc_map[encrypt_phone(p)] = p except Exception as ex: log(f" 加密失败 {p}: {ex}") conn = psycopg2.connect( host="bj-postgres-16pob4sg.sql.tencentcdb.com", port=28591, user="ai_member", password=get_secret("PG_ONLINE_PASSWORD"), dbname="vala_bi", connect_timeout=30 ) cur = conn.cursor() phone_to_uid = {} if phone_enc_map: enc_list = list(phone_enc_map.keys()) for i in range(0, len(enc_list), 500): chunk = enc_list[i:i + 500] ph = ",".join(["%s"] * len(chunk)) cur.execute( f"SELECT id, tel_encrypt FROM bi_vala_app_account WHERE tel_encrypt IN ({ph}) AND status=1 AND deleted_at IS NULL", chunk ) for uid, tel_enc in cur.fetchall(): plain = phone_enc_map.get(tel_enc) if plain: phone_to_uid[plain] = str(uid) # Fill missing UIDs filled = 0 for r in rows: if not r["uid"] and r["phone"] in phone_to_uid: r["uid"] = phone_to_uid[r["phone"]] filled += 1 log(f" 手机号匹配补充 UID: {filled}") # Collect all UIDs uid_set = set() for r in rows: if r["uid"]: uid_set.add(int(r["uid"])) uid_list = list(uid_set) log(f" 唯一用户ID: {len(uid_list)}") # Step 3: Query all DB data log("Step 3: 数据库批量查询") db = {uid: {} for uid in uid_set} # 3a. Registration info log(" 查询注册信息...") reg_info = batch_in(cur, "SELECT id, created_at, download_channel FROM bi_vala_app_account WHERE id IN (%s) AND status=1 AND deleted_at IS NULL", uid_list ) for aid, created_at, dc in reg_info: db[aid]["reg_date"] = created_at.strftime("%Y-%m-%d") if created_at else "" db[aid]["download_channel"] = dc or "" # 3b. Orders log(" 查询订单信息...") orders = batch_in(cur, "SELECT account_id, trade_no, pay_success_date, key_from, goods_id, pay_amount_int, order_status " "FROM bi_vala_order WHERE account_id IN (%s) AND pay_success_date IS NOT NULL AND order_status IN (3,4) " "ORDER BY pay_success_date", uid_list ) user_orders = defaultdict(list) for o in orders: user_orders[o[0]].append(o) trade_nos = [o[1] for o in orders if o[1]] refund_map = {} if trade_nos: refunds = batch_in(cur, "SELECT trade_no, refund_amount_int, created_at FROM bi_refund_order WHERE trade_no IN (%s) AND status=3", trade_nos ) for tn, amt, created_at in refunds: refund_map[tn] = (amt, created_at) for aid, olist in user_orders.items(): # First order date first_order = min(olist, key=lambda x: x[2]) db[aid]["first_pay_date"] = first_order[2].strftime("%Y-%m-%d") if first_order[2] else "" db[aid]["first_pay_dt"] = first_order[2] # All pay dates db[aid]["pay_dates"] = sorted(set(o[2].strftime("%Y-%m-%d") for o in olist if o[2])) # Refund info total_refund = 0 refund_dates = [] for o in olist: if o[1] in refund_map: amt, rdate = refund_map[o[1]] total_refund += amt if rdate: refund_dates.append(rdate.strftime("%Y-%m-%d")) db[aid]["total_refund"] = total_refund / 100.0 db[aid]["refund_dates"] = sorted(set(refund_dates)) db[aid]["first_refund_date"] = refund_dates[0] if refund_dates else "" # Total GMV db[aid]["total_gmv"] = sum(o[5] for o in olist) / 100.0 # 3c. Trial courses (体验课) log(" 查询体验课...") trial_info = batch_in(cur, "SELECT account_id, COUNT(*) FROM bi_user_course_detail WHERE account_id IN (%s) AND expire_time IS NULL AND deleted_at IS NULL GROUP BY account_id", uid_list ) for aid, cnt in trial_info: db[aid]["trial_count"] = cnt # 3d. Activation (seasonal tickets) log(" 查询激活课程...") tickets = batch_in(cur, "SELECT account_id, season_package_level, created_at FROM bi_vala_seasonal_ticket " "WHERE account_id IN (%s) AND status=1 AND deleted_at IS NULL ORDER BY created_at", uid_list ) for aid, level, created_at in tickets: if "activation" not in db[aid]: db[aid]["activation"] = level db[aid]["activation_date"] = created_at.strftime("%Y-%m-%d") if created_at else "" elif "activation2" not in db[aid]: db[aid]["activation2"] = level db[aid]["activation_date2"] = created_at.strftime("%Y-%m-%d") if created_at else "" # 3e. Chapter structure for U0/U1 log(" 查询课程结构...") cur.execute( "SELECT id, course_level, course_season, course_unit, course_lesson FROM bi_level_unit_lesson" ) chapter_map = {} # (level, season, unit, lesson) -> chapter_id unit_chapters = defaultdict(list) # (level, season, unit) -> [chapter_ids] for cid, cl, cs, cu, clesson in cur.fetchall(): chapter_map[(cl, cs, cu, clesson)] = cid unit_chapters[(cl, cs, cu)].append(cid) # U0 = L1 S0 U00, U1 = L1 S0 U01 u0_chapters = unit_chapters.get(("L1", "S0", "U00"), []) u1_chapters = unit_chapters.get(("L1", "S0", "U01"), []) # Also L2 versions u0_chapters_l2 = unit_chapters.get(("L2", "S0", "U00"), []) u1_chapters_l2 = unit_chapters.get(("L2", "S0", "U01"), []) all_u0 = set(u0_chapters + u0_chapters_l2) all_u1 = set(u1_chapters + u1_chapters_l2) # 3f. Character IDs log(" 查询角色信息...") chars = batch_in(cur, "SELECT account_id, id FROM bi_vala_app_character WHERE account_id IN (%s) AND deleted_at IS NULL", uid_list ) uid_to_chars = defaultdict(list) for aid, cid in chars: uid_to_chars[aid].append(cid) all_char_ids = [c for chars_list in uid_to_chars.values() for c in chars_list] log(f" 角色数: {len(all_char_ids)}") # 3g. Chapter play records for U0/U1 completion log(" 查询U0/U1完成记录...") all_chapters = list(all_u0 | all_u1) u0u1_first = defaultdict(dict) # user_id -> {chapter_id: first_completion_date} tables = [f"bi_user_chapter_play_record_{i}" for i in range(8)] for table in tables: try: for i in range(0, len(all_char_ids), 200): chunk_users = all_char_ids[i:i+200] cur.execute( f"SELECT user_id, chapter_id, MIN(created_at) FROM {table} " f"WHERE user_id = ANY(%s) AND play_status=1 AND deleted_at IS NULL AND chapter_id = ANY(%s) " f"GROUP BY user_id, chapter_id", (chunk_users, all_chapters) ) for user_id, chapter_id, first_dt in cur.fetchall(): u0u1_first[user_id][chapter_id] = first_dt except Exception as e: log(f" {table}: {e}") conn.rollback() # Compute U0/U1 completion per account for aid in uid_set: char_ids = uid_to_chars.get(aid, []) u0_dates = [] u1_dates = [] for cid in char_ids: for ch_id, dt in u0u1_first.get(cid, {}).items(): if ch_id in all_u0: u0_dates.append(dt) if ch_id in all_u1: u1_dates.append(dt) # U0 completion = all U0 chapters completed (5 lessons) # Use the max date among U0 chapters as completion date if u0_dates: db[aid]["u0_complete_date"] = max(u0_dates).strftime("%Y-%m-%d") if u1_dates: db[aid]["u1_complete_date"] = max(u1_dates).strftime("%Y-%m-%d") # 3h. All chapter completions for max progress log(" 查询全部课时完成记录...") all_chapter_first = defaultdict(dict) # user_id -> {chapter_id: first_date} for table in tables: try: for i in range(0, len(all_char_ids), 200): chunk_users = all_char_ids[i:i+200] cur.execute( f"SELECT user_id, chapter_id, MIN(created_at) FROM {table} " f"WHERE user_id = ANY(%s) AND play_status=1 AND deleted_at IS NULL " f"GROUP BY user_id, chapter_id", (chunk_users,) ) for user_id, chapter_id, first_dt in cur.fetchall(): all_chapter_first[user_id][chapter_id] = first_dt except Exception as e: log(f" {table}: {e}") conn.rollback() # Build chapter_id -> (level, season, unit, lesson) reverse map chapter_reverse = {} for (cl, cs, cu, clesson), cid in chapter_map.items(): chapter_reverse[cid] = (cl, cs, cu, clesson) # Compute max progress per account for aid in uid_set: char_ids = uid_to_chars.get(aid, []) max_unit_idx = -1 max_lesson_idx = -1 max_progress = "" for cid in char_ids: for ch_id, dt in all_chapter_first.get(cid, {}).items(): if ch_id in chapter_reverse: cl, cs, cu, clesson = chapter_reverse[ch_id] # Parse unit index if cu.startswith("U"): try: ui = int(cu[1:]) li = int(clesson[1:]) if clesson.startswith("L") else 0 if ui > max_unit_idx or (ui == max_unit_idx and li > max_lesson_idx): max_unit_idx = ui max_lesson_idx = li max_progress = f"{cl}-{cs}-{cu}-{clesson}" except: pass db[aid]["max_progress"] = max_progress # 3i. Trial count before refund log(" 查询退费前体验节数...") for aid in uid_set: if db[aid].get("first_refund_date"): refund_dt = db[aid]["first_refund_date"] # Count trial courses with created_at < refund_date cur.execute( "SELECT COUNT(*) FROM bi_user_course_detail WHERE account_id=%s AND expire_time IS NULL AND deleted_at IS NULL AND created_at < %s", (aid, refund_dt) ) db[aid]["trial_before_refund"] = cur.fetchone()[0] else: db[aid]["trial_before_refund"] = 0 # 3j. Max progress before refund log(" 查询退费前最高进度...") for aid in uid_set: if db[aid].get("first_refund_date"): refund_dt = db[aid]["first_refund_date"] char_ids = uid_to_chars.get(aid, []) max_ui = -1 max_li = -1 max_prog = "" for cid in char_ids: for ch_id, dt in all_chapter_first.get(cid, {}).items(): if ch_id in chapter_reverse and dt.strftime("%Y-%m-%d") <= refund_dt: cl, cs, cu, clesson = chapter_reverse[ch_id] try: ui = int(cu[1:]) li = int(clesson[1:]) if clesson.startswith("L") else 0 if ui > max_ui or (ui == max_ui and li > max_li): max_ui = ui max_li = li max_prog = f"{cl}-{cs}-{cu}-{clesson}" except: pass db[aid]["max_progress_before_refund"] = max_prog else: db[aid]["max_progress_before_refund"] = "" cur.close() conn.close() # Step 4: Build output rows log("Step 4: 构建输出数据") output_header = [ "行号", "笔记ID", "笔记标题", "笔记作者", "笔记类型", "匹配状态", "流量类型", "归属账号", "微伴客户名", "小红书昵称", "所属客服", "用户年级", "用户ID", "手机号", "订单号", "注册日期", "下载渠道", "首单付费日期", "全部付费日期", "下单金额(GMV)", "退款金额", "实际收入(GSV)", "退费日期", "退费金额", "U0完成日期", "U0付费后N天", "U1完成日期", "U1付费后N天", "退费前体验节数", "退费前最高进度", "当前最高进度", "激活课程", "激活日期", "当前行课进度", "最近行课时间", "累计学习时长(min)", "成交渠道", "产品", "渠道归属", "有效订单", "匹配方式", ] output_rows = [] for r in rows: aid = int(r["uid"]) if r["uid"] else 0 di = db.get(aid, {}) first_pay = di.get("first_pay_date", "") pay_dates = ", ".join(di.get("pay_dates", [])) u0_date = di.get("u0_complete_date", "") u1_date = di.get("u1_complete_date", "") # Days after payment u0_days = "" u1_days = "" if first_pay and u0_date: try: d1 = datetime.strptime(first_pay, "%Y-%m-%d") d2 = datetime.strptime(u0_date, "%Y-%m-%d") u0_days = (d2 - d1).days except: pass if first_pay and u1_date: try: d1 = datetime.strptime(first_pay, "%Y-%m-%d") d2 = datetime.strptime(u1_date, "%Y-%m-%d") u1_days = (d2 - d1).days except: pass output_rows.append([ r["row"], r["note_id"], r["note_title"], r["note_author"], r["note_type"], r["match_status"], r["traffic_type"], r["account"], r["customer_name"], r["xhs_nickname"], r["cs"], r["grade"], r["uid"], r["phone"], r["order_no"], di.get("reg_date", r["reg_date"]), di.get("download_channel", r["download_channel"]), first_pay, pay_dates, di.get("total_gmv", r["gmv"]), di.get("total_refund", r["refund"]), di.get("total_gmv", 0) - di.get("total_refund", 0), di.get("first_refund_date", ""), di.get("total_refund", 0), u0_date, u0_days, u1_date, u1_days, di.get("trial_before_refund", 0), di.get("max_progress_before_refund", ""), di.get("max_progress", ""), di.get("activation", r["activation"]), di.get("activation_date", ""), r["lesson_progress"], r["lesson_time"], r["lesson_minutes"], r["order_channel"], r["product"], r["channel_attr"], r["valid_order"], r["match_method"], ]) log(f" 输出 {len(output_rows)} 行") # Step 5: Write to Sheet1 columns BA onwards log("Step 5: 写入 Sheet1 右侧列 (BA起)") # Columns: BA(53), BB(54), BC(55), BD(56), BE(57), BF(58), BG(59), BH(60), BI(61), BJ(62), BK(63), BL(64) new_headers = [ "首单付费日期", "全部付费日期", "退费日期", "退费金额(元)", "U0完成日期", "U0付费后N天", "U1完成日期", "U1付费后N天", "退费前体验节数", "退费前最高进度", "当前最高进度", "激活日期", ] def col_letter(n): """0-indexed column number to letter: 0->A, 25->Z, 26->AA, 52->BA""" result = "" n += 1 while n > 0: n -= 1 result = chr(65 + n % 26) + result n //= 26 return result start_col_idx = 52 # BA = col 52 (0-indexed) # Write headers to row 1 for i, h in enumerate(new_headers): col = col_letter(start_col_idx + i) url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values" body = {"valueRange": {"range": f"{SOURCE_SHEET}!{col}1:{col}1", "values": [[h]]}} resp = requests.put(url, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json" }, json=body, timeout=15) r = resp.json() if r.get("code") != 0: log(f" ⚠️ 写入表头 {col} 失败: {r}") log(f" 表头写入完成: {col_letter(start_col_idx)}-{col_letter(start_col_idx + len(new_headers) - 1)}") # Write data columns one by one using batch write n_cols = len(new_headers) # Build column data: each column is a list of [value] for each row col_values = [[] for _ in range(n_cols)] for row_data in output_rows: # output_rows columns are: 行号(0), 笔记ID(1)... 首单付费日期(17), 全部付费日期(18), # GMV(19), 退款(20), GSV(21), 退费日期(22), 退费金额(23), # U0完成(24), U0天数(25), U1完成(26), U1天数(27), # 退费前体验节数(28), 退费前最高进度(29), 当前最高进度(30), # 激活课程(31), 激活日期(32), ... col_values[0].append([row_data[17] if row_data[17] else ""]) # 首单付费日期 col_values[1].append([row_data[18] if row_data[18] else ""]) # 全部付费日期 col_values[2].append([row_data[22] if row_data[22] else ""]) # 退费日期 col_values[3].append([row_data[23] if row_data[23] else ""]) # 退费金额 col_values[4].append([row_data[24] if row_data[24] else ""]) # U0完成日期 col_values[5].append([row_data[25] if row_data[25] else ""]) # U0付费后N天 col_values[6].append([row_data[26] if row_data[26] else ""]) # U1完成日期 col_values[7].append([row_data[27] if row_data[27] else ""]) # U1付费后N天 col_values[8].append([row_data[28] if row_data[28] else ""]) # 退费前体验节数 col_values[9].append([row_data[29] if row_data[29] else ""]) # 退费前最高进度 col_values[10].append([row_data[30] if row_data[30] else ""]) # 当前最高进度 col_values[11].append([row_data[32] if row_data[32] else ""]) # 激活日期 # Write each column (batch by 4000 rows) for i in range(n_cols): col = col_letter(start_col_idx + i) vals = col_values[i] # Write in batches of 4000 for batch_start in range(0, len(vals), 4000): batch = vals[batch_start:batch_start + 4000] start_row = batch_start + 2 end_row = start_row + len(batch) - 1 url = f"https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{SPREADSHEET_TOKEN}/values" body = {"valueRange": {"range": f"{SOURCE_SHEET}!{col}{start_row}:{col}{end_row}", "values": batch}} resp = requests.put(url, headers={ "Authorization": f"Bearer {token}", "Content-Type": "application/json" }, json=body, timeout=30) r = resp.json() if r.get("code") != 0: log(f" ❌ {col}{start_row}:{col}{end_row}: {r}") time.sleep(0.05) log(f" {col} 列写入完成 ({len(vals)} 行)") log(f"✅ 写入完成: {len(output_rows)} 行, {n_cols} 列") log(f" 表格链接: https://makee-interactive.feishu.cn/sheets/{SPREADSHEET_TOKEN}") if __name__ == "__main__": main()