diff --git a/.env.sample b/.env.sample new file mode 100644 index 0000000..7de93b8 --- /dev/null +++ b/.env.sample @@ -0,0 +1,15 @@ +# MySQL +MYSQL_HOST=localhost +MYSQL_PORT=3306 +MYSQL_USER=root +MYSQL_PASSWORD=your_password +MYSQL_DATABASE=your_database +MYSQL_TABLE=wechat_group_message + +# Tencent COS +COS_SECRET_ID=your_cos_secret_id +COS_SECRET_KEY=your_cos_secret_key +COS_BUCKET=your_bucket_name +COS_REGION=ap-beijing +COS_DOWNLOAD_DOMAIN=your_domain.com +COS_BASE_PATH=your/cos/path diff --git a/.gitignore b/.gitignore index 24eecf2..24827b1 100644 --- a/.gitignore +++ b/.gitignore @@ -35,6 +35,7 @@ Thumbs.db *.db-shm # Sensitive data — NEVER commit +.env *.json !pyproject.toml !npm/**/package.json diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..00c3530 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,45 @@ +# wechat-cli 项目 + +## 项目概述 +基于本地微信数据库的 CLI 查询工具,支持会话、消息、联系人、群成员等查询。AI-first 设计,输出结构化 JSON。 + +## 技术栈 +- Python 3.10+, Click, pycryptodome, zstandard +- SQLCipher 4 解密(AES-256-CBC) +- npm 分发 + PyInstaller 打包 + +## 目录结构 +- `wechat_cli/` — 主 CLI 包(commands/, core/, keys/, output/) +- `skills/` — Claude Code skills(export-chat, tencent-cos-upload) +- `collector/` — 群聊消息自动采集器 +- `collect_chats.py` — 采集器入口脚本 +- `npm/` — npm 分发包 + +## 群聊采集器(collector/) +自动采集微信群聊消息的子系统: +- `collector/config.py` — 配置管理(MySQL、COS、扫描策略、回溯补录) +- `collector/storage.py` — MySQL 存储层(wechat_group_message 表) +- `collector/wechat_adapter.py` — 封装 wechat_cli.core 的适配层,包含 XML 元信息提取 +- `collector/scanner.py` — 优先级队列扫描调度 + COS 媒体上传 +- `collector/backfill.py` — 媒体回溯补录(watchdog 实时监听 + 定时兜底扫描) + +## 关键约定 +- wechat-cli 是只读工具,不修改微信数据 +- 群聊通过 `@chatroom` 后缀识别 +- 消息可跨多个 `message_N.db` 文件 +- DBCache 使用 mtime 机制缓存解密副本到 `/tmp/wechat_cli_cache/` +- 采集器独立维护状态,不干扰 `~/.wechat-cli/last_check.json` +- 非文本消息(文件/视频/链接等)从 XML 中提取元信息存入 content 字段,不依赖本地文件下载 +- 群聊消息 content_raw 格式为 `wxid:\n`,解析 XML 时需先剥离发送者前缀 +- 聊天记录合并转发(app_type=19)解析 recorditem XML,格式化为多行纯文本 +- 引用/回复消息(app_type=57)提取 refermsg 中的 svrid 建立消息关联,content 格式化为 "正文\n ↳ 回复 发送者: 被引用内容" +- 数据库通过 `svr_msg_id`(微信 server_id)和 `refer_msg_svrid`(refermsg/svrid)实现引用消息的关联查询 +- 采集器扫描策略: hot=15s, warm=30s, cold 退避至 120s(backoff 1.2),保证 ≤2min 入库 +- 媒体文件只采集原始可读文件,不做 .dat 解密 + - 图片: 从 `temp/RWTemp/YYYY-MM/{md5}.ext` 获取(微信查看图片后自动生成) + - 文件: 从 `msg/file/YYYY-MM/filename` 获取 + - 视频: 从 `msg/video/YYYY-MM/{rawmd5}.mp4` 获取 +- 图片消息 content 格式为 `[图片] {md5}`,md5 是图片原始内容的 hash(来自消息 XML 的 md5 属性) +- **重要**: RWTemp 中的图片文件名是派生 hash,与消息 XML 中的 md5/aeskey 等属性均不对应。匹配图片必须通过 `hashlib.md5(文件内容)` 计算后与 XML md5 属性比对,不能用文件名匹配 +- 回溯补录: watchdog 监听 temp/RWTemp + msg/file + msg/video 目录 + 每 10 分钟定时扫描 media_url=NULL 记录(7天内) +- 默认日志级别 DEBUG,可通过配置文件 `~/.wechat-cli/collect-chats/config.json` 修改 diff --git a/COLLECT_README.md b/COLLECT_README.md new file mode 100644 index 0000000..a8197ea --- /dev/null +++ b/COLLECT_README.md @@ -0,0 +1,93 @@ +# 微信群聊消息采集器 + +自动扫描所有微信群聊,增量采集新消息,存入 MySQL,媒体文件上传腾讯 COS。 + +## 前置条件 +0. 安装 wechat-cli +cd wechat-cli +pip install -e . + +微信版本建议 4.1.6 或 4.1.8 + +1. **微信已登录** 且 `wechat-cli` 已初始化: + ```bash + sudo wechat-cli init + ``` + +2. **安装依赖**: + ```bash + pip3 install pymysql cos-python-sdk-v5 + ``` + +## 快速启动 + +```bash +cd /Users/makee/Documents/cris/workspace/wechat-cli + +# 查看所有群聊 +python3 collect_chats.py groups + +# 一次性扫描(测试用) +python3 collect_chats.py scan + +# 守护模式(持续运行) +python3 collect_chats.py daemon + +# 查看采集统计 +python3 collect_chats.py status + +# 停止采集某个群 +python3 collect_chats.py disable "广告群" + +# 恢复采集 +python3 collect_chats.py enable "广告群" +``` + +## 扫描策略 + +采集器使用自适应频率扫描: + +| 活跃度 | 条件 | 扫描间隔 | +|--------|------|----------| +| HOT | 最新消息 < 5 分钟 | 60 秒 | +| WARM | 最新消息 < 1 小时 | 5 分钟 | +| COLD | 无近期消息 | 逐步退避至 30 分钟 | + +- 每轮最多扫描 5 个群,群间随机延迟 0~3 秒 +- 每 5 分钟重新发现新群聊 +- 支持白名单/黑名单过滤(正则匹配) + +## 自定义配置 + +创建 `~/.wechat-cli/collect-chats/config.json`(可选,所有字段都有默认值): + +```json +{ + "min_interval": 60, + "base_interval": 300, + "max_interval": 1800, + "batch_size": 5, + "messages_per_scan": 200, + "whitelist": [], + "blacklist": ["广告群", ".*测试.*"] +} +``` + +## 数据存储 + +- **消息** → MySQL `vala_test.wechat_group_message` 表 +- **媒体文件**(语音/视频/文件)→ 腾讯 COS `vala_llm/user_feedback/wechat/类型/YYYY-MM/` +- **图片** → 暂不采集(微信 V2 AES 加密限制) + +## 后台运行 + +```bash +# nohup 方式 +nohup python3 collect_chats.py daemon > collector.log 2>&1 & + +# 查看日志 +tail -f collector.log + +# 停止 +kill $(pgrep -f "collect_chats.py daemon") +``` diff --git a/collect_chats.py b/collect_chats.py new file mode 100644 index 0000000..f5a2a91 --- /dev/null +++ b/collect_chats.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +""" +微信群聊消息自动采集器 + +用法: + python3 collect_chats.py scan # 一次性扫描所有群聊 + python3 collect_chats.py daemon # 持续运行(自适应频率) + python3 collect_chats.py status # 查看采集统计 + python3 collect_chats.py groups # 列出所有群聊 + python3 collect_chats.py enable "群名" # 启用某群 + python3 collect_chats.py disable "群名" # 停止某群 +""" + +import argparse +import json +import logging +import os +import signal +import sys +import time +from datetime import datetime + +from collector.config import CollectorConfig +from collector.storage import MessageStorage +from collector.wechat_adapter import WeChatAdapter +from collector.scanner import GroupScanner, _init_cos_uploader +from collector.backfill import MediaBackfiller + +log = logging.getLogger("collector") + + +def setup_logging(level="INFO"): + fmt = "%(asctime)s [%(levelname)s] %(message)s" + logging.basicConfig(level=getattr(logging, level, logging.INFO), + format=fmt, datefmt="%Y-%m-%d %H:%M:%S") + + +def cmd_scan(args): + """一次性扫描""" + config = CollectorConfig.load(args.config) + setup_logging(config.log_level) + + log.info("初始化微信数据适配器...") + adapter = WeChatAdapter() + + log.info("连接 MySQL...") + storage = MessageStorage(config) + storage.ensure_table() + + scanner = GroupScanner(adapter, storage, config) + + log.info("发现群聊...") + scanner.discover_groups() + + log.info("开始扫描...") + total = 0 + # 循环扫描直到所有群都扫完一遍 + rounds = 0 + while True: + n = scanner.scan_next_batch() + total += n + rounds += 1 + # 如果没有到期的群了,说明本轮扫完 + if scanner.time_to_next() > 0: + break + + log.info("扫描完成: 共 %d 轮, 新增 %d 条消息", rounds, total) + + # 打印统计 + status = scanner.get_status() + print(json.dumps(status, ensure_ascii=False, indent=2)) + storage.close() + + +def cmd_daemon(args): + """守护模式 — 持续运行""" + config = CollectorConfig.load(args.config) + setup_logging(config.log_level) + + running = True + + def on_signal(signum, _): + nonlocal running + log.info("收到信号 %d, 准备退出...", signum) + running = False + + signal.signal(signal.SIGINT, on_signal) + signal.signal(signal.SIGTERM, on_signal) + + log.info("=" * 50) + log.info("微信群聊采集器 - 守护模式启动") + log.info("扫描策略: hot=%ds, warm=%ds, cold 退避至 %ds", + int(config.min_interval), int(config.base_interval), int(config.max_interval)) + log.info("批次=%d, 每群最多=%d条/次", config.batch_size, config.messages_per_scan) + log.info("=" * 50) + + adapter = WeChatAdapter() + storage = MessageStorage(config) + storage.ensure_table() + scanner = GroupScanner(adapter, storage, config) + + # 启动回溯补录器(使用独立的 MySQL 连接,避免多线程冲突) + backfiller = None + if config.backfill_enabled: + wechat_base = os.path.dirname(adapter.db_dir) if adapter.db_dir else "" + backfill_storage = MessageStorage(config) + backfill_storage.ensure_table() + backfiller = MediaBackfiller( + storage=backfill_storage, config=config, + wechat_base_dir=wechat_base + ) + try: + cos = _init_cos_uploader(config) + backfiller.set_cos_uploader(cos) + except Exception as e: + log.warning("COS 初始化失败,回溯补录上传功能不可用: %s", e) + backfiller.start() + + # 初始发现 + scanner.discover_groups() + + cycle = 0 + while running: + cycle += 1 + + # 定期重新发现群聊 + if scanner.should_discover(): + try: + adapter.refresh_names() + scanner.discover_groups() + except Exception as e: + log.error("群聊发现失败: %s", e) + + # 扫描批次 + try: + n = scanner.scan_next_batch() + if n > 0: + log.info("--- 第 %d 轮: 新增 %d 条消息 ---", cycle, n) + except Exception as e: + log.error("扫描异常: %s", e) + + # 智能休眠 + sleep_time = scanner.time_to_next() + if sleep_time > 0 and running: + time.sleep(sleep_time) + + log.info("采集器已停止") + if backfiller: + backfiller.stop() + storage.close() + + +def cmd_status(args): + """查看采集统计""" + config = CollectorConfig.load(args.config) + storage = MessageStorage(config) + storage.ensure_table() + + total = storage.get_total_count() + groups = storage.get_group_stats() + + print(f"\n消息总数: {total}") + print(f"群聊数量: {len(groups)}") + print("-" * 60) + + for g in groups: + last_dt = datetime.fromtimestamp(g["last_ts"]).strftime("%m-%d %H:%M") if g["last_ts"] else "无" + print(f" {g['group_name']:20s} {g['total']:>6d} 条 最新: {last_dt}") + + storage.close() + + +def cmd_groups(args): + """列出所有群聊""" + config = CollectorConfig.load(args.config) + setup_logging(config.log_level) + + adapter = WeChatAdapter() + sessions = adapter.list_group_sessions(limit=500) + + print(f"\n共发现 {len(sessions)} 个群聊:") + print("-" * 60) + for s in sessions: + ts = datetime.fromtimestamp(s["last_timestamp"]).strftime("%m-%d %H:%M") + unread_tag = f" ({s['unread']}条未读)" if s["unread"] else "" + print(f" [{ts}] {s['display_name']}{unread_tag}") + print(f" {s['username']}") + + +def cmd_enable(args): + """启用/停用群聊(通过修改配置黑名单)""" + config = CollectorConfig.load(args.config) + group_name = args.group_name + if group_name in config.blacklist: + config.blacklist.remove(group_name) + config.save(args.config) + print(f"已从黑名单移除: {group_name}") + else: + print(f"{group_name} 不在黑名单中") + + +def cmd_disable(args): + config = CollectorConfig.load(args.config) + group_name = args.group_name + if group_name not in config.blacklist: + config.blacklist.append(group_name) + config.save(args.config) + print(f"已加入黑名单: {group_name}") + else: + print(f"{group_name} 已在黑名单中") + + +def main(): + parser = argparse.ArgumentParser( + description="微信群聊消息自动采集器", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--config", default=None, help="配置文件路径") + sub = parser.add_subparsers(dest="command") + + sub.add_parser("scan", help="一次性扫描所有群聊") + sub.add_parser("daemon", help="启动守护模式持续采集") + sub.add_parser("status", help="查看采集统计") + sub.add_parser("groups", help="列出所有群聊") + + p_enable = sub.add_parser("enable", help="启用某群采集") + p_enable.add_argument("group_name", help="群聊名称") + + p_disable = sub.add_parser("disable", help="停止某群采集") + p_disable.add_argument("group_name", help="群聊名称") + + args = parser.parse_args() + if not args.command: + parser.print_help() + sys.exit(1) + + commands = { + "scan": cmd_scan, + "daemon": cmd_daemon, + "status": cmd_status, + "groups": cmd_groups, + "enable": cmd_enable, + "disable": cmd_disable, + } + commands[args.command](args) + + +if __name__ == "__main__": + main() diff --git a/collector/__init__.py b/collector/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/collector/backfill.py b/collector/backfill.py new file mode 100644 index 0000000..f318ef3 --- /dev/null +++ b/collector/backfill.py @@ -0,0 +1,399 @@ +"""媒体回溯补录模块 — watchdog 实时监听 + 定时兜底扫描 + +当用户点开微信中的图片/视频/文件后,本地会生成可读文件。 +本模块检测这些新文件,上传到 COS,并回填 media_url 到数据库中之前为 NULL 的记录。 + +图片: temp/RWTemp/YYYY-MM/{md5}.ext(微信下载/查看后自动生成) +文件/视频/音频: msg/file/YYYY-MM/filename(微信接收后自动生成) +""" + +import hashlib +import logging +import os +import re +import threading +import time +from datetime import datetime + +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler, FileCreatedEvent + +from .config import CollectorConfig +from .storage import MessageStorage + +log = logging.getLogger(__name__) + +IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} +VIDEO_EXTS = {".mp4", ".mov", ".avi"} +FILE_EXTS_SKIP = {".tmp", ".downloading"} + + +def _is_readable_media(filepath: str) -> str | None: + """判断文件是否为可读媒体文件,返回 'image'/'video'/'file' 或 None""" + ext = os.path.splitext(filepath)[1].lower() + if ext in FILE_EXTS_SKIP: + return None + basename = os.path.basename(filepath) + if "_thumb" in basename: + return None + if ext in IMAGE_EXTS: + return "image" + if ext in VIDEO_EXTS: + return "video" + if ext and ext not in FILE_EXTS_SKIP: + return "file" + return None + + +def _is_in_rwtemp(filepath: str) -> bool: + """判断文件是否在 RWTemp 目录内""" + return "/temp/RWTemp/" in filepath + + +def _file_md5(filepath: str) -> str: + """计算文件内容的 md5""" + h = hashlib.md5() + with open(filepath, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + return h.hexdigest() + + +def _extract_md5_from_content(content: str) -> str | None: + """从 DB content 字段提取图片 md5: '[图片] abc123...'""" + if not content: + return None + m = re.search(r'\[图片\]\s+([a-f0-9]{32})', content) + return m.group(1) if m else None + + +class MediaFileHandler(FileSystemEventHandler): + """watchdog 事件处理器 — 新文件出现时触发回溯匹配""" + + def __init__(self, backfiller: "MediaBackfiller"): + super().__init__() + self._backfiller = backfiller + + def on_created(self, event): + if event.is_directory: + return + if not isinstance(event, FileCreatedEvent): + return + threading.Timer(2.0, self._handle_new_file, args=[event.src_path]).start() + + def _handle_new_file(self, filepath: str): + try: + if not os.path.isfile(filepath): + return + if os.path.getsize(filepath) < 100: + return + media_type = _is_readable_media(filepath) + if not media_type: + return + # RWTemp 中的都是图片 + if _is_in_rwtemp(filepath): + media_type = "image" + log.info("[watchdog] 检测到新媒体文件: %s (type=%s)", filepath, media_type) + self._backfiller.try_backfill_file(filepath, media_type) + except Exception as e: + log.warning("[watchdog] 处理文件异常 %s: %s", filepath, e) + + +class MediaBackfiller: + """媒体回溯补录器 + + 两层保障: + 1. watchdog 实时监听 RWTemp(图片) 和 msg/file(文件),新文件立即匹配上传 + 2. 定时扫描 DB 中 media_url=NULL 的记录,重新尝试路径解析 + """ + + def __init__(self, storage: MessageStorage, config: CollectorConfig, + cos_uploader=None, wechat_base_dir: str = ""): + self._storage = storage + self._cfg = config + self._cos = cos_uploader + self._wechat_base = wechat_base_dir + self._observer: Observer | None = None + self._scan_thread: threading.Thread | None = None + self._running = False + self._upload_lock = threading.Lock() + + def start(self): + """启动 watchdog 监听 + 定时扫描线程""" + if self._running: + return + self._running = True + + watch_dirs = self._get_watch_dirs() + if watch_dirs: + self._observer = Observer() + handler = MediaFileHandler(self) + for d in watch_dirs: + self._observer.schedule(handler, d, recursive=True) + log.info("[backfill] 监听目录: %s", d) + self._observer.start() + log.info("[backfill] watchdog 已启动, 监听 %d 个目录", len(watch_dirs)) + else: + log.warning("[backfill] 未找到可监听的微信媒体目录") + + self._scan_thread = threading.Thread( + target=self._periodic_scan_loop, daemon=True, name="backfill-scan" + ) + self._scan_thread.start() + log.info("[backfill] 定时扫描已启动, 间隔 %ds", int(self._cfg.backfill_interval)) + + def stop(self): + """停止所有后台线程""" + self._running = False + if self._observer: + self._observer.stop() + self._observer.join(timeout=5) + self._observer = None + if self._scan_thread: + self._scan_thread.join(timeout=10) + self._scan_thread = None + log.info("[backfill] 已停止") + + def set_cos_uploader(self, cos): + """延迟设置 COS 上传器""" + self._cos = cos + + def _get_watch_dirs(self) -> list[str]: + """获取需要监听的目录: temp/RWTemp(图片) + msg/file(文件) + msg/video(视频)""" + if not self._wechat_base: + return [] + + dirs = [] + rwtemp = os.path.join(self._wechat_base, "temp", "RWTemp") + if os.path.isdir(rwtemp): + dirs.append(rwtemp) + + file_dir = os.path.join(self._wechat_base, "msg", "file") + if os.path.isdir(file_dir): + dirs.append(file_dir) + + video_dir = os.path.join(self._wechat_base, "msg", "video") + if os.path.isdir(video_dir): + dirs.append(video_dir) + + return dirs + + def try_backfill_file(self, filepath: str, media_type: str): + """尝试将新文件匹配到 DB 中的 pending 记录并上传""" + if not self._cos: + log.debug("[backfill] COS 未配置, 跳过上传") + return + + with self._upload_lock: + pending = self._storage.get_pending_media_messages( + lookback_days=self._cfg.backfill_lookback_days, limit=500 + ) + matched = self._match_file_to_record(filepath, media_type, pending) + if matched: + self._do_upload_and_update(filepath, matched) + + def _match_file_to_record(self, filepath: str, media_type: str, + records: list[dict]) -> dict | None: + """将文件路径匹配到数据库记录""" + if media_type == "image": + return self._match_image(filepath, records) + elif media_type == "video": + return self._match_video(filepath, records) + else: + return self._match_file(filepath, records) + + def _match_image(self, filepath: str, records: list[dict]) -> dict | None: + """图片匹配: 计算文件内容 md5,匹配 DB content 中的 md5""" + try: + file_hash = _file_md5(filepath) + except Exception: + return None + for rec in records: + if rec["msg_type"] != "image": + continue + rec_md5 = _extract_md5_from_content(rec.get("content", "")) + if rec_md5 and rec_md5 == file_hash: + log.info("[backfill] 图片内容md5匹配: %s → record id=%d", file_hash, rec["id"]) + return rec + return None + + def _match_file(self, filepath: str, records: list[dict]) -> dict | None: + """文件匹配: 通过文件名匹配 DB content 中的标题""" + filename = os.path.basename(filepath) + date_prefix = self._extract_date_from_path(filepath) + + for rec in records: + if rec["msg_type"] != "file": + continue + if date_prefix: + rec_date = datetime.fromtimestamp(rec["msg_timestamp"]).strftime("%Y-%m") + if rec_date != date_prefix: + continue + content = rec.get("content", "") or "" + title = content.split(" (")[0].strip() if content else "" + if title and (title in filename or filename in title): + log.info("[backfill] 文件匹配: %s → record id=%d", filename, rec["id"]) + return rec + return None + + def _match_video(self, filepath: str, records: list[dict]) -> dict | None: + """视频匹配: 通过文件名中的 rawmd5 匹配同月的 video 记录""" + filename = os.path.splitext(os.path.basename(filepath))[0].lower() + date_prefix = self._extract_date_from_path(filepath) + + for rec in records: + if rec["msg_type"] != "video": + continue + if date_prefix: + rec_date = datetime.fromtimestamp(rec["msg_timestamp"]).strftime("%Y-%m") + if rec_date != date_prefix: + continue + if re.fullmatch(r'[a-f0-9]{32}', filename): + log.info("[backfill] 视频匹配: %s → record id=%d", filename, rec["id"]) + return rec + return None + + def _do_upload_and_update(self, filepath: str, record: dict): + """上传文件到 COS 并更新 DB(需在锁内调用)""" + if not self._cos or not os.path.isfile(filepath): + return + + try: + ext = os.path.splitext(filepath)[1] + filename = os.path.basename(filepath) + safe_name = re.sub(r'[^\w.\-]', '_', filename) + dt = datetime.fromtimestamp(record["msg_timestamp"]) + date_prefix = dt.strftime("%Y-%m") + cos_key = f"{self._cfg.cos_base_path}/{record['msg_type']}/{date_prefix}/{safe_name}" + + url = self._cos.upload(filepath, cos_key) + if url: + updated = self._storage.update_media_url(record["id"], url) + if updated: + log.info("[backfill] 回溯成功: id=%d, type=%s → %s", + record["id"], record["msg_type"], url) + else: + log.debug("[backfill] 记录已被更新(可能重复): id=%d", record["id"]) + except Exception as e: + log.warning("[backfill] 上传失败: %s → %s", filepath, e) + + def _periodic_scan_loop(self): + """定时扫描兜底线程""" + while self._running: + try: + self._do_periodic_scan() + except Exception as e: + log.error("[backfill] 定时扫描异常: %s", e) + + wait = self._cfg.backfill_interval + elapsed = 0 + while elapsed < wait and self._running: + time.sleep(min(5.0, wait - elapsed)) + elapsed += 5.0 + + def _do_periodic_scan(self): + """执行一次全量回溯扫描""" + if not self._cos: + return + + with self._upload_lock: + pending = self._storage.get_pending_media_messages( + lookback_days=self._cfg.backfill_lookback_days + ) + + if not pending: + return + + log.info("[backfill] 定时扫描: %d 条待补录记录", len(pending)) + filled = 0 + + for rec in pending: + if not self._running: + break + filepath = self._try_resolve_path(rec) + if filepath and os.path.isfile(filepath): + with self._upload_lock: + self._do_upload_and_update(filepath, rec) + filled += 1 + + if filled > 0: + log.info("[backfill] 定时扫描完成: 成功补录 %d 条", filled) + + def _try_resolve_path(self, record: dict) -> str | None: + """尝试为一条 pending 记录解析本地文件路径""" + if not self._wechat_base: + return None + + msg_type = record["msg_type"] + msg_ts = record["msg_timestamp"] + dt = datetime.fromtimestamp(msg_ts) + date_prefix = dt.strftime("%Y-%m") + + if msg_type == "image": + return self._resolve_image_path(date_prefix, record) + elif msg_type == "video": + return self._resolve_video_path(date_prefix) + elif msg_type == "file": + return self._resolve_file_path(date_prefix, record) + return None + + def _resolve_image_path(self, date_prefix: str, record: dict) -> str | None: + """在 temp/RWTemp/YYYY-MM/ 中按文件内容 md5 查找图片""" + img_md5 = _extract_md5_from_content(record.get("content", "")) + if not img_md5: + return None + + rwtemp_dir = os.path.join(self._wechat_base, "temp", "RWTemp", date_prefix) + if not os.path.isdir(rwtemp_dir): + return None + + for f in os.listdir(rwtemp_dir): + fp = os.path.join(rwtemp_dir, f) + ext = os.path.splitext(f)[1].lower() + if ext not in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"): + continue + if not os.path.isfile(fp): + continue + try: + if _file_md5(fp) == img_md5: + return fp + except Exception: + continue + return None + + def _resolve_video_path(self, date_prefix: str) -> str | None: + """在 msg/video/YYYY-MM/ 中查找 mp4""" + video_dir = os.path.join(self._wechat_base, "msg", "video", date_prefix) + if not os.path.isdir(video_dir): + return None + for f in os.listdir(video_dir): + if f.endswith(".mp4") and os.path.isfile(os.path.join(video_dir, f)): + return os.path.join(video_dir, f) + return None + + def _resolve_file_path(self, date_prefix: str, record: dict) -> str | None: + """在 msg/file/YYYY-MM/ 中按文件名查找""" + content = record.get("content", "") or "" + title = content.split(" (")[0].strip() if content else "" + if not title: + return None + + file_dir = os.path.join(self._wechat_base, "msg", "file", date_prefix) + if not os.path.isdir(file_dir): + return None + + target = os.path.join(file_dir, title) + if os.path.isfile(target): + return target + + for f in os.listdir(file_dir): + if title in f or f in title: + fp = os.path.join(file_dir, f) + if os.path.isfile(fp): + return fp + return None + + @staticmethod + def _extract_date_from_path(filepath: str) -> str | None: + m = re.search(r'(\d{4}-\d{2})', filepath) + return m.group(1) if m else None diff --git a/collector/config.py b/collector/config.py new file mode 100644 index 0000000..2603b14 --- /dev/null +++ b/collector/config.py @@ -0,0 +1,69 @@ +"""采集器配置""" + +import json +import os +from dataclasses import dataclass, field, asdict +from dotenv import load_dotenv + +load_dotenv() + +DEFAULT_CONFIG_PATH = os.path.expanduser("~/.wechat-cli/collect-chats/config.json") + + +@dataclass +class CollectorConfig: + # ---- MySQL ---- + mysql_host: str = os.environ.get("MYSQL_HOST", "localhost") + mysql_port: int = int(os.environ.get("MYSQL_PORT", "3306")) + mysql_user: str = os.environ.get("MYSQL_USER", "root") + mysql_password: str = os.environ.get("MYSQL_PASSWORD", "") + mysql_database: str = os.environ.get("MYSQL_DATABASE", "") + mysql_table: str = os.environ.get("MYSQL_TABLE", "wechat_group_message") + + # ---- 腾讯 COS ---- + cos_secret_id: str = os.environ.get("COS_SECRET_ID", "") + cos_secret_key: str = os.environ.get("COS_SECRET_KEY", "") + cos_bucket: str = os.environ.get("COS_BUCKET", "") + cos_region: str = os.environ.get("COS_REGION", "ap-beijing") + cos_download_domain: str = os.environ.get("COS_DOWNLOAD_DOMAIN", "") + cos_base_path: str = os.environ.get("COS_BASE_PATH", "") + + # ---- 扫描策略 ---- + min_interval: float = 15.0 # hot 群扫描间隔(秒) + base_interval: float = 30.0 # warm 群扫描间隔 + max_interval: float = 120.0 # cold 群最大间隔(保证 ≤2min 入库) + backoff_factor: float = 1.2 # cold 退避系数 + batch_size: int = 10 # 每轮最多扫描群数 + messages_per_scan: int = 200 # 每群每次最多拉取消息数 + jitter_max: float = 1.0 # 群间随机延迟上限(秒) + cycle_sleep: float = 5.0 # 轮次间休眠(秒) + discovery_interval: float = 180.0 # 群聊发现间隔(秒) + hot_threshold: int = 300 # 最新消息 < N秒 算 hot + warm_threshold: int = 3600 # 最新消息 < N秒 算 warm + + # ---- 过滤 ---- + whitelist: list = field(default_factory=list) # 空=全部采集 + blacklist: list = field(default_factory=list) # 跳过的群(支持正则) + + # ---- 回溯补录 ---- + backfill_interval: float = 600.0 # 定时扫描间隔(秒), 默认 10 分钟 + backfill_lookback_days: int = 7 # 回溯天数 + backfill_enabled: bool = True # 是否启用回溯补录 + + # ---- 其他 ---- + log_level: str = "DEBUG" + + @classmethod + def load(cls, path=None): + path = path or DEFAULT_CONFIG_PATH + if os.path.isfile(path): + with open(path, encoding="utf-8") as f: + data = json.load(f) + return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}) + return cls() + + def save(self, path=None): + path = path or DEFAULT_CONFIG_PATH + os.makedirs(os.path.dirname(path), exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + json.dump(asdict(self), f, ensure_ascii=False, indent=2) diff --git a/collector/scanner.py b/collector/scanner.py new file mode 100644 index 0000000..5d6d892 --- /dev/null +++ b/collector/scanner.py @@ -0,0 +1,312 @@ +"""扫描调度器 — 群聊发现 + 优先级队列 + 自适应频率 + COS 上传""" + +import heapq +import logging +import os +import re +import sys +import time +import random +from datetime import datetime + +from .config import CollectorConfig +from .storage import MessageStorage +from .wechat_adapter import WeChatAdapter, MEDIA_TYPES + +log = logging.getLogger(__name__) + + +def _init_cos_uploader(config: CollectorConfig): + """延迟初始化 COS 上传器(避免未安装 SDK 时导入失败)""" + cos_script = os.path.join( + os.path.dirname(os.path.dirname(__file__)), + "skills", "tencent-cos-upload.xiaokui", "scripts", + ) + if cos_script not in sys.path: + sys.path.insert(0, cos_script) + from cos_upload import CosUploader + return CosUploader( + secret_id=config.cos_secret_id, + secret_key=config.cos_secret_key, + region=config.cos_region, + bucket=config.cos_bucket, + domain=config.cos_download_domain, + ) + + +def _build_cos_key(config: CollectorConfig, msg_type: str, create_time: int, filename: str) -> str: + """构建 COS 存储路径: {base}/类型/YYYY-MM/文件名""" + dt = datetime.fromtimestamp(create_time) + date_prefix = dt.strftime("%Y-%m") + # 文件名清理:去除中文和特殊字符,保留 ASCII + safe_name = re.sub(r'[^\w.\-]', '_', filename) + return f"{config.cos_base_path}/{msg_type}/{date_prefix}/{safe_name}" + + +class GroupState: + """单个群聊的扫描状态""" + __slots__ = ( + "username", "display_name", "next_scan_time", + "scan_interval", "activity_level", "last_message_at", + "consecutive_empty", + ) + + def __init__(self, username, display_name, next_scan_time=0, + scan_interval=300.0, activity_level="cold", + last_message_at=0, consecutive_empty=0): + self.username = username + self.display_name = display_name + self.next_scan_time = next_scan_time + self.scan_interval = scan_interval + self.activity_level = activity_level + self.last_message_at = last_message_at + self.consecutive_empty = consecutive_empty + + def __lt__(self, other): + return self.next_scan_time < other.next_scan_time + + +class GroupScanner: + def __init__(self, adapter: WeChatAdapter, storage: MessageStorage, config: CollectorConfig): + self._adapter = adapter + self._storage = storage + self._cfg = config + self._groups: dict[str, GroupState] = {} # username -> GroupState + self._queue: list[GroupState] = [] # heapq + self._cos = None + self._last_discovery = 0 + + def _get_cos(self): + if self._cos is None: + try: + self._cos = _init_cos_uploader(self._cfg) + log.info("COS 上传器初始化成功") + except Exception as e: + log.warning("COS 上传器初始化失败,媒体文件将不会上传: %s", e) + return self._cos + + def _matches_filter(self, display_name: str, username: str, patterns: list) -> bool: + """检查群名或 username 是否匹配过滤模式列表""" + for pat in patterns: + if pat == display_name or pat == username: + return True + try: + if re.search(pat, display_name) or re.search(pat, username): + return True + except re.error: + pass + return False + + def discover_groups(self) -> list[str]: + """发现所有群聊,应用过滤规则,返回新增群列表""" + sessions = self._adapter.list_group_sessions(limit=500) + new_groups = [] + + for s in sessions: + username = s["username"] + display = s["display_name"] + + # 白名单过滤 + if self._cfg.whitelist: + if not self._matches_filter(display, username, self._cfg.whitelist): + continue + + # 黑名单过滤 + if self._cfg.blacklist: + if self._matches_filter(display, username, self._cfg.blacklist): + continue + + if username not in self._groups: + # 新发现的群 + last_ts = self._storage.get_last_msg_timestamp(username) + state = GroupState( + username=username, + display_name=display, + next_scan_time=time.time(), # 立即扫描 + scan_interval=self._cfg.base_interval, + last_message_at=last_ts, + ) + self._groups[username] = state + heapq.heappush(self._queue, state) + new_groups.append(username) + log.info("发现群聊: %s (%s)", display, username) + else: + # 已知群,更新名称 + self._groups[username].display_name = display + + self._last_discovery = time.time() + log.info("群聊发现完成: 共 %d 个群, 新增 %d 个", len(self._groups), len(new_groups)) + return new_groups + + def scan_next_batch(self) -> int: + """扫描到期的群聊批次,返回本批新增消息总数""" + now = time.time() + total_new = 0 + scanned = 0 + + while scanned < self._cfg.batch_size and self._queue: + # peek + if self._queue[0].next_scan_time > now: + break + + state = heapq.heappop(self._queue) + + # 可能已经被移除 + if state.username not in self._groups: + continue + + start_t = time.time() + try: + new_count = self._scan_single_group(state) + duration_ms = int((time.time() - start_t) * 1000) + total_new += new_count + self._update_activity(state, new_count) + log.info( + "[%s] %s: +%d 条, 耗时 %dms, 下次 %ds 后 (%s)", + state.activity_level.upper(), state.display_name, + new_count, duration_ms, int(state.scan_interval), + state.username, + ) + except Exception as e: + duration_ms = int((time.time() - start_t) * 1000) + log.error("[ERROR] %s 扫描失败: %s", state.display_name, e) + # 出错后延长间隔 + state.scan_interval = min( + state.scan_interval * 2, self._cfg.max_interval + ) + + # 推回队列 + state.next_scan_time = time.time() + state.scan_interval + heapq.heappush(self._queue, state) + scanned += 1 + + # 群间 jitter + if scanned < self._cfg.batch_size and self._queue: + jitter = random.uniform(0, self._cfg.jitter_max) + time.sleep(jitter) + + return total_new + + def _scan_single_group(self, state: GroupState) -> int: + """扫描单个群聊,返回新增消息数""" + messages = self._adapter.query_new_messages( + state.username, + after_ts=state.last_message_at, + limit=self._cfg.messages_per_scan, + ) + if not messages: + return 0 + + # 处理媒体上传 + 详细日志 + media_stats = {"uploaded": 0, "no_file": 0, "failed": 0} + for msg in messages: + log.info( + " 新消息: [%s] %s: %s (local_id=%d)", + msg["msg_type"], msg.get("sender_name", "?"), + (msg.get("content") or "")[:80], + msg["local_id"], + ) + if msg["msg_type"] in MEDIA_TYPES: + if msg.get("media_path"): + url = self._upload_media(msg) + msg["media_url"] = url + if url: + media_stats["uploaded"] += 1 + else: + media_stats["failed"] += 1 + else: + msg["media_url"] = None + media_stats["no_file"] += 1 + log.info(" → 媒体文件未在本地找到, 跳过上传") + else: + msg["media_url"] = None + + # 批量入库 + inserted = self._storage.insert_messages(messages) + + # 媒体统计日志 + if any(v > 0 for v in media_stats.values()): + log.info( + " 媒体统计: 上传成功=%d, 本地无文件=%d, 上传失败=%d", + media_stats["uploaded"], media_stats["no_file"], media_stats["failed"], + ) + + # 更新状态 + max_ts = max(m["create_time"] for m in messages) + state.last_message_at = max_ts + + return inserted + + def _upload_media(self, msg: dict) -> str | None: + """上传媒体文件到 COS,返回 URL""" + cos = self._get_cos() + if not cos: + return None + + local_path = msg["media_path"] + if not local_path or not os.path.isfile(local_path): + return None + + try: + filename = os.path.basename(local_path) + cos_key = _build_cos_key( + self._cfg, msg["msg_type"], msg["create_time"], filename + ) + url = cos.upload(local_path, cos_key) + log.info(" → COS 上传成功: %s → %s", local_path, url) + return url + except Exception as e: + log.warning("上传失败 %s: %s", local_path, e) + return None + + def _update_activity(self, state: GroupState, new_count: int): + """根据扫描结果更新活跃度和下次扫描间隔""" + now = time.time() + + if new_count > 0: + state.consecutive_empty = 0 + age = now - state.last_message_at if state.last_message_at else float("inf") + + if age < self._cfg.hot_threshold: + state.activity_level = "hot" + state.scan_interval = self._cfg.min_interval + elif age < self._cfg.warm_threshold: + state.activity_level = "warm" + state.scan_interval = self._cfg.base_interval + else: + state.activity_level = "warm" + state.scan_interval = self._cfg.base_interval + else: + state.consecutive_empty += 1 + state.activity_level = "cold" + state.scan_interval = min( + state.scan_interval * self._cfg.backoff_factor, + self._cfg.max_interval, + ) + + def should_discover(self) -> bool: + return time.time() - self._last_discovery >= self._cfg.discovery_interval + + def get_status(self) -> dict: + """获取扫描器当前状态""" + total_msgs = self._storage.get_total_count() + group_stats = {} + for username, state in self._groups.items(): + group_stats[state.display_name] = { + "username": username, + "activity": state.activity_level, + "interval": f"{int(state.scan_interval)}s", + "last_msg_at": state.last_message_at, + } + return { + "total_groups": len(self._groups), + "total_messages": total_msgs, + "groups": group_stats, + } + + def time_to_next(self) -> float: + """返回距离下一个群到期的秒数""" + if not self._queue: + return self._cfg.cycle_sleep + wait = self._queue[0].next_scan_time - time.time() + return max(0, min(wait, self._cfg.cycle_sleep)) diff --git a/collector/storage.py b/collector/storage.py new file mode 100644 index 0000000..1646491 --- /dev/null +++ b/collector/storage.py @@ -0,0 +1,194 @@ +"""MySQL 存储层 — 只操作 wechat_group_message 表""" + +import logging +import pymysql + +log = logging.getLogger(__name__) + +CREATE_TABLE_SQL = """ +CREATE TABLE IF NOT EXISTS `{table}` ( + `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + `group_username` VARCHAR(128) NOT NULL COMMENT '群聊 username, 如 xxx@chatroom', + `group_name` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '群聊名称', + `sender_username` VARCHAR(128) NOT NULL DEFAULT '' COMMENT '发送者 wxid', + `sender_name` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '发送者显示名称', + `msg_type` VARCHAR(32) NOT NULL DEFAULT 'text' COMMENT '消息类型: text/voice/video/file/sticker/link/location/call/system', + `content` TEXT COMMENT '消息文本内容或描述', + `media_url` VARCHAR(1024) DEFAULT NULL COMMENT '媒体文件 COS URL', + `local_id` BIGINT NOT NULL DEFAULT 0 COMMENT '微信消息 local_id', + `local_type` BIGINT NOT NULL DEFAULT 0 COMMENT '微信消息原始类型', + `source_db` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '来源 message_N.db 路径', + `msg_time` DATETIME NOT NULL COMMENT '消息发送时间(微信 create_time)', + `msg_timestamp` BIGINT NOT NULL DEFAULT 0 COMMENT '消息时间戳(unix)', + `svr_msg_id` BIGINT UNSIGNED DEFAULT NULL COMMENT '微信服务端消息ID(server_id)', + `refer_msg_svrid` BIGINT UNSIGNED DEFAULT NULL COMMENT '引用消息的服务端ID(refermsg/svrid)', + `collected_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '采集入库时间', + PRIMARY KEY (`id`), + UNIQUE KEY `uk_group_local` (`group_username`, `local_id`, `source_db`, `msg_timestamp`), + KEY `idx_group_time` (`group_username`, `msg_timestamp`), + KEY `idx_msg_time` (`msg_timestamp`), + KEY `idx_sender` (`sender_username`), + KEY `idx_svr_msg_id` (`svr_msg_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci + COMMENT='微信群聊消息采集表'; +""" + +INSERT_SQL = """ +INSERT IGNORE INTO `{table}` + (group_username, group_name, sender_username, sender_name, + msg_type, content, media_url, local_id, local_type, source_db, + msg_time, msg_timestamp, svr_msg_id, refer_msg_svrid) +VALUES + (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, FROM_UNIXTIME(%s), %s, %s, %s) +""" + + +class MessageStorage: + def __init__(self, config): + self._cfg = config + self._table = config.mysql_table + self._conn = None + + def _get_conn(self): + if self._conn is not None: + try: + self._conn.ping(reconnect=True) + return self._conn + except Exception: + self._conn = None + self._conn = pymysql.connect( + host=self._cfg.mysql_host, + port=self._cfg.mysql_port, + user=self._cfg.mysql_user, + password=self._cfg.mysql_password, + database=self._cfg.mysql_database, + charset="utf8mb4", + connect_timeout=10, + read_timeout=30, + write_timeout=30, + autocommit=False, + ) + return self._conn + + def ensure_table(self): + conn = self._get_conn() + with conn.cursor() as cur: + cur.execute(CREATE_TABLE_SQL.format(table=self._table)) + self._ensure_columns(cur) + conn.commit() + log.info("表 %s 已就绪", self._table) + + def _ensure_columns(self, cur): + """为已有表补充新增列(兼容旧表结构)""" + cur.execute(f"SHOW COLUMNS FROM `{self._table}`") + existing = {row[0] for row in cur.fetchall()} + migrations = [ + ("svr_msg_id", "BIGINT UNSIGNED DEFAULT NULL COMMENT '微信服务端消息ID(server_id)' AFTER `msg_timestamp`"), + ("refer_msg_svrid", "BIGINT UNSIGNED DEFAULT NULL COMMENT '引用消息的服务端ID(refermsg/svrid)' AFTER `svr_msg_id`"), + ] + for col, definition in migrations: + if col not in existing: + cur.execute(f"ALTER TABLE `{self._table}` ADD COLUMN `{col}` {definition}") + log.info("已为表 %s 添加列 %s", self._table, col) + if "svr_msg_id" not in existing: + cur.execute(f"ALTER TABLE `{self._table}` ADD INDEX `idx_svr_msg_id` (`svr_msg_id`)") + log.info("已为表 %s 添加索引 idx_svr_msg_id", self._table) + + def insert_messages(self, messages: list[dict]) -> int: + """批量插入消息,返回实际插入条数""" + if not messages: + return 0 + conn = self._get_conn() + sql = INSERT_SQL.format(table=self._table) + rows = [] + for m in messages: + rows.append(( + m["group_username"], + m.get("group_name", ""), + m.get("sender_username", ""), + m.get("sender_name", ""), + m.get("msg_type", "text"), + m.get("content", ""), + m.get("media_url"), + m["local_id"], + m.get("local_type", 0), + m.get("source_db", ""), + m["create_time"], + m["create_time"], + m.get("svr_msg_id"), + m.get("refer_msg_svrid"), + )) + try: + with conn.cursor() as cur: + cur.executemany(sql, rows) + conn.commit() + return cur.rowcount + except Exception: + conn.rollback() + raise + + def get_last_msg_timestamp(self, group_username: str) -> int: + """获取某群最后已入库消息的时间戳""" + conn = self._get_conn() + sql = f"SELECT MAX(msg_timestamp) FROM `{self._table}` WHERE group_username = %s" + with conn.cursor() as cur: + cur.execute(sql, (group_username,)) + row = cur.fetchone() + return row[0] or 0 + + def get_group_stats(self) -> list[dict]: + """获取各群采集统计""" + conn = self._get_conn() + sql = f""" + SELECT group_username, group_name, + COUNT(*) AS total, + MAX(msg_timestamp) AS last_ts, + MIN(msg_timestamp) AS first_ts + FROM `{self._table}` + GROUP BY group_username, group_name + ORDER BY last_ts DESC + """ + with conn.cursor(pymysql.cursors.DictCursor) as cur: + cur.execute(sql) + return cur.fetchall() + + def get_total_count(self) -> int: + conn = self._get_conn() + with conn.cursor() as cur: + cur.execute(f"SELECT COUNT(*) FROM `{self._table}`") + return cur.fetchone()[0] + + def get_pending_media_messages(self, lookback_days: int = 7, limit: int = 500) -> list[dict]: + """查询 media_url 为空的媒体消息(用于回溯补录)""" + conn = self._get_conn() + sql = f""" + SELECT id, group_username, msg_type, content, local_id, + local_type, source_db, msg_timestamp + FROM `{self._table}` + WHERE media_url IS NULL + AND msg_type IN ('image', 'voice', 'video', 'file') + AND msg_time > DATE_SUB(NOW(), INTERVAL %s DAY) + ORDER BY msg_timestamp DESC + LIMIT %s + """ + with conn.cursor(pymysql.cursors.DictCursor) as cur: + cur.execute(sql, (lookback_days, limit)) + return cur.fetchall() + + def update_media_url(self, record_id: int, media_url: str) -> bool: + """更新单条记录的 media_url""" + conn = self._get_conn() + sql = f"UPDATE `{self._table}` SET media_url = %s WHERE id = %s AND media_url IS NULL" + try: + with conn.cursor() as cur: + cur.execute(sql, (media_url, record_id)) + conn.commit() + return cur.rowcount > 0 + except Exception: + conn.rollback() + raise + + def close(self): + if self._conn and self._conn.open: + self._conn.close() + self._conn = None diff --git a/collector/wechat_adapter.py b/collector/wechat_adapter.py new file mode 100644 index 0000000..5bceeeb --- /dev/null +++ b/collector/wechat_adapter.py @@ -0,0 +1,583 @@ +"""微信数据适配层 — 封装 wechat_cli.core,提供群聊发现和增量消息查询""" + +import hashlib +import logging +import os +import re +import sqlite3 +import xml.etree.ElementTree as ET +from contextlib import closing +from datetime import datetime + +from wechat_cli.core.context import AppContext +from wechat_cli.core.contacts import ( + get_contact_names, + resolve_username, + display_name_for_username, +) +from wechat_cli.core.messages import ( + find_msg_db_keys, + _find_msg_tables_for_user, + _is_safe_msg_table_name, + _build_message_filters, + decompress_content, + _parse_message_content, + _split_msg_type, + format_msg_type, + _load_name2id_maps, +) + +log = logging.getLogger(__name__) + +# 消息 base_type → 简化类型名 +_TYPE_MAP = { + 1: "text", 3: "image", 34: "voice", 42: "contact_card", + 43: "video", 47: "sticker", 48: "location", 49: "link", + 50: "call", 10000: "system", 10002: "revoked", +} + +# 需要上传 COS 的媒体类型(image/voice 只上传已解密的可读文件,跳过 .dat) +MEDIA_TYPES = {"video", "file", "image", "voice"} + + +def _table_has_column(conn, table_name: str, column_name: str) -> bool: + """检查 SQLite 表是否包含指定列""" + try: + cols = conn.execute(f"PRAGMA table_info([{table_name}])").fetchall() + return any(row[1] == column_name for row in cols) + except Exception: + return False + + +def _extract_appmsg_meta(content_xml: str) -> dict | None: + """从 type=49 消息的 XML 中提取 appmsg 元信息(文件名、大小、类型等) + + 返回 dict: {title, des, file_size, file_ext, app_type, + refer_svrid, refer_displayname, refer_content} 或 None + """ + if not content_xml: + return None + try: + root = ET.fromstring(content_xml) if content_xml.lstrip().startswith("<") else None + if root is None: + return None + appmsg = root.find(".//appmsg") + if appmsg is None: + return None + app_type = int((appmsg.findtext("type") or "0").strip()) + title = (appmsg.findtext("title") or "").strip() + des = (appmsg.findtext("des") or "").strip() + # 附件信息 + attach = appmsg.find("appattach") + file_size = 0 + file_ext = "" + if attach is not None: + file_size = int((attach.findtext("totallen") or "0").strip()) + file_ext = (attach.findtext("fileext") or "").strip() + result = { + "app_type": app_type, + "title": title, + "des": des, + "file_size": file_size, + "file_ext": file_ext, + } + # 引用消息 (app_type=57): 提取 refermsg 中的 svrid 和被引用内容 + if app_type == 57: + ref = appmsg.find(".//refermsg") + if ref is not None: + svrid_text = (ref.findtext("svrid") or "").strip() + if svrid_text: + try: + result["refer_svrid"] = int(svrid_text) + except ValueError: + pass + result["refer_displayname"] = (ref.findtext("displayname") or "").strip() + result["refer_content"] = (ref.findtext("content") or "").strip() + return result + except Exception: + return None + + +def _format_file_content(meta: dict) -> str: + """将文件元信息格式化为可读的 content 字符串""" + parts = [] + if meta.get("title"): + parts.append(meta["title"]) + size = meta.get("file_size", 0) + if size > 0: + if size >= 1024 * 1024: + parts.append(f"({size / 1024 / 1024:.1f}MB)") + elif size >= 1024: + parts.append(f"({size / 1024:.1f}KB)") + else: + parts.append(f"({size}B)") + return " ".join(parts) + + +def _extract_video_meta(content_xml: str) -> str: + """从视频消息 XML 中提取描述信息""" + if not content_xml: + return "[视频]" + try: + root = ET.fromstring(content_xml) if content_xml.lstrip().startswith("<") else None + if root is None: + return "[视频]" + video = root.find(".//videomsg") + if video is not None: + length = video.get("length", "") + raw_length = video.get("rawlength", "") + dur = length or raw_length + if dur: + return f"[视频] {dur}秒" + return "[视频]" + except Exception: + return "[视频]" + + +def _extract_chat_record(content_xml: str) -> str | None: + """从 type=49, app_type=19 的聊天记录消息中提取纯文本 + + 格式: + [聊天记录] 标题 + 发送者A: 消息内容 + 发送者B: 消息内容 + ... + """ + if not content_xml: + return None + try: + root = ET.fromstring(content_xml) if content_xml.lstrip().startswith("<") else None + if root is None: + return None + appmsg = root.find(".//appmsg") + if appmsg is None: + return None + + title = (appmsg.findtext("title") or "聊天记录").strip() + lines = [f"[聊天记录] {title}"] + + # recorditem 内嵌了一段 XML 字符串 + recorditem_text = appmsg.findtext("recorditem") or "" + if not recorditem_text.strip(): + return lines[0] if lines else None + + rec_root = ET.fromstring(recorditem_text) + for item in rec_root.findall(".//datalist/dataitem"): + sender = (item.findtext("sourcename") or "").strip() + # datatitle 是聊天内容,datadesc 是附加描述 + msg_text = (item.findtext("datatitle") or "").strip() + if not msg_text: + msg_text = (item.findtext("datadesc") or "").strip() + if not msg_text: + # 可能是图片/视频等非文本 + data_type = item.get("datatype", "") + if data_type == "2": + msg_text = "[图片]" + elif data_type == "4": + msg_text = "[视频]" + elif data_type == "6": + msg_text = "[文件]" + else: + msg_text = "[其他]" + if sender: + lines.append(f"{sender}: {msg_text}") + else: + lines.append(msg_text) + + return "\n".join(lines) + except Exception: + return None + + +def _extract_image_md5(content_xml: str) -> str | None: + """从图片消息 XML 中提取 md5 属性(图片内容 hash)""" + if not content_xml: + return None + m = re.search(r'\bmd5="([a-fA-F0-9]{32})"', content_xml) + return m.group(1).lower() if m else None + + +def _file_md5(filepath: str) -> str: + """计算文件内容的 md5""" + h = hashlib.md5() + with open(filepath, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + return h.hexdigest() + + +class WeChatAdapter: + def __init__(self): + self._app = AppContext() + self._names = get_contact_names(self._app.cache, self._app.decrypted_dir) + + @property + def db_dir(self): + return self._app.db_dir + + def refresh_names(self): + """刷新联系人名称缓存(全局单例需要重置)""" + import wechat_cli.core.contacts as _c + _c._contact_names = None + _c._contact_full = None + self._names = get_contact_names(self._app.cache, self._app.decrypted_dir) + + def list_group_sessions(self, limit=500) -> list[dict]: + """列出所有群聊会话""" + path = self._app.cache.get(os.path.join("session", "session.db")) + if not path: + log.error("无法解密 session.db") + return [] + + with closing(sqlite3.connect(path)) as conn: + rows = conn.execute(""" + SELECT username, unread_count, summary, last_timestamp, + last_msg_type, last_msg_sender, last_sender_display_name + FROM SessionTable + WHERE last_timestamp > 0 + ORDER BY last_timestamp DESC + LIMIT ? + """, (limit,)).fetchall() + + groups = [] + for r in rows: + username, unread, summary, ts, msg_type, sender, sender_name = r + if "@chatroom" not in username: + continue + display = self._names.get(username, username) + groups.append({ + "username": username, + "display_name": display, + "last_timestamp": ts, + "unread": unread or 0, + }) + return groups + + def resolve_group_username(self, group_name: str) -> str | None: + return resolve_username(group_name, self._app.cache, self._app.decrypted_dir) + + def display_name(self, username: str) -> str: + return self._names.get(username, username) + + def query_new_messages(self, username: str, after_ts: int, limit: int = 200) -> list[dict]: + """增量查询某群新消息(create_time > after_ts),按时间升序返回结构化数据""" + tables = _find_msg_tables_for_user( + username, self._app.msg_db_keys, self._app.cache + ) + if not tables: + return [] + + group_name = self._names.get(username, username) + is_group = "@chatroom" in username + all_messages = [] + + for table_info in tables: + # 优化:跳过 max_create_time <= after_ts 的表 + if after_ts and table_info["max_create_time"] <= after_ts: + continue + + db_path = table_info["db_path"] + table_name = table_info["table_name"] + + if not _is_safe_msg_table_name(table_name): + continue + + try: + conn = sqlite3.connect(db_path, timeout=5) + try: + id_to_username = _load_name2id_maps(conn) + + has_server_id = _table_has_column(conn, table_name, "server_id") + + # 自定义查询:ORDER BY create_time ASC + clauses, params = _build_message_filters(start_ts=after_ts) + # 改为严格大于(排除已入库的那条) + if clauses: + clauses[0] = "create_time > ?" + where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else "" + extra_col = ", server_id" if has_server_id else "" + sql = f""" + SELECT local_id, local_type, create_time, real_sender_id, + message_content, WCDB_CT_message_content{extra_col} + FROM [{table_name}] + {where_sql} + ORDER BY create_time ASC + LIMIT ? + """ + rows = conn.execute(sql, (*params, limit)).fetchall() + + for row in rows: + msg = self._parse_row( + row, username, group_name, is_group, + id_to_username, db_path, has_server_id + ) + if msg: + all_messages.append(msg) + finally: + conn.close() + except Exception as e: + log.warning("查询 %s 的 %s 失败: %s", username, db_path, e) + + # 跨表合并后按时间排序,截断到 limit + all_messages.sort(key=lambda m: m["create_time"]) + return all_messages[:limit] + + def _parse_row(self, row, username, group_name, is_group, id_to_username, db_path, has_server_id=False): + """解析单条消息原始行为结构化 dict""" + local_id, local_type, create_time, real_sender_id, content_raw, ct = row[:6] + server_id = row[6] if has_server_id and len(row) > 6 else None + + content_raw = decompress_content(content_raw, ct) + if content_raw is None: + content_raw = "" + + # 解析发送者和消息内容 + sender_from_content, text = _parse_message_content(content_raw, local_type, is_group) + + # 解析发送者 + sender_username = id_to_username.get(real_sender_id, "") + if not sender_username and sender_from_content: + sender_username = sender_from_content + sender_name = self._names.get(sender_username, sender_username) + + # 解析消息类型 + base_type, sub_type = _split_msg_type(local_type) + msg_type = _TYPE_MAP.get(base_type, "other") + if base_type == 49 and sub_type == 6: + msg_type = "file" + + # content 处理:文本消息存原文,非文本消息提取元信息 + # 注意:群聊 content_raw 格式为 "wxid:\n...",用 text(剥离发送者后的部分)解析 XML + xml_content = text if text else content_raw + refer_msg_svrid = None + if base_type == 1: + final_content = text + elif base_type == 49: + # appmsg 类型:文件(6)、链接(5)、小程序(33/36)、聊天记录(19)、引用(57) 等 + meta = _extract_appmsg_meta(xml_content) + if meta and meta["app_type"] == 19: + # 聊天记录合并转发 + final_content = _extract_chat_record(xml_content) or meta.get("title", "") + elif meta and meta["app_type"] == 57: + # 引用/回复消息 + refer_msg_svrid = meta.get("refer_svrid") + quote_text = meta.get("title") or "[引用消息]" + ref_name = meta.get("refer_displayname", "") + ref_content = meta.get("refer_content", "") + if len(ref_content) > 160: + ref_content = ref_content[:160] + "..." + if ref_content: + prefix = f"回复 {ref_name}: " if ref_name else "回复: " + quote_text += f"\n ↳ {prefix}{ref_content}" + final_content = quote_text + log.debug("引用消息: refer_svrid=%s, title=%s", refer_msg_svrid, meta.get("title", "")) + elif meta: + if meta["app_type"] == 6: + final_content = _format_file_content(meta) + elif meta.get("title"): + final_content = meta["title"] + if meta.get("des"): + final_content += f" - {meta['des']}" + else: + final_content = "" + log.debug("appmsg 元信息: type=%d, title=%s", meta["app_type"], meta.get("title", "")) + else: + final_content = "" + elif base_type == 43: + final_content = _extract_video_meta(xml_content) + elif base_type == 34: + final_content = "[语音]" + elif base_type == 3: + img_md5 = _extract_image_md5(xml_content) + final_content = f"[图片] {img_md5}" if img_md5 else "[图片]" + elif base_type == 47: + final_content = "[表情]" + elif base_type == 48: + final_content = "[位置]" + elif base_type == 42: + final_content = "[名片]" + else: + final_content = text if text else "" + + # 解析媒体路径 + media_path = None + if msg_type in MEDIA_TYPES and self._app.db_dir: + try: + if msg_type == "image": + media_path = self._resolve_readable_media( + base_type, content_raw, create_time, username + ) + if media_path: + log.debug("图片路径解析成功: %s", media_path) + else: + log.info("图片文件未找到 (local_id=%d, ts=%d), 可能未点开", + local_id, create_time) + elif msg_type == "video": + media_path = self._resolve_video_path(content_raw, create_time) + if media_path: + log.debug("视频路径解析成功: %s", media_path) + else: + log.info("视频文件未找到 (local_id=%d, ts=%d), 可能未下载", + local_id, create_time) + elif msg_type == "file": + media_path = self._resolve_msg_file(final_content, create_time) + if media_path: + log.debug("文件路径解析成功: %s", media_path) + else: + log.info("文件未找到 (local_id=%d, ts=%d), 可能未下载", + local_id, create_time) + elif msg_type == "voice": + media_path = self._resolve_readable_media( + base_type, content_raw, create_time, username + ) + if media_path: + log.debug("语音路径解析成功: %s", media_path) + else: + log.info("语音文件未找到 (local_id=%d, ts=%d)", + local_id, create_time) + except Exception as e: + log.warning("媒体路径解析异常 (local_id=%d, type=%s): %s", + local_id, msg_type, e) + + return { + "group_username": username, + "group_name": group_name, + "local_id": local_id, + "local_type": local_type, + "create_time": create_time, + "sender_username": sender_username, + "sender_name": sender_name, + "msg_type": msg_type, + "content": final_content, + "media_path": media_path, + "source_db": os.path.basename(db_path), + "svr_msg_id": server_id, + "refer_msg_svrid": refer_msg_svrid, + } + + def _resolve_msg_file(self, content: str, create_time: int) -> str | None: + """在 msg/file/YYYY-MM/ 中按文件名查找(文件/视频/音频等都在此目录)""" + wechat_base = os.path.dirname(self._app.db_dir) + dt = datetime.fromtimestamp(create_time) + date_prefix = dt.strftime("%Y-%m") + + file_dir = os.path.join(wechat_base, "msg", "file", date_prefix) + if not os.path.isdir(file_dir): + return None + + # 从 content 提取文件名: "filename.ext (1.2MB)" 或 "[视频] 30秒" 等 + title = (content or "").split(" (")[0].strip() + # 去掉前缀标记如 [视频]、[语音] + for prefix in ("[视频]", "[语音]"): + if title.startswith(prefix): + title = title[len(prefix):].strip() + break + + if not title: + return None + + # 精确匹配 + target = os.path.join(file_dir, title) + if os.path.isfile(target): + return target + + # 模糊匹配 + for f in os.listdir(file_dir): + fp = os.path.join(file_dir, f) + if not os.path.isfile(fp): + continue + if title in f or f in title: + return fp + + return None + + def _resolve_readable_media(self, base_type: int, content: str, create_time: int, chat_username: str) -> str | None: + """解析图片/语音的本地文件路径。 + + 图片: 从 temp/RWTemp/YYYY-MM/ 中按 md5 查找原始图片。 + 语音: 只返回已解密的可读文件。 + """ + wechat_base = os.path.dirname(self._app.db_dir) + + dt = datetime.fromtimestamp(create_time) + date_prefix = dt.strftime("%Y-%m") + + if base_type == 3: + img_md5 = _extract_image_md5(content) + if not img_md5: + return None + rwtemp_dir = os.path.join(wechat_base, "temp", "RWTemp", date_prefix) + if not os.path.isdir(rwtemp_dir): + return None + for f in os.listdir(rwtemp_dir): + fp = os.path.join(rwtemp_dir, f) + ext = os.path.splitext(f)[1].lower() + if ext not in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"): + continue + if not os.path.isfile(fp): + continue + if _file_md5(fp) == img_md5: + return fp + return None + else: + # 语音:只返回已解密的可读文件 + msg_dir = os.path.join(wechat_base, "msg") + attach_dir = os.path.join(msg_dir, "attach") + readable_exts = (".mp3", ".wav", ".amr", ".silk", ".m4a", ".ogg") + + search_hashes = [] + if chat_username: + h = hashlib.md5(chat_username.encode()).hexdigest() + candidate = os.path.join(attach_dir, h) + if os.path.isdir(candidate): + search_hashes.append(h) + if not search_hashes and os.path.isdir(attach_dir): + search_hashes = [ + d for d in os.listdir(attach_dir) + if os.path.isdir(os.path.join(attach_dir, d)) + ] + + for h in search_hashes: + sub = os.path.join(attach_dir, h, date_prefix, "Voice") + if not os.path.isdir(sub): + continue + for f in os.listdir(sub): + fp = os.path.join(sub, f) + if not os.path.isfile(fp): + continue + if f.endswith(".dat"): + continue + if f.lower().endswith(readable_exts): + return fp + + return None + + def _resolve_video_path(self, content: str, create_time: int) -> str | None: + """在 msg/video/YYYY-MM/ 中按 rawmd5 查找 mp4""" + wechat_base = os.path.dirname(self._app.db_dir) + video_dir = os.path.join(wechat_base, "msg", "video") + if not os.path.isdir(video_dir): + return None + + rawmd5 = None + md5_m = re.search(r'rawmd5="([a-f0-9]+)"', content or "") + if md5_m: + rawmd5 = md5_m.group(1) + if not rawmd5: + return None + + dt = datetime.fromtimestamp(create_time) + date_prefix = dt.strftime("%Y-%m") + + month_dir = os.path.join(video_dir, date_prefix) + if not os.path.isdir(month_dir): + return None + + for f in os.listdir(month_dir): + if rawmd5 not in f: + continue + fp = os.path.join(month_dir, f) + if f.endswith(".mp4") and os.path.isfile(fp): + return fp + + return None diff --git a/project.md b/project.md new file mode 100644 index 0000000..af9c1ae --- /dev/null +++ b/project.md @@ -0,0 +1,49 @@ +# wechat-cli 项目规划 + +## 项目目标 +构建基于微信本地数据库的 AI-first CLI 工具生态,支持消息查询、数据导出、自动化采集。 + +## 当前版本 +v0.2.4 + +## 已完成功能 + +### 核心 CLI(wechat_cli/) +- [x] 密钥提取(macOS/Linux/Windows 进程内存扫描) +- [x] 会话列表、消息历史、全局搜索 +- [x] 联系人查询、群成员列表 +- [x] 聊天统计、收藏夹查询 +- [x] 未读消息、增量消息检测 +- [x] 导出聊天记录(Markdown/TXT) +- [x] 媒体文件路径解析(--media 标志) + +### Skills +- [x] export-chat — 单个聊天完整导出(消息+视频+音频+文件) +- [x] tencent-cos-upload — 腾讯云 COS 文件上传工具 + +### 群聊消息采集器(collector/)— 2026-04-13 新增 +- [x] 自动发现所有群聊(定期扫描 session.db) +- [x] 增量消息采集(基于 create_time 水位线) +- [x] MySQL 持久化(wechat_group_message 表) +- [x] 媒体文件上传腾讯 COS(只采集原始可读文件,不做 .dat 解密) + - 图片: temp/RWTemp/YYYY-MM/{md5}.ext(查看后自动生成) + - 文件/视频/音频: msg/file/YYYY-MM/filename +- [x] 自适应扫描频率(hot/warm/cold 三级,优先级队列调度) +- [x] 白名单/黑名单过滤(支持正则) +- [x] 守护模式 + 一次性扫描模式 +- [x] 非文本消息元信息提取(文件名/大小/视频时长等,不依赖本地下载) +- [x] 详细扫描日志(每条消息类型/发送者/媒体状态) +- [x] 聊天记录合并转发解析(app_type=19 → 多行纯文本) +- [x] 引用/回复消息关联(app_type=57 → svr_msg_id + refer_msg_svrid,支持消息间关联查询) +- [x] 扫描频率优化(hot=15s, warm=30s, cold≤120s,保证 ≤2min 入库) +- [x] 默认 DEBUG 日志级别 +- [x] 媒体回溯补录(watchdog 监听 RWTemp + msg/file + 10min 定时兜底扫描,补录 7 天内记录) + +## 待规划功能 +- [ ] 采集数据 Web 查询界面 +- [ ] 消息内容分析/统计 dashboard + +## 技术约束 +- macOS 需要 Full Disk Access 权限 +- 微信版本 ≤ 4.1.8.100 +- 图片原文件需用户在微信中查看后才会出现在 temp/RWTemp 目录