feishu-group-msg-sync.xiaokui/scripts/sync_group_to_sheet.py
2026-04-10 19:53:57 +08:00

284 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
飞书群聊消息同步到电子表格
- Bot身份拉取群消息
- 非文本媒体上传到腾讯COS
- 记录写入飞书电子表格
使用前修改下方配置常量。
"""
import os
import sys
import json
import subprocess
import logging
import re
from datetime import datetime, timezone, timedelta
from pathlib import Path
# ============ 配置(使用前必须修改)============
# 飞书群
CHAT_ID = "oc_xxx" # 目标群ID
# 飞书电子表格
SPREADSHEET_TOKEN = "xxx" # 电子表格token
SHEET_ID = "xxx" # sheet页ID
# Bot凭证
LARK_CLI_CONFIG = "/root/.openclaw/credentials/xiaokui" # Bot凭证目录
# 同步状态
LAST_SYNC_FILE = "/tmp/last_feedback_sync_time" # 同步时间记录文件
WORK_DIR = "/tmp/feedback_sync_workdir" # 临时工作目录
# COS从 tencent-cos-upload skill 引用)
COS_BASE_PATH = "vala_llm/user_feedback" # COS上的基础路径
# 时区
TZ = timezone(timedelta(hours=8))
# ============ 配置结束 ============
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger(__name__)
# 引入COS上传器
sys.path.insert(0, '/root/.openclaw/skills/tencent-cos-upload/scripts')
from cos_upload import CosUploader
def lark_cli(*args, cwd=None):
"""调用lark-cli并返回解析后的JSON"""
env = os.environ.copy()
env['LARKSUITE_CLI_CONFIG_DIR'] = LARK_CLI_CONFIG
cmd = ['lark-cli'] + list(args)
result = subprocess.run(cmd, capture_output=True, text=True, env=env, cwd=cwd or WORK_DIR)
if result.returncode != 0 and not result.stdout:
log.error(f"lark-cli error: {result.stderr}")
return None
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
log.error(f"lark-cli JSON parse error: {result.stdout[:200]}")
return None
def fetch_messages(start_iso: str, end_iso: str) -> list:
"""拉取群消息"""
result = lark_cli(
'im', '+chat-messages-list',
'--chat-id', CHAT_ID,
'--start', start_iso,
'--end', end_iso,
'--sort', 'asc',
'--page-size', '50',
'--as', 'bot'
)
if not result or not result.get('ok'):
err = result.get('error', {}).get('message', 'unknown') if result else 'no response'
log.error(f"拉取消息失败: {err}")
return []
return result.get('data', {}).get('messages', [])
def download_resource(msg_id: str, file_key: str, res_type: str, filename: str) -> str:
"""下载飞书消息中的资源,返回本地路径"""
result = lark_cli(
'im', '+messages-resources-download',
'--message-id', msg_id,
'--file-key', file_key,
'--type', res_type,
'--output', filename,
'--as', 'bot',
cwd=WORK_DIR
)
local_path = os.path.join(WORK_DIR, filename)
if result and result.get('ok') and os.path.exists(local_path) and os.path.getsize(local_path) > 0:
return local_path
return None
def append_to_sheet(rows: list):
"""追加行到电子表格"""
if not rows:
return
values_json = json.dumps(rows, ensure_ascii=False)
result = lark_cli(
'sheets', '+append',
'--spreadsheet-token', SPREADSHEET_TOKEN,
'--sheet-id', SHEET_ID,
'--range', f'{SHEET_ID}!A:D',
'--values', values_json,
'--as', 'bot'
)
if result and result.get('ok'):
log.info(f"电子表格写入成功: {len(rows)}")
else:
err = result.get('error', {}).get('message', 'unknown') if result else 'no response'
log.error(f"电子表格写入失败: {err}")
def extract_file_key(msg_type: str, content: str) -> tuple:
"""从消息内容中提取file_key和资源类型"""
if msg_type == 'image':
match = re.search(r'img_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'image') if match else (None, None)
elif msg_type in ('media', 'audio'):
match = re.search(r'file_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'file') if match else (None, None)
elif msg_type == 'file':
try:
c = json.loads(content)
return (c.get('file_key'), 'file')
except Exception:
match = re.search(r'file_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'file') if match else (None, None)
return None, None
def get_media_info(msg_type: str, content: str) -> tuple:
"""返回 (cos子目录, 文件扩展名, content_type)"""
if msg_type == 'image':
return 'image', '.png', 'image/png'
elif msg_type == 'media':
name_match = re.search(r'name="([^"]*)"', content)
ext = os.path.splitext(name_match.group(1))[1] if name_match else '.mp4'
return 'video', ext or '.mp4', 'video/mp4'
elif msg_type == 'audio':
return 'audio', '.ogg', 'audio/ogg'
elif msg_type == 'file':
try:
c = json.loads(content)
ext = os.path.splitext(c.get('file_name', ''))[1]
except Exception:
ext = ''
return 'file', ext or '.bin', 'application/octet-stream'
return 'other', '', 'application/octet-stream'
def process_message(msg: dict, cos_uploader, date_str: str) -> list:
"""处理单条消息,返回表格行 [时间, 反馈人, 类型, 内容/URL] 或 None"""
msg_id = msg.get('message_id', '')
sender_name = msg.get('sender', {}).get('name', '未知')
create_time = msg.get('create_time', '')
msg_type = msg.get('msg_type', '')
content = msg.get('content', '')
deleted = msg.get('deleted', False)
if deleted or msg_type == 'system':
return None
# 文本消息
if msg_type in ('text', 'post'):
text = re.sub(r'<[^>]*>', '', content).strip()
return [create_time, sender_name, '文本', text]
# 表情包
if msg_type == 'sticker':
return [create_time, sender_name, '表情', '(表情包)']
# 媒体消息
cos_subdir, ext, content_type = get_media_info(msg_type, content)
file_key, res_type = extract_file_key(msg_type, content)
type_labels = {'image': '图片', 'media': '视频', 'audio': '语音', 'file': '文件'}
type_label = type_labels.get(msg_type, msg_type)
if msg_type == 'media':
dur = re.search(r'duration="([^"]*)"', content)
if dur:
type_label += f'({dur.group(1)})'
if not file_key:
return [create_time, sender_name, type_label, f'无法提取资源key']
# 文件名纯ASCII
short_id = msg_id[-12:]
if msg_type == 'media':
name_match = re.search(r'name="([^"]*)"', content)
orig_ext = os.path.splitext(name_match.group(1))[1] if name_match else ext
filename = f'{short_id}{orig_ext or ext}'
elif msg_type == 'file':
try:
c = json.loads(content)
orig_ext = os.path.splitext(c.get('file_name', ''))[1]
except Exception:
orig_ext = ext
filename = f'{short_id}{orig_ext or ext}'
else:
filename = f'{short_id}{ext}'
cos_key = f'{COS_BASE_PATH}/{cos_subdir}/{date_str}/{filename}'
# 下载
local_path = download_resource(msg_id, file_key, res_type, filename)
if not local_path:
log.warning(f"资源下载失败: {msg_id} ({msg_type})")
return [create_time, sender_name, type_label, '(下载失败)']
# 上传COS
try:
url = cos_uploader.upload(local_path, cos_key, content_type)
log.info(f"COS上传成功: {cos_key}")
except Exception as e:
log.error(f"COS上传失败: {cos_key} - {e}")
url = f'(上传失败)'
finally:
try:
os.remove(local_path)
except Exception:
pass
return [create_time, sender_name, type_label, url]
def main():
os.makedirs(WORK_DIR, exist_ok=True)
# 读取上次同步时间
if os.path.exists(LAST_SYNC_FILE):
with open(LAST_SYNC_FILE) as f:
last_sync_iso = f.read().strip()
else:
last_sync_iso = (datetime.now(TZ) - timedelta(hours=1)).isoformat()
current_iso = datetime.now(TZ).isoformat()
log.info(f"开始同步: {last_sync_iso} -> {current_iso}")
# 拉取消息
messages = fetch_messages(last_sync_iso, current_iso)
if not messages:
log.info("没有新消息,同步结束")
with open(LAST_SYNC_FILE, 'w') as f:
f.write(current_iso)
return
log.info(f"发现 {len(messages)} 条新消息")
# 初始化COS
cos_uploader = CosUploader()
date_str = datetime.now(TZ).strftime('%Y-%m-%d')
# 处理消息
rows = []
for msg in messages:
row = process_message(msg, cos_uploader, date_str)
if row:
rows.append(row)
# 写入表格
if rows:
append_to_sheet(rows)
log.info(f"同步完成: {len(rows)} 条记录")
else:
log.info("无有效消息需要写入")
# 更新同步时间
with open(LAST_SYNC_FILE, 'w') as f:
f.write(current_iso)
if __name__ == '__main__':
main()