143 lines
5.4 KiB
Python
143 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
飞书表格安全写入工具 — 自动遵守 5000 格/次 API 上限
|
||
|
||
飞书 Open API 单次写入上限为 5000 格(行×列)。
|
||
超过上限的请求会静默失败(API 不报错但数据不完整),
|
||
导致旧数据残留、新数据被部分覆盖、末尾行丢失等问题。
|
||
|
||
本模块封装了安全的分批写入和清空逻辑,所有操作自动计算
|
||
批大小确保 ≤ 4400 格/批(留 12% 安全余量)。
|
||
|
||
用法:
|
||
from feishu_sheet_utils import FeishuSheetWriter
|
||
|
||
writer = FeishuSheetWriter(SPREADSHEET_TOKEN, token)
|
||
writer.clear(sheet_id, start_row=3, end_row=500, cols=26)
|
||
writer.write(sheet_id, start_row=3, rows=data, cols=26)
|
||
"""
|
||
|
||
import time
|
||
import requests
|
||
|
||
# 飞书 API 单次写入格数上限
|
||
FEISHU_CELL_LIMIT = 5000
|
||
# 安全余量系数(0.88,即实际使用 ≤ 4400 格/批)
|
||
SAFETY_FACTOR = 0.88
|
||
# 单批最大格数
|
||
SAFE_CELLS_PER_BATCH = int(FEISHU_CELL_LIMIT * SAFETY_FACTOR) # 4400
|
||
|
||
|
||
def max_rows_per_batch(cols):
|
||
"""根据列数计算单批最大行数(确保 ≤ 4400 格)。"""
|
||
return max(1, SAFE_CELLS_PER_BATCH // cols)
|
||
|
||
|
||
class FeishuSheetWriter:
|
||
"""飞书表格安全写入器,自动分批遵守 5000 格上限。"""
|
||
|
||
def __init__(self, spreadsheet_token, tenant_token):
|
||
self.spreadsheet_token = spreadsheet_token
|
||
self.token = tenant_token
|
||
self.base_url = "https://open.feishu.cn/open-apis/sheets/v2"
|
||
|
||
def _put(self, sheet_id, range_str, values, retries=3):
|
||
"""单次写入,含重试。"""
|
||
url = f"{self.base_url}/spreadsheets/{self.spreadsheet_token}/values"
|
||
body = {"valueRange": {"range": f"{sheet_id}!{range_str}", "values": values}}
|
||
for attempt in range(retries):
|
||
resp = requests.put(url, headers={
|
||
"Authorization": f"Bearer {self.token}",
|
||
"Content-Type": "application/json"
|
||
}, json=body, timeout=30)
|
||
result = resp.json()
|
||
if result.get("code") == 0:
|
||
return True
|
||
print(f" Retry {attempt+1} for {range_str}: {result.get('msg','')}")
|
||
time.sleep(1)
|
||
print(f" FAILED {range_str}")
|
||
return False
|
||
|
||
def _col_letter(self, idx):
|
||
"""0-based column index → Excel column letter(s). 0→A, 25→Z, 26→AA."""
|
||
result = ""
|
||
n = idx
|
||
while n >= 0:
|
||
result = chr(ord('A') + n % 26) + result
|
||
n = n // 26 - 1
|
||
return result
|
||
|
||
def _range_str(self, start_row, end_row, cols):
|
||
"""生成范围字符串,如 A3:Z52。"""
|
||
end_col = self._col_letter(cols - 1)
|
||
return f"A{start_row}:{end_col}{end_row}"
|
||
|
||
def clear(self, sheet_id, start_row, end_row, cols):
|
||
"""
|
||
安全清空指定区域(写入空字符串)。
|
||
自动分批,每批 ≤ 4400 格。
|
||
"""
|
||
if end_row < start_row:
|
||
return
|
||
batch_rows = max_rows_per_batch(cols)
|
||
total = end_row - start_row + 1
|
||
print(f" Clearing {sheet_id} rows {start_row}-{end_row} "
|
||
f"({total} rows × {cols} cols, batch={batch_rows} rows)")
|
||
|
||
for batch_start in range(start_row, end_row + 1, batch_rows):
|
||
batch_end = min(batch_start + batch_rows - 1, end_row)
|
||
n_rows = batch_end - batch_start + 1
|
||
empty = [[""] * cols for _ in range(n_rows)]
|
||
rng = self._range_str(batch_start, batch_end, cols)
|
||
ok = self._put(sheet_id, rng, empty)
|
||
if not ok:
|
||
print(f" Clear batch {rng} failed, continuing...")
|
||
time.sleep(0.15)
|
||
|
||
def write(self, sheet_id, start_row, rows, cols):
|
||
"""
|
||
安全写入数据行。
|
||
自动分批,每批 ≤ 4400 格。
|
||
rows: list of list,每行长度应为 cols。
|
||
"""
|
||
if not rows:
|
||
return
|
||
batch_rows = max_rows_per_batch(cols)
|
||
total = len(rows)
|
||
print(f" Writing {sheet_id} {total} rows × {cols} cols "
|
||
f"(batch={batch_rows} rows, {batch_rows * cols} cells/batch)")
|
||
|
||
for batch_start in range(0, total, batch_rows):
|
||
batch = rows[batch_start:batch_start + batch_rows]
|
||
sr = start_row + batch_start
|
||
er = sr + len(batch) - 1
|
||
rng = self._range_str(sr, er, cols)
|
||
ok = self._put(sheet_id, rng, batch)
|
||
if not ok:
|
||
print(f" Write batch {rng} failed!")
|
||
time.sleep(0.3)
|
||
|
||
def clear_excess(self, sheet_id, total_written, old_count, cols):
|
||
"""清除超出新数据范围的旧行残留。"""
|
||
if old_count <= total_written:
|
||
return
|
||
clear_start = start_row_base = 3 # 假设数据从第3行开始
|
||
actual_start = clear_start + total_written
|
||
actual_end = clear_start + old_count - 1
|
||
if actual_start > actual_end:
|
||
return
|
||
print(f" Clearing excess rows {actual_start}-{actual_end}")
|
||
self.clear(sheet_id, actual_start, actual_end, cols)
|
||
|
||
|
||
def safe_clear_range(token, spreadsheet_token, sheet_id, start_row, end_row, cols):
|
||
"""便捷函数:安全清空指定区域。"""
|
||
writer = FeishuSheetWriter(spreadsheet_token, token)
|
||
writer.clear(sheet_id, start_row, end_row, cols)
|
||
|
||
|
||
def safe_write_rows(token, spreadsheet_token, sheet_id, start_row, rows, cols):
|
||
"""便捷函数:安全写入数据行。"""
|
||
writer = FeishuSheetWriter(spreadsheet_token, token)
|
||
writer.write(sheet_id, start_row, rows, cols)
|