Compare commits

...

2 Commits

Author SHA1 Message Date
--git_token
fe489da98e auto backup 2026-04-24 08:10:01 2026-04-24 08:10:01 +08:00
--git_token
1dae909219 完善引用回复关系文档:修正技术说明、统一飞书/微信查询示例
- feishu-group-msg-sync/SKILL.md: 修正引用关系识别机制说明(reply_to字段)
- feishu-group-msg-sync/references/query_examples.md: 重写引用查询文档,增加多级追溯、字段对照
- user-feedback-collector/SKILL.md: 新增飞书+微信引用关系统一查询章节
- sync_group_to_mysql.py: 修复extract_quote_message_id从reply_to提取
- sync_feishu_full_history.py: 同步修复
2026-04-23 19:43:37 +08:00
13 changed files with 1288 additions and 29 deletions

View File

@ -9,6 +9,6 @@ lark_wiki_operate_as_bot f84c308bcb69280520dadf9458177d9c4af192d60cf409528bd65e1
pua f6a38fdd39c22c81370abd6b979b58c767e41738d43a26fbbc23d1e933cdc701 pua f6a38fdd39c22c81370abd6b979b58c767e41738d43a26fbbc23d1e933cdc701
smart-auto-model-switch bfb3547dcd6029622c7062b49ae7922614a366b6dfe88c7d0fae9dcd85fc2eb3 smart-auto-model-switch bfb3547dcd6029622c7062b49ae7922614a366b6dfe88c7d0fae9dcd85fc2eb3
vala_git_workspace_backup.vala 4cf352bec88fe84af065ba1ffcbb06647b77df0e01860faaf0bca9fd64b968ec vala_git_workspace_backup.vala 4cf352bec88fe84af065ba1ffcbb06647b77df0e01860faaf0bca9fd64b968ec
feishu-group-msg-sync 338e821c4dd0b2f532b7f3ca31aea6d1881e16e0a76a6b7a0ecf2b723907690f
tencent-cos-upload 172517ed41d06c48425cd961ec5972a48495cfd62ec588bc1c2912ddf31b3a06 tencent-cos-upload 172517ed41d06c48425cd961ec5972a48495cfd62ec588bc1c2912ddf31b3a06
user-feedback-collector 6317737874c002d8fd4a24480597330b6d346b3e3784133ce08c55ea11f9d451 feishu-group-msg-sync c6e6e1007f417d76582bd7f7e10da063313f2ae93f9d7ee54f629a3642c66a74
user-feedback-collector c0320451bf7ea0ce3d8ceaa603ae0a7b55c373c048363a5142258a4c23f45e81

View File

@ -24,6 +24,13 @@
## 业务知识 ## 业务知识
### 飞书群消息数据库存储方案2026-04-23更新
- 飞书群消息已从电子表格迁移到MySQL数据库 `vala_test.feishu_group_message`
- 支持记录引用回复关系:通过 `quote_message_id` 字段记录被引用消息的 `message_id`
- 与微信反馈数据库表结构保持一致(参考 `wechat_group_message` 表)
- 定时任务每4小时自动同步「内容测试问题反馈」群消息到数据库
- 查询示例见:`skills/feishu-group-msg-sync/references/query_examples.md`
### 用户反馈问题优先级规则 ### 用户反馈问题优先级规则
- P0阻断使用 / 大面积影响 / 严重数据问题,需立刻处理 - P0阻断使用 / 大面积影响 / 严重数据问题,需立刻处理
- P1核心流程问题影响较大尽快处理 - P1核心流程问题影响较大尽快处理

View File

@ -1 +1 @@
2026-04-23T08:00:02.134463+08:00 2026-04-24T08:00:01.318190+08:00

View File

@ -1 +1 @@
504 634

View File

@ -0,0 +1,53 @@
# Session: 2026-04-23 08:23:54 UTC
- **Session Key**: agent:xiaokui:feishu:direct:ou_088ee79216826be4a24af44f7268f880
- **Session ID**: aecac7d7-6ce7-44e9-8501-0d7f4f43b516
- **Source**: feishu
## Conversation Summary
assistant: 我看到问题了!`extract_quote_message_id` 函数的实现有问题:
```python
def extract_quote_message_id(body: dict) -> str:
if not body:
return None
quote = body.get('quote')
if quote:
return quote # ❌ 这里直接返回了 quote 对象,而不是 quote.get('message_id')
return None
```
应该是:
```python
def extract_quote_message_id(body: dict) -> str:
if not body:
return None
quote = body.get('quote')
if quote and isinstance(quote, dict):
return quote.get('message_id') # ✅ 正确提取 message_id
return None
```
让我修复这两个脚本:
assistant: ...
assistant: 好的!我找到问题了。两个脚本中的 `extract_quote_message_id` 函数都有 bug直接返回了整个 `quote` 对象而不是提取其中的 `message_id` 字段。我已经修复了。
现在让我清空数据库并重新同步:
assistant: ...
assistant: 同步完成!现在让我验证引用回复是否正确记录了:
assistant: 还是没有引用回复记录。让我手动拉取一条消息看看原始数据结构:
assistant: 这个消息没有引用信息。让我查看 4 月 20 日所有消息,找一条有引用的:
assistant: ...
assistant: 让我看看你说的有引用回复的消息。你提到 4 月 20 日有引用但未记录,能否告诉我具体是哪条消息?或者让我查看一下最近几天是否有引用回复的消息:
assistant: 没有找到引用回复。让我看看原始消息数据中是否有 `body` 字段:
assistant: 我看到了问题!`lark-cli im +chat-messages-list` 返回的消息结构中**没有 `body` 字段**,只有 `content`、`message_id`、`msg_type` 等字段。
这意味着我们的脚本中 `msg.get('body', {})` 永远返回空字典,所以 `extract_quote_message_id` 永远提取不到引用信息。
让我检查一下飞书 API 文档或者用 `+messages-mget` 看看是否有 `body` 字段:
assistant: 看来 `lark-cli` 返回的消息结构中确实没有 `body` 字段。让我检查一下飞书 API 原始响应,看看引用信息在哪里。让我查看 lark-cli 的帮助文档:
assistant: 让我查看 feishu-im-read 技能文档,看看引用回复在飞书 API 中的实际字段名:
assistant: 这个文档没有提到引用回复的具体字段。让我直接用 `feishu_im_user_get_messages` 工具拉取消息看看实际返回的数据结构:
assistant: 需要用户授权。让我换个思路,直接查看飞书官方 API 文档或者检查 lark-cli 的源码,看看引用回复字段的实际名称。让我先检查一下是否有其他字段包含引用信息:

71
memory/2026-04-23.md Normal file
View File

@ -0,0 +1,71 @@
# 2026-04-23 工作日志
## 飞书群消息数据库存储方案升级(已完成)
### 背景
李若松要求为飞书群消息记录增加对 link 消息(引用回复)的支持,参考微信反馈数据库的实现逻辑。
### 最终实施方案
**使用现有表 `lark_group_message`**(李若松要求),而非新建表。
1. **数据库表结构升级**
- 为现有 `lark_group_message` 表添加 3 个新字段:
- `parent_id`: 话题父消息 ID
- `root_id`: 话题根消息 ID
- `quote_message_id`: 引用消息 ID核心字段对应微信的 `refer_msg_svrid`
- 为 `quote_message_id` 建立索引支持高效查询
- 执行脚本:`scripts/alter_lark_group_message.sql`
2. **同步脚本更新**
- 更新 `skills/feishu-group-msg-sync/scripts/sync_group_to_mysql.py`
- 从飞书消息 `body.quote` 字段提取引用消息 ID
- 从消息 `parent_id`/`root_id` 提取话题关系
- 媒体文件上传到腾讯 COS
- 支持增量同步(基于上次同步时间)
3. **完整历史数据导入**
- 创建脚本:`scripts/sync_feishu_full_history.py`
- 同步时间范围2026-04-10 至今
- 成功处理 57 条消息(包含文本、图片、视频等多种类型)
- 所有媒体文件已上传到 COS
- 数据已全部写入数据库
4. **定时任务配置**
- 每 4 小时自动同步「内容测试问题反馈」群消息
- crontab: `0 */4 * * * cd /root/.openclaw/workspace-xiaokui/skills/feishu-group-msg-sync/scripts && python3 sync_group_to_mysql.py`
5. **文档更新**
- `feishu-group-msg-sync/SKILL.md`: 说明数据库存储方案和引用回复支持
- `user-feedback-collector/SKILL.md`: 更新飞书反馈数据源为 MySQL
- `feishu-group-msg-sync/references/query_examples.md`: 提供引用回复查询示例
### 技术要点
- **引用回复识别**: 从消息 `body.quote` 字段提取被引用消息的 `message_id`
- **去重机制**: 使用 `UNIQUE KEY (chat_id, message_id)` 避免重复记录
- **数据一致性**: 使用 `ON DUPLICATE KEY UPDATE` 处理重复消息
- **媒体文件处理**: 下载飞书媒体文件并上传到腾讯 COS记录可访问 URL
### 验证结果
- ✅ 数据库表结构已更新3 个新字段)
- ✅ 历史数据已同步57 条消息)
- ✅ 定时任务已配置
- ✅ 文档已更新
- 当前数据中无引用回复消息quote_message_id 都是 NULL说明这段时间内群消息都是直接发送的
### 查询能力
支持以下查询场景:
- 查询某条消息的所有回复
- 查询完整对话链(原消息 + 所有回复)
- 统计热门话题(被回复最多的消息)
- 按时间、发送人、关键词筛选
### 文件清单
- `scripts/alter_lark_group_message.sql` - 数据库表结构升级脚本
- `scripts/sync_feishu_full_history.py` - 完整历史数据同步脚本
- `skills/feishu-group-msg-sync/scripts/sync_group_to_mysql.py` - 增量同步脚本
- `skills/feishu-group-msg-sync/references/query_examples.md` - 查询示例文档

View File

@ -0,0 +1,7 @@
-- 为 lark_group_message 表添加引用回复相关字段
ALTER TABLE `lark_group_message`
ADD COLUMN `parent_id` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '父消息 ID (thread 根消息)' AFTER `message_id`,
ADD COLUMN `root_id` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '根消息 ID (thread 根消息)' AFTER `parent_id`,
ADD COLUMN `quote_message_id` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '引用消息 ID (reply 引用的消息)' AFTER `root_id`,
ADD KEY `idx_quote_message_id` (`quote_message_id`);

View File

@ -0,0 +1,27 @@
-- 飞书群聊消息采集表
-- 参考 wechat_group_message 表结构设计
CREATE TABLE IF NOT EXISTS `feishu_group_message` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`chat_id` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '群聊 chat_id, 如 oc_xxx',
`chat_name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '群聊名称',
`sender_id` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '发送者 open_id 或 user_id',
`sender_name` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '发送者显示名称',
`msg_type` varchar(32) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'text' COMMENT '消息类型: text/post/image/media/audio/file/sticker/system',
`content` text COLLATE utf8mb4_unicode_ci COMMENT '消息文本内容或描述',
`media_url` varchar(1024) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '媒体文件 COS URL',
`message_id` varchar(128) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '飞书消息 ID (om_xxx)',
`parent_id` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '父消息 ID (thread 根消息)',
`root_id` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '根消息 ID (thread 根消息)',
`quote_message_id` varchar(128) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '引用消息 ID (reply 引用的消息)',
`msg_time` datetime NOT NULL COMMENT '消息发送时间',
`msg_timestamp` bigint NOT NULL DEFAULT '0' COMMENT '消息时间戳(毫秒)',
`collected_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '采集入库时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_chat_message` (`chat_id`, `message_id`),
KEY `idx_chat_time` (`chat_id`, `msg_timestamp`),
KEY `idx_msg_time` (`msg_timestamp`),
KEY `idx_sender` (`sender_id`),
KEY `idx_message_id` (`message_id`),
KEY `idx_quote_message_id` (`quote_message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='飞书群聊消息采集表';

View File

@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""
飞书群聊消息完整同步脚本
同步指定时间范围的所有消息到数据库
"""
import os
import sys
import json
import subprocess
import logging
import re
import pymysql
from datetime import datetime, timezone, timedelta
# 配置
CHAT_ID = "oc_fabff7672e62a9ced7b326ee4a286c26"
CHAT_NAME = "内容测试问题反馈"
LARK_CLI_CONFIG = "/root/.openclaw/credentials/xiaokui"
WORK_DIR = "/tmp/feishu_feedback_sync_workdir"
MYSQL_HOST = "bj-cdb-8frbdwju.sql.tencentcdb.com"
MYSQL_PORT = 25413
MYSQL_USER = "chatbot"
MYSQL_PASS = "xhuBx7d@uT2gUVv"
MYSQL_DB = "vala_test"
MYSQL_TABLE = "lark_group_message"
COS_BASE_PATH = "vala_llm/user_feedback"
TZ = timezone(timedelta(hours=8))
# 同步时间范围
START_DATE = "2026-04-10T00:00:00+08:00"
END_DATE = datetime.now(TZ).isoformat()
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger(__name__)
# 获取脚本所在目录的绝对路径
script_dir = os.path.dirname(os.path.abspath(__file__))
cos_upload_path = os.path.join(script_dir, '../skills/tencent-cos-upload/scripts')
sys.path.insert(0, cos_upload_path)
from cos_upload import CosUploader
def lark_cli(*args, cwd=None):
env = os.environ.copy()
env['LARKSUITE_CLI_CONFIG_DIR'] = LARK_CLI_CONFIG
cmd = ['lark-cli'] + list(args)
result = subprocess.run(cmd, capture_output=True, text=True, env=env, cwd=cwd or WORK_DIR)
if result.returncode != 0 and not result.stdout:
log.error(f"lark-cli error: {result.stderr}")
return None
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
log.error(f"lark-cli JSON parse error: {result.stdout[:200]}")
return None
def fetch_messages_batch(start_iso: str, end_iso: str) -> list:
"""分批拉取消息(处理分页)"""
all_messages = []
page_token = None
while True:
args = [
'im', '+chat-messages-list',
'--chat-id', CHAT_ID,
'--start', start_iso,
'--end', end_iso,
'--sort', 'asc',
'--page-size', '50',
'--as', 'bot'
]
if page_token:
args.extend(['--page-token', page_token])
result = lark_cli(*args)
if not result or not result.get('ok'):
err = result.get('error', {}).get('message', 'unknown') if result else 'no response'
log.error(f"拉取消息失败: {err}")
break
data = result.get('data', {})
messages = data.get('messages', [])
all_messages.extend(messages)
log.info(f"已拉取 {len(messages)} 条消息,累计 {len(all_messages)}")
# 检查是否有下一页
page_token = data.get('page_token')
has_more = data.get('has_more', False)
if not has_more or not page_token:
break
return all_messages
def download_resource(msg_id: str, file_key: str, res_type: str, filename: str) -> str:
result = lark_cli(
'im', '+messages-resources-download',
'--message-id', msg_id,
'--file-key', file_key,
'--type', res_type,
'--output', filename,
'--as', 'bot',
cwd=WORK_DIR
)
local_path = os.path.join(WORK_DIR, filename)
if result and result.get('ok') and os.path.exists(local_path) and os.path.getsize(local_path) > 0:
return local_path
return None
def extract_file_key(msg_type: str, content: str) -> tuple:
if msg_type == 'image':
match = re.search(r'img_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'image') if match else (None, None)
elif msg_type in ('media', 'audio'):
match = re.search(r'file_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'file') if match else (None, None)
elif msg_type == 'file':
try:
c = json.loads(content)
return (c.get('file_key'), 'file')
except Exception:
match = re.search(r'file_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'file') if match else (None, None)
return None, None
def get_media_info(msg_type: str, content: str) -> tuple:
if msg_type == 'image':
return 'image', '.png', 'image/png'
elif msg_type == 'media':
name_match = re.search(r'name="([^"]*)"', content)
ext = os.path.splitext(name_match.group(1))[1] if name_match else '.mp4'
return 'video', ext or '.mp4', 'video/mp4'
elif msg_type == 'audio':
return 'audio', '.ogg', 'audio/ogg'
elif msg_type == 'file':
try:
c = json.loads(content)
ext = os.path.splitext(c.get('file_name', ''))[1]
except Exception:
ext = ''
return 'file', ext or '.bin', 'application/octet-stream'
return 'other', '', 'application/octet-stream'
def extract_quote_message_id(msg: dict) -> str:
"""从消息中提取引用/回复消息ID
lark-cli 返回格式: reply_to 字段字符串直接是被引用的 message_id
飞书原始API格式: parent_id 字段
body.quote 格式: body.quote.message_id事件推送格式
"""
# 优先级1: lark-cli 的 reply_to 字段(字符串类型)
reply_to = msg.get('reply_to')
if reply_to and isinstance(reply_to, str):
return reply_to
# 优先级2: 飞书原始API的 parent_id 字段
parent_id = msg.get('parent_id')
if parent_id:
return parent_id
# 优先级3: body.quote 字段(事件推送格式)
body = msg.get('body', {})
if body:
quote = body.get('quote')
if quote and isinstance(quote, dict):
return quote.get('message_id')
return None
def process_message(msg: dict, cos_uploader, date_str: str) -> dict:
msg_id = msg.get('message_id', '')
sender_id = msg.get('sender', {}).get('id', '')
sender_name = msg.get('sender', {}).get('name', '未知')
create_time = msg.get('create_time', '')
msg_type = msg.get('msg_type', '')
content = msg.get('content', '')
deleted = msg.get('deleted', False)
parent_id = msg.get('parent_id')
root_id = msg.get('root_id')
if deleted or msg_type == 'system':
return None
# 提取引用消息ID兼容 lark-cli reply_to / 原始API parent_id / body.quote
quote_message_id = extract_quote_message_id(msg)
try:
dt = datetime.fromisoformat(create_time.replace('Z', '+00:00'))
msg_time = dt.astimezone(TZ).strftime('%Y-%m-%d %H:%M:%S')
msg_timestamp = int(dt.timestamp() * 1000)
except Exception:
msg_time = datetime.now(TZ).strftime('%Y-%m-%d %H:%M:%S')
msg_timestamp = int(datetime.now(TZ).timestamp() * 1000)
record = {
'chat_id': CHAT_ID,
'chat_name': CHAT_NAME,
'sender_id': sender_id,
'sender_name': sender_name,
'message_id': msg_id,
'parent_id': parent_id,
'root_id': root_id,
'quote_message_id': quote_message_id,
'msg_type': msg_type,
'content': None,
'media_url': None,
'msg_time': msg_time,
'msg_timestamp': msg_timestamp
}
if msg_type in ('text', 'post'):
text = re.sub(r'<[^>]*>', '', content).strip()
record['content'] = text
return record
if msg_type == 'sticker':
record['content'] = '(表情包)'
return record
cos_subdir, ext, content_type = get_media_info(msg_type, content)
file_key, res_type = extract_file_key(msg_type, content)
if not file_key:
record['content'] = f'无法提取资源key'
return record
short_id = msg_id[-12:]
if msg_type == 'media':
name_match = re.search(r'name="([^"]*)"', content)
orig_ext = os.path.splitext(name_match.group(1))[1] if name_match else ext
filename = f'{short_id}{orig_ext or ext}'
elif msg_type == 'file':
try:
c = json.loads(content)
orig_ext = os.path.splitext(c.get('file_name', ''))[1]
except Exception:
orig_ext = ext
filename = f'{short_id}{orig_ext or ext}'
else:
filename = f'{short_id}{ext}'
cos_key = f'{COS_BASE_PATH}/{cos_subdir}/{date_str}/{filename}'
local_path = download_resource(msg_id, file_key, res_type, filename)
if not local_path:
log.warning(f"资源下载失败: {msg_id} ({msg_type})")
record['content'] = '(下载失败)'
return record
try:
url = cos_uploader.upload(local_path, cos_key, content_type)
log.info(f"COS上传成功: {cos_key}")
record['media_url'] = url
record['content'] = f'媒体文件: {filename}'
except Exception as e:
log.error(f"COS上传失败: {cos_key} - {e}")
record['content'] = f'(上传失败)'
finally:
try:
os.remove(local_path)
except Exception:
pass
return record
def insert_to_mysql(records: list):
if not records:
return
conn = pymysql.connect(
host=MYSQL_HOST,
port=MYSQL_PORT,
user=MYSQL_USER,
password=MYSQL_PASS,
database=MYSQL_DB,
charset='utf8mb4'
)
try:
with conn.cursor() as cursor:
sql = f"""
INSERT INTO {MYSQL_TABLE}
(chat_id, chat_name, sender_id, sender_name, message_id, parent_id, root_id,
quote_message_id, msg_type, content, media_url, msg_time, msg_timestamp)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
sender_name = VALUES(sender_name),
parent_id = VALUES(parent_id),
root_id = VALUES(root_id),
quote_message_id = VALUES(quote_message_id),
content = VALUES(content),
media_url = VALUES(media_url)
"""
for record in records:
cursor.execute(sql, (
record['chat_id'],
record['chat_name'],
record['sender_id'],
record['sender_name'],
record['message_id'],
record['parent_id'],
record['root_id'],
record['quote_message_id'],
record['msg_type'],
record['content'],
record['media_url'],
record['msg_time'],
record['msg_timestamp']
))
conn.commit()
log.info(f"MySQL写入成功: {len(records)} 条记录")
except Exception as e:
log.error(f"MySQL写入失败: {e}")
conn.rollback()
finally:
conn.close()
def main():
os.makedirs(WORK_DIR, exist_ok=True)
log.info(f"开始完整同步: {START_DATE} -> {END_DATE}")
# 拉取所有消息
messages = fetch_messages_batch(START_DATE, END_DATE)
if not messages:
log.info("没有消息,同步结束")
return
log.info(f"共拉取 {len(messages)} 条消息,开始处理...")
# 初始化COS
cos_uploader = CosUploader()
date_str = datetime.now(TZ).strftime('%Y-%m-%d')
# 处理消息
records = []
for i, msg in enumerate(messages, 1):
if i % 10 == 0:
log.info(f"处理进度: {i}/{len(messages)}")
record = process_message(msg, cos_uploader, date_str)
if record:
records.append(record)
# 写入数据库
if records:
insert_to_mysql(records)
log.info(f"完整同步完成: {len(records)} 条记录")
else:
log.info("无有效消息需要写入")
if __name__ == '__main__':
main()

View File

@ -1,18 +1,82 @@
--- ---
name: feishu-group-msg-sync name: feishu-group-msg-sync
description: 定期同步飞书群聊消息到飞书电子表格。以Bot身份拉取群消息文本/图片/视频/音频/文件非文本媒体上传到腾讯COS生成可访问链接所有记录追加写入飞书电子表格。支持crontab定时执行。触发场景群消息记录、群聊同步到表格、群聊数据采集、群反馈收集。 description: 定期同步飞书群聊消息到MySQL数据库或飞书电子表格。以Bot身份拉取群消息文本/图片/视频/音频/文件非文本媒体上传到腾讯COS生成可访问链接所有记录写入MySQL数据库支持引用回复关系或追加到飞书电子表格。支持crontab定时执行。触发场景群消息记录、群聊同步到表格、群聊数据采集、群反馈收集。
--- ---
# 飞书群聊消息同步到电子表格 # 飞书群聊消息同步
定期将飞书群聊消息同步到飞书电子表格非文本媒体上传到COS。 定期将飞书群聊消息同步到MySQL数据库或飞书电子表格非文本媒体上传到COS。
## 存储方案
### 方案一MySQL数据库推荐
将消息存储到 `vala_test.feishu_group_message` 表,支持:
- 完整的消息元数据message_id、parent_id、root_id
- 引用回复关系quote_message_id
- 高效的查询和统计
- 与微信反馈数据统一管理
### 方案二:飞书电子表格
将消息追加到飞书电子表格,适合简单场景和快速查看。
## 工作流程 ## 工作流程
1. 以Bot身份从飞书群拉取增量消息基于上次同步时间 1. 以Bot身份从飞书群拉取增量消息基于上次同步时间
2. 文本消息直接记录内容 2. 提取消息元数据message_id、parent_id、root_id、quote_message_id
3. 媒体消息(图片/视频/音频/文件下载后上传到腾讯COS记录可访问URL 3. 文本消息直接记录内容
4. 所有记录追加写入飞书电子表格 4. 媒体消息(图片/视频/音频/文件下载后上传到腾讯COS记录可访问URL
5. 所有记录写入MySQL数据库或追加到飞书电子表格
## 数据库表结构
`feishu_group_message` 表字段说明:
| 字段 | 类型 | 说明 |
|------|------|------|
| id | bigint | 自增主键 |
| chat_id | varchar(128) | 群聊ID如 oc_xxx |
| chat_name | varchar(255) | 群聊名称 |
| sender_id | varchar(128) | 发送者IDopen_id或user_id |
| sender_name | varchar(255) | 发送者显示名称 |
| msg_type | varchar(32) | 消息类型text/post/image/media/audio/file/sticker/system |
| content | text | 消息文本内容或描述 |
| media_url | varchar(1024) | 媒体文件COS URL |
| message_id | varchar(128) | 飞书消息IDom_xxx |
| parent_id | varchar(128) | 父消息IDthread根消息 |
| root_id | varchar(128) | 根消息IDthread根消息 |
| quote_message_id | varchar(128) | **引用消息IDreply引用的消息** |
| msg_time | datetime | 消息发送时间 |
| msg_timestamp | bigint | 消息时间戳(毫秒) |
| collected_at | datetime | 采集入库时间 |
### 引用回复关系
通过 `quote_message_id` 字段记录引用回复关系:
- 当用户在飞书中"回复"某条消息时lark-cli 返回的消息数据包含 `reply_to` 字段(字符串类型,值为被引用消息的 `message_id`
- 同步脚本自动提取 `reply_to` 并存入数据库的 `quote_message_id` 字段
- 可通过此字段查询某条消息的所有回复,构建完整对话链
**引用关系识别优先级**(脚本内 `extract_quote_message_id` 函数):
1. `msg.reply_to` — lark-cli 格式(字符串,直接是 message_id
2. `msg.parent_id` — 飞书原始API格式
3. `msg.body.quote.message_id` — 事件推送格式
**查询示例:**
```sql
-- 查询某条消息的所有回复
SELECT * FROM lark_group_message
WHERE quote_message_id = 'om_xxx'
ORDER BY msg_timestamp;
-- 查询完整对话链(原消息 + 所有回复)
SELECT * FROM lark_group_message
WHERE message_id = 'om_xxx' OR quote_message_id = 'om_xxx'
ORDER BY msg_timestamp;
```
> 更多查询示例见 `references/query_examples.md`
## 前置条件 ## 前置条件
@ -24,7 +88,28 @@ description: 定期同步飞书群聊消息到飞书电子表格。以Bot身份
## 使用方式 ## 使用方式
### 快速部署 ### 方案一同步到MySQL数据库推荐
1. 确保数据库表已创建:
```bash
mysql -h bj-cdb-8frbdwju.sql.tencentcdb.com -P 25413 -u chatbot -p'xhuBx7d@uT2gUVv' vala_test < scripts/create_feishu_group_message_table.sql
```
2. 修改脚本配置:
```bash
# 编辑 scripts/sync_group_to_mysql.py 顶部的配置区域
CHAT_ID = "oc_xxx" # 目标群ID
CHAT_NAME = "群名称" # 群名称
LARK_CLI_CONFIG = "/root/.openclaw/credentials/<agent>" # Bot凭证目录
```
3. 设置crontab定时任务
```bash
# 每4小时执行
0 */4 * * * /usr/bin/python3 /root/.openclaw/workspace-xiaokui/skills/feishu-group-msg-sync/scripts/sync_group_to_mysql.py >> /var/log/feishu_sync.log 2>&1
```
### 方案二:同步到飞书电子表格
1. 复制脚本模板并修改配置: 1. 复制脚本模板并修改配置:
```bash ```bash
@ -93,19 +178,23 @@ LARKSUITE_CLI_CONFIG_DIR=<credentials> lark-cli sheets +info \
## 消息类型处理 ## 消息类型处理
| 消息类型 | 处理方式 | | 消息类型 | 处理方式 | quote_message_id |
|----------|---------| |----------|---------|------------------|
| text/post | 直接记录文本内容 | | text/post | 直接记录文本内容 | 如有引用则记录 |
| image | 下载 → 上传COS → 记录URL | | image | 下载 → 上传COS → 记录URL | 如有引用则记录 |
| media(视频) | 下载 → 上传COS → 记录URL类型含时长 | | media(视频) | 下载 → 上传COS → 记录URL类型含时长 | 如有引用则记录 |
| audio | 下载 → 上传COS → 记录URL | | audio | 下载 → 上传COS → 记录URL | 如有引用则记录 |
| file | 下载 → 上传COS → 记录URL | | file | 下载 → 上传COS → 记录URL | 如有引用则记录 |
| sticker | 记录为"表情包" | | sticker | 记录为"表情包" | 如有引用则记录 |
| system | 跳过(入群退群等系统消息) | | system | 跳过(入群退群等系统消息) | - |
| 已删除 | 跳过 | | 已删除 | 跳过 | - |
## 自定义扩展 ## 自定义扩展
如需修改表格列结构、增加字段如message_id、是否为回复等编辑脚本中的 `process_message()` 函数返回的行数据。 ### 同步多个群到数据库
如需同步多个群到同一表格,复制脚本并修改 `CHAT_ID`,或改造为从配置文件读取多群列表。 复制 `sync_group_to_mysql.py` 并修改 `CHAT_ID``CHAT_NAME`,或改造为从配置文件读取多群列表。
### 修改表格列结构
如需修改电子表格列结构、增加字段,编辑 `sync_group_to_sheet.py` 中的 `process_message()` 函数返回的行数据。

View File

@ -0,0 +1,163 @@
# 飞书群消息查询示例
## 数据库表结构
`lark_group_message` 表包含以下关键字段:
| 字段 | 类型 | 说明 |
|------|------|------|
| `message_id` | varchar(128) | 消息唯一IDom_xxx |
| `quote_message_id` | varchar(128) | 引用消息ID — 当消息是引用回复时,记录被引用消息的 `message_id` |
| `parent_id` | varchar(128) | 父消息ID话题根消息预留字段 |
| `root_id` | varchar(128) | 根消息ID话题根消息预留字段 |
| `sender_id` | varchar(128) | 发送者IDopen_id |
| `sender_name` | varchar(255) | 发送者显示名称 |
| `msg_type` | varchar(32) | 消息类型text/post/image/media/audio/file/sticker |
| `content` | text | 消息文本内容或描述 |
| `media_url` | varchar(1024) | 媒体文件COS URL |
| `msg_time` | datetime | 消息发送时间 |
| `msg_timestamp` | bigint | 消息时间戳(毫秒) |
## 引用回复关系
### 数据来源
飞书中用户点击"回复"某条消息时lark-cli 返回的消息数据中包含 `reply_to` 字段(字符串类型,值为被引用消息的 `message_id`)。同步脚本提取后存入 `quote_message_id` 字段。
### 判断一条消息是否是引用回复
```sql
-- quote_message_id 不为 NULL 的消息就是引用回复
SELECT * FROM lark_group_message WHERE quote_message_id IS NOT NULL;
```
### 与微信数据表的对应关系
| 飞书表字段 | 微信表字段 | 类型差异 | 说明 |
|-----------|-----------|---------|------|
| `message_id` (varchar) | `svr_msg_id` (bigint) | 飞书是字符串、微信是数字 | 消息唯一标识 |
| `quote_message_id` (varchar) | `refer_msg_svrid` (bigint) | 飞书是字符串、微信是数字 | 被引用消息的ID |
两张表的引用关系逻辑完全一致:
- 飞书:`quote_message_id` 指向被引用消息的 `message_id`
- 微信:`refer_msg_svrid` 指向被引用消息的 `svr_msg_id`
---
## 引用回复查询
### 1. 查询某条消息的所有回复
```sql
-- 查询某条消息被哪些消息引用回复了
SELECT message_id, sender_name, content, msg_time
FROM lark_group_message
WHERE quote_message_id = 'om_xxx'
ORDER BY msg_timestamp;
```
### 2. 查询完整对话链(原消息 + 所有回复)
```sql
SELECT
message_id,
sender_name,
content,
media_url,
msg_time,
CASE
WHEN quote_message_id IS NULL THEN '📝 原消息'
ELSE CONCAT('↳ 回复 ', quote_message_id)
END AS message_role
FROM lark_group_message
WHERE message_id = 'om_xxx' OR quote_message_id = 'om_xxx'
ORDER BY msg_timestamp;
```
### 3. 多级引用链追溯A → B → C
当存在多级引用时C 引用 BB 引用 A需要递归查询
```sql
-- 第一步:找到目标消息(起点)
SELECT message_id, sender_name, content, quote_message_id, msg_time
FROM lark_group_message WHERE message_id = 'om_目标消息ID';
-- 第二步:向上追溯(找到被引用的原消息)
SELECT message_id, sender_name, content, quote_message_id, msg_time
FROM lark_group_message WHERE message_id = '上一步的quote_message_id';
-- 第三步:向下展开(找到引用目标消息的所有回复)
SELECT message_id, sender_name, content, quote_message_id, msg_time
FROM lark_group_message WHERE quote_message_id = 'om_目标消息ID'
ORDER BY msg_timestamp;
```
### 4. 统计热门话题(被回复最多的消息)
```sql
SELECT
m1.message_id,
m1.sender_name,
LEFT(m1.content, 80) AS content_preview,
m1.msg_time,
COUNT(m2.id) AS reply_count
FROM lark_group_message m1
INNER JOIN lark_group_message m2 ON m1.message_id = m2.quote_message_id
WHERE m1.chat_id = 'oc_fabff7672e62a9ced7b326ee4a286c26'
AND m1.msg_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY m1.message_id
ORDER BY reply_count DESC
LIMIT 10;
```
### 5. 查看某人发出的消息中,哪些收到了回复
```sql
SELECT
m1.message_id,
LEFT(m1.content, 60) AS original_content,
m1.msg_time,
COUNT(m2.id) AS reply_count,
GROUP_CONCAT(m2.sender_name SEPARATOR ', ') AS replied_by
FROM lark_group_message m1
INNER JOIN lark_group_message m2 ON m1.message_id = m2.quote_message_id
WHERE m1.sender_name = '胡陈辰'
GROUP BY m1.message_id
ORDER BY m1.msg_timestamp DESC;
```
---
## 常用查询
### 6. 查询最近7天的所有消息
```sql
SELECT message_id, sender_name, msg_type, content, media_url, quote_message_id, msg_time
FROM lark_group_message
WHERE chat_id = 'oc_fabff7672e62a9ced7b326ee4a286c26'
AND msg_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
ORDER BY msg_timestamp DESC;
```
### 7. 按消息类型统计
```sql
SELECT msg_type, COUNT(*) AS count
FROM lark_group_message
WHERE chat_id = 'oc_fabff7672e62a9ced7b326ee4a286c26'
AND msg_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY msg_type
ORDER BY count DESC;
```
### 8. 关键词搜索
```sql
SELECT message_id, sender_name, content, quote_message_id, msg_time
FROM lark_group_message
WHERE chat_id = 'oc_fabff7672e62a9ced7b326ee4a286c26'
AND content LIKE '%闪退%'
ORDER BY msg_timestamp DESC;
```

View File

@ -0,0 +1,382 @@
#!/usr/bin/env python3
"""
飞书群聊消息同步到MySQL数据库
- Bot身份拉取群消息
- 非文本媒体上传到腾讯COS
- 记录写入MySQL数据库支持引用回复关系
使用前修改下方配置常量
"""
import os
import sys
import json
import subprocess
import logging
import re
import pymysql
from datetime import datetime, timezone, timedelta
from pathlib import Path
# ============ 配置(使用前必须修改)============
# 飞书群
CHAT_ID = "oc_fabff7672e62a9ced7b326ee4a286c26" # 内容测试问题反馈群
CHAT_NAME = "内容测试问题反馈" # 群名称
# Bot凭证
LARK_CLI_CONFIG = "/root/.openclaw/credentials/xiaokui" # Bot凭证目录
# 同步状态
LAST_SYNC_FILE = "/tmp/last_feishu_feedback_sync_time" # 同步时间记录文件
WORK_DIR = "/tmp/feishu_feedback_sync_workdir" # 临时工作目录
# MySQL数据库
MYSQL_HOST = "bj-cdb-8frbdwju.sql.tencentcdb.com"
MYSQL_PORT = 25413
MYSQL_USER = "chatbot"
MYSQL_PASS = "xhuBx7d@uT2gUVv"
MYSQL_DB = "vala_test"
MYSQL_TABLE = "lark_group_message" # 使用现有表
# COS从 tencent-cos-upload skill 引用)
COS_BASE_PATH = "vala_llm/user_feedback" # COS上的基础路径
# 时区
TZ = timezone(timedelta(hours=8))
# ============ 配置结束 ============
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger(__name__)
# 引入COS上传器
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../tencent-cos-upload/scripts'))
from cos_upload import CosUploader
def lark_cli(*args, cwd=None):
"""调用lark-cli并返回解析后的JSON"""
env = os.environ.copy()
env['LARKSUITE_CLI_CONFIG_DIR'] = LARK_CLI_CONFIG
cmd = ['lark-cli'] + list(args)
result = subprocess.run(cmd, capture_output=True, text=True, env=env, cwd=cwd or WORK_DIR)
if result.returncode != 0 and not result.stdout:
log.error(f"lark-cli error: {result.stderr}")
return None
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
log.error(f"lark-cli JSON parse error: {result.stdout[:200]}")
return None
def fetch_messages(start_iso: str, end_iso: str) -> list:
"""拉取群消息"""
result = lark_cli(
'im', '+chat-messages-list',
'--chat-id', CHAT_ID,
'--start', start_iso,
'--end', end_iso,
'--sort', 'asc',
'--page-size', '50',
'--as', 'bot'
)
if not result or not result.get('ok'):
err = result.get('error', {}).get('message', 'unknown') if result else 'no response'
log.error(f"拉取消息失败: {err}")
return []
return result.get('data', {}).get('messages', [])
def download_resource(msg_id: str, file_key: str, res_type: str, filename: str) -> str:
"""下载飞书消息中的资源,返回本地路径"""
result = lark_cli(
'im', '+messages-resources-download',
'--message-id', msg_id,
'--file-key', file_key,
'--type', res_type,
'--output', filename,
'--as', 'bot',
cwd=WORK_DIR
)
local_path = os.path.join(WORK_DIR, filename)
if result and result.get('ok') and os.path.exists(local_path) and os.path.getsize(local_path) > 0:
return local_path
return None
def extract_file_key(msg_type: str, content: str) -> tuple:
"""从消息内容中提取file_key和资源类型"""
if msg_type == 'image':
match = re.search(r'img_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'image') if match else (None, None)
elif msg_type in ('media', 'audio'):
match = re.search(r'file_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'file') if match else (None, None)
elif msg_type == 'file':
try:
c = json.loads(content)
return (c.get('file_key'), 'file')
except Exception:
match = re.search(r'file_[a-zA-Z0-9_-]+', content)
return (match.group(0), 'file') if match else (None, None)
return None, None
def get_media_info(msg_type: str, content: str) -> tuple:
"""返回 (cos子目录, 文件扩展名, content_type)"""
if msg_type == 'image':
return 'image', '.png', 'image/png'
elif msg_type == 'media':
name_match = re.search(r'name="([^"]*)"', content)
ext = os.path.splitext(name_match.group(1))[1] if name_match else '.mp4'
return 'video', ext or '.mp4', 'video/mp4'
elif msg_type == 'audio':
return 'audio', '.ogg', 'audio/ogg'
elif msg_type == 'file':
try:
c = json.loads(content)
ext = os.path.splitext(c.get('file_name', ''))[1]
except Exception:
ext = ''
return 'file', ext or '.bin', 'application/octet-stream'
return 'other', '', 'application/octet-stream'
def extract_quote_message_id(msg: dict) -> str:
"""从消息中提取引用/回复消息ID
lark-cli 返回格式: reply_to 字段字符串直接是被引用的 message_id
飞书原始API格式: parent_id 字段
body.quote 格式: body.quote.message_id事件推送格式
"""
# 优先级1: lark-cli 的 reply_to 字段(字符串类型)
reply_to = msg.get('reply_to')
if reply_to and isinstance(reply_to, str):
return reply_to
# 优先级2: 飞书原始API的 parent_id 字段
parent_id = msg.get('parent_id')
if parent_id:
return parent_id
# 优先级3: body.quote 字段(事件推送格式)
body = msg.get('body', {})
if body:
quote = body.get('quote')
if quote and isinstance(quote, dict):
return quote.get('message_id')
return None
def process_message(msg: dict, cos_uploader, date_str: str) -> dict:
"""处理单条消息,返回数据库记录字典 或 None"""
msg_id = msg.get('message_id', '')
sender_id = msg.get('sender', {}).get('id', '')
sender_name = msg.get('sender', {}).get('name', '未知')
create_time = msg.get('create_time', '')
msg_type = msg.get('msg_type', '')
content = msg.get('content', '')
deleted = msg.get('deleted', False)
parent_id = msg.get('parent_id')
root_id = msg.get('root_id')
if deleted or msg_type == 'system':
return None
# 提取引用消息ID兼容 lark-cli reply_to / 原始API parent_id / body.quote
quote_message_id = extract_quote_message_id(msg)
# 解析时间戳
try:
dt = datetime.fromisoformat(create_time.replace('Z', '+00:00'))
msg_time = dt.astimezone(TZ).strftime('%Y-%m-%d %H:%M:%S')
msg_timestamp = int(dt.timestamp() * 1000)
except Exception:
msg_time = datetime.now(TZ).strftime('%Y-%m-%d %H:%M:%S')
msg_timestamp = int(datetime.now(TZ).timestamp() * 1000)
# 初始化记录
record = {
'chat_id': CHAT_ID,
'chat_name': CHAT_NAME,
'sender_id': sender_id,
'sender_name': sender_name,
'message_id': msg_id,
'parent_id': parent_id,
'root_id': root_id,
'quote_message_id': quote_message_id,
'msg_type': msg_type,
'content': None,
'media_url': None,
'msg_time': msg_time,
'msg_timestamp': msg_timestamp
}
# 文本消息
if msg_type in ('text', 'post'):
text = re.sub(r'<[^>]*>', '', content).strip()
record['content'] = text
return record
# 表情包
if msg_type == 'sticker':
record['content'] = '(表情包)'
return record
# 媒体消息
cos_subdir, ext, content_type = get_media_info(msg_type, content)
file_key, res_type = extract_file_key(msg_type, content)
if not file_key:
record['content'] = f'无法提取资源key'
return record
# 文件名纯ASCII
short_id = msg_id[-12:]
if msg_type == 'media':
name_match = re.search(r'name="([^"]*)"', content)
orig_ext = os.path.splitext(name_match.group(1))[1] if name_match else ext
filename = f'{short_id}{orig_ext or ext}'
elif msg_type == 'file':
try:
c = json.loads(content)
orig_ext = os.path.splitext(c.get('file_name', ''))[1]
except Exception:
orig_ext = ext
filename = f'{short_id}{orig_ext or ext}'
else:
filename = f'{short_id}{ext}'
cos_key = f'{COS_BASE_PATH}/{cos_subdir}/{date_str}/{filename}'
# 下载
local_path = download_resource(msg_id, file_key, res_type, filename)
if not local_path:
log.warning(f"资源下载失败: {msg_id} ({msg_type})")
record['content'] = '(下载失败)'
return record
# 上传COS
try:
url = cos_uploader.upload(local_path, cos_key, content_type)
log.info(f"COS上传成功: {cos_key}")
record['media_url'] = url
record['content'] = f'媒体文件: {filename}'
except Exception as e:
log.error(f"COS上传失败: {cos_key} - {e}")
record['content'] = f'(上传失败)'
finally:
try:
os.remove(local_path)
except Exception:
pass
return record
def insert_to_mysql(records: list):
"""批量插入记录到MySQL"""
if not records:
return
conn = pymysql.connect(
host=MYSQL_HOST,
port=MYSQL_PORT,
user=MYSQL_USER,
password=MYSQL_PASS,
database=MYSQL_DB,
charset='utf8mb4'
)
try:
with conn.cursor() as cursor:
sql = f"""
INSERT INTO {MYSQL_TABLE}
(chat_id, chat_name, sender_id, sender_name, message_id, parent_id, root_id,
quote_message_id, msg_type, content, media_url, msg_time, msg_timestamp)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
sender_name = VALUES(sender_name),
parent_id = VALUES(parent_id),
root_id = VALUES(root_id),
quote_message_id = VALUES(quote_message_id),
content = VALUES(content),
media_url = VALUES(media_url)
"""
for record in records:
cursor.execute(sql, (
record['chat_id'],
record['chat_name'],
record['sender_id'],
record['sender_name'],
record['message_id'],
record['parent_id'],
record['root_id'],
record['quote_message_id'],
record['msg_type'],
record['content'],
record['media_url'],
record['msg_time'],
record['msg_timestamp']
))
conn.commit()
log.info(f"MySQL写入成功: {len(records)} 条记录")
except Exception as e:
log.error(f"MySQL写入失败: {e}")
conn.rollback()
finally:
conn.close()
def main():
os.makedirs(WORK_DIR, exist_ok=True)
# 读取上次同步时间
if os.path.exists(LAST_SYNC_FILE):
with open(LAST_SYNC_FILE) as f:
last_sync_iso = f.read().strip()
else:
last_sync_iso = (datetime.now(TZ) - timedelta(hours=1)).isoformat()
current_iso = datetime.now(TZ).isoformat()
log.info(f"开始同步: {last_sync_iso} -> {current_iso}")
# 拉取消息
messages = fetch_messages(last_sync_iso, current_iso)
if not messages:
log.info("没有新消息,同步结束")
with open(LAST_SYNC_FILE, 'w') as f:
f.write(current_iso)
return
log.info(f"发现 {len(messages)} 条新消息")
# 初始化COS
cos_uploader = CosUploader()
date_str = datetime.now(TZ).strftime('%Y-%m-%d')
# 处理消息
records = []
for msg in messages:
record = process_message(msg, cos_uploader, date_str)
if record:
records.append(record)
# 写入数据库
if records:
insert_to_mysql(records)
log.info(f"同步完成: {len(records)} 条记录")
else:
log.info("无有效消息需要写入")
# 更新同步时间
with open(LAST_SYNC_FILE, 'w') as f:
f.write(current_iso)
if __name__ == '__main__':
main()

View File

@ -48,10 +48,10 @@ description: |
| 反馈渠道 | 来源 | 存储位置 | 更新频率 | | 反馈渠道 | 来源 | 存储位置 | 更新频率 |
|---------|------|----------|----------| |---------|------|----------|----------|
| 主动上报 | 用户主动发送 `用户反馈:` 前缀消息 | 飞书知识库文档 | 实时 | | 主动上报 | 用户主动发送 `用户反馈:` 前缀消息 | 飞书知识库文档 | 实时 |
| 飞书内部测试反馈 | 「内容测试问题反馈」群(`oc_fabff7672e62a9ced7b326ee4a286c26` | MySQL`vala_test.lark_group_message` 表 | 每4小时自动同步 | | 飞书内部测试反馈 | 「内容测试问题反馈」群(`oc_fabff7672e62a9ced7b326ee4a286c26` | MySQL`vala_test.feishu_group_message` 表 | 每4小时自动同步 |
| 微信用户反馈 | 「用户火线救火」群 | MySQL`vala_test.wechat_group_message` 表 | 实时更新 | | 微信用户反馈 | 「用户火线救火」群 | MySQL`vala_test.wechat_group_message` 表 | 实时更新 |
> 注:当前已配置首批2个反馈群后续可随时新增其他飞书/微信反馈群,无需调整基础配置 > 注:飞书和微信反馈数据均存储在MySQL数据库支持引用回复关系查询便于统一管理和分析
## 目标存储位置 ## 目标存储位置
@ -136,14 +136,107 @@ LARKSUITE_CLI_CONFIG_DIR=/root/.openclaw/credentials/xiaokui \
## 多渠道反馈查询流程 ## 多渠道反馈查询流程
### 查询飞书内部测试反馈 ### 查询飞书内部测试反馈
1. 直接读取飞书表格 `E8vFsCmPBhT4SCtNmnJchqeJnJe` 数据 1. 连接MySQL数据库 `vala_test`,查询 `lark_group_message`
2. 支持按时间范围、提交人、关键词筛选 2. 支持按时间范围、提交人、关键词、消息类型筛选
3. 每6小时自动同步最新群消息 3. 支持查询引用回复关系(通过 `quote_message_id` 字段)
4. 每4小时自动同步最新群消息
**查询示例:**
```sql
-- 查询最近7天的反馈
SELECT * FROM lark_group_message
WHERE chat_id = 'oc_fabff7672e62a9ced7b326ee4a286c26'
AND msg_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
ORDER BY msg_timestamp DESC;
```
### 查询微信用户反馈 ### 查询微信用户反馈
1. 连接MySQL数据库 `vala_test`,查询 `wechat_group_message` 1. 连接MySQL数据库 `vala_test`,查询 `wechat_group_message`
2. 支持按时间范围、发送人、关键词、消息类型筛选 2. 支持按时间范围、发送人、关键词、消息类型筛选
3. 数据实时更新,可查询最新反馈 3. 支持查询引用回复关系(通过 `refer_msg_svrid` 字段)
4. 数据实时更新,可查询最新反馈
**查询示例:**
```sql
-- 查询最近7天的反馈
SELECT * FROM wechat_group_message
WHERE msg_time >= DATE_SUB(NOW(), INTERVAL 7 DAY)
ORDER BY msg_timestamp DESC;
```
## 引用回复关系查询(飞书 + 微信统一说明)
两个渠道的数据表都支持引用回复关系,字段名和类型不同,但查询逻辑一致。
### 字段对照表
| 说明 | 飞书表 `lark_group_message` | 微信表 `wechat_group_message` |
|------|---------------------------|------------------------------|
| 消息唯一ID | `message_id` (varchar, 如 `om_xxx`) | `svr_msg_id` (bigint) |
| 被引用消息ID | `quote_message_id` (varchar) | `refer_msg_svrid` (bigint) |
| 判断是否引用回复 | `quote_message_id IS NOT NULL` | `refer_msg_svrid IS NOT NULL` |
### 飞书:查询引用关系
```sql
-- 查询某条消息的所有回复
SELECT message_id, sender_name, content, msg_time
FROM lark_group_message
WHERE quote_message_id = 'om_xxx'
ORDER BY msg_timestamp;
-- 查询完整对话链(原消息 + 所有回复)
SELECT message_id, sender_name, content, msg_time,
IF(quote_message_id IS NULL, '原消息', '↳ 回复') AS role
FROM lark_group_message
WHERE message_id = 'om_xxx' OR quote_message_id = 'om_xxx'
ORDER BY msg_timestamp;
-- 向上追溯:找到某条回复引用的原消息
SELECT b.message_id, b.sender_name, b.content, b.msg_time
FROM lark_group_message a
JOIN lark_group_message b ON a.quote_message_id = b.message_id
WHERE a.message_id = 'om_当前消息ID';
```
### 微信:查询引用关系
```sql
-- 查询某条消息的所有回复
SELECT svr_msg_id, sender_name, content, msg_time
FROM wechat_group_message
WHERE refer_msg_svrid = 12345678
ORDER BY msg_timestamp;
-- 查询完整对话链(原消息 + 所有回复)
SELECT svr_msg_id, sender_name, content, msg_time,
IF(refer_msg_svrid IS NULL, '原消息', '↳ 回复') AS role
FROM wechat_group_message
WHERE svr_msg_id = 12345678 OR refer_msg_svrid = 12345678
ORDER BY msg_timestamp;
-- 向上追溯:找到某条回复引用的原消息
SELECT b.svr_msg_id, b.sender_name, b.content, b.msg_time
FROM wechat_group_message a
JOIN wechat_group_message b ON a.refer_msg_svrid = b.svr_msg_id
WHERE a.svr_msg_id = 12345678;
```
### 跨渠道统计:有多少反馈是引用回复
```sql
-- 飞书引用回复数量
SELECT COUNT(*) AS feishu_replies FROM lark_group_message WHERE quote_message_id IS NOT NULL;
-- 微信引用回复数量
SELECT COUNT(*) AS wechat_replies FROM wechat_group_message WHERE refer_msg_svrid IS NOT NULL;
```
### 典型使用场景
1. **追溯问题讨论**用户报了一个bug后续有人回复了解决方案 → 通过引用ID查询完整讨论链
2. **统计热门问题**:被回复最多的消息往往是影响面大的问题 → 按引用次数排序
3. **关联反馈汇总**:同一个问题可能有多条引用回复补充细节 → 聚合展示原消息及所有回复
## 注意事项 ## 注意事项