From b1f02cceab5c4612ec5bdf8f9e4b94f7584cb2f7 Mon Sep 17 00:00:00 2001 From: astro Date: Thu, 23 Apr 2026 12:12:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9F=BA=E4=BA=8E=E6=96=87=E6=9C=AC=E5=A4=A7?= =?UTF-8?q?=E5=B0=8F=E5=8C=B9=E9=85=8D=E5=9B=BE=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CLAUDE.md | 8 ++-- collector/backfill.py | 87 ++++++++++++++++++++++++++++--------- collector/config.py | 10 ++--- collector/wechat_adapter.py | 32 ++++++++++---- project.md | 4 +- 5 files changed, 100 insertions(+), 41 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 00c3530..7cf5af9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -34,12 +34,12 @@ - 聊天记录合并转发(app_type=19)解析 recorditem XML,格式化为多行纯文本 - 引用/回复消息(app_type=57)提取 refermsg 中的 svrid 建立消息关联,content 格式化为 "正文\n ↳ 回复 发送者: 被引用内容" - 数据库通过 `svr_msg_id`(微信 server_id)和 `refer_msg_svrid`(refermsg/svrid)实现引用消息的关联查询 -- 采集器扫描策略: hot=15s, warm=30s, cold 退避至 120s(backoff 1.2),保证 ≤2min 入库 +- 采集器扫描策略: hot=5s, warm=10s, cold 退避至 60s(backoff 1.2),保证 ≤1min 入库 - 媒体文件只采集原始可读文件,不做 .dat 解密 - 图片: 从 `temp/RWTemp/YYYY-MM/{md5}.ext` 获取(微信查看图片后自动生成) - 文件: 从 `msg/file/YYYY-MM/filename` 获取 - 视频: 从 `msg/video/YYYY-MM/{rawmd5}.mp4` 获取 -- 图片消息 content 格式为 `[图片] {md5}`,md5 是图片原始内容的 hash(来自消息 XML 的 md5 属性) -- **重要**: RWTemp 中的图片文件名是派生 hash,与消息 XML 中的 md5/aeskey 等属性均不对应。匹配图片必须通过 `hashlib.md5(文件内容)` 计算后与 XML md5 属性比对,不能用文件名匹配 -- 回溯补录: watchdog 监听 temp/RWTemp + msg/file + msg/video 目录 + 每 10 分钟定时扫描 media_url=NULL 记录(7天内) +- 图片消息 content 格式为 `[图片] {md5} size:{length}`,md5 来自消息 XML 的 md5 属性,length 为 XML 的 length 属性(= RWTemp 中文件的精确字节数) +- **重要**: XML md5 属性并非文件内容的 md5(可能是加密前或服务端 hash),文件名也是派生 hash。匹配图片的可靠方式是通过 XML length 与文件大小精确比对,md5 仅作为兼容旧记录的回退 +- 回溯补录: PollingObserver(5s轮询)监听 temp/RWTemp + msg/file + msg/video 目录,匹配失败 30s 后自动重试;每 1 分钟定时扫描 media_url=NULL 记录(7天内)兜底 - 默认日志级别 DEBUG,可通过配置文件 `~/.wechat-cli/collect-chats/config.json` 修改 diff --git a/collector/backfill.py b/collector/backfill.py index f318ef3..cdce404 100644 --- a/collector/backfill.py +++ b/collector/backfill.py @@ -15,7 +15,7 @@ import threading import time from datetime import datetime -from watchdog.observers import Observer +from watchdog.observers.polling import PollingObserver from watchdog.events import FileSystemEventHandler, FileCreatedEvent from .config import CollectorConfig @@ -67,6 +67,14 @@ def _extract_md5_from_content(content: str) -> str | None: return m.group(1) if m else None +def _extract_size_from_content(content: str) -> int | None: + """从 DB content 字段提取图片文件大小: '[图片] {md5} size:110348'""" + if not content: + return None + m = re.search(r'size:(\d+)', content) + return int(m.group(1)) if m else None + + class MediaFileHandler(FileSystemEventHandler): """watchdog 事件处理器 — 新文件出现时触发回溯匹配""" @@ -94,10 +102,25 @@ class MediaFileHandler(FileSystemEventHandler): if _is_in_rwtemp(filepath): media_type = "image" log.info("[watchdog] 检测到新媒体文件: %s (type=%s)", filepath, media_type) - self._backfiller.try_backfill_file(filepath, media_type) + matched = self._backfiller.try_backfill_file(filepath, media_type) + if not matched: + log.info("[watchdog] 未匹配到DB记录, 30s后重试: %s", os.path.basename(filepath)) + threading.Timer(30.0, self._retry_backfill, args=[filepath, media_type]).start() except Exception as e: log.warning("[watchdog] 处理文件异常 %s: %s", filepath, e) + def _retry_backfill(self, filepath: str, media_type: str): + try: + if not os.path.isfile(filepath): + return + matched = self._backfiller.try_backfill_file(filepath, media_type) + if matched: + log.info("[watchdog] 重试匹配成功: %s", os.path.basename(filepath)) + else: + log.info("[watchdog] 重试仍未匹配: %s, 等待定时扫描兜底", os.path.basename(filepath)) + except Exception as e: + log.warning("[watchdog] 重试异常 %s: %s", filepath, e) + class MediaBackfiller: """媒体回溯补录器 @@ -113,7 +136,7 @@ class MediaBackfiller: self._cfg = config self._cos = cos_uploader self._wechat_base = wechat_base_dir - self._observer: Observer | None = None + self._observer: PollingObserver | None = None self._scan_thread: threading.Thread | None = None self._running = False self._upload_lock = threading.Lock() @@ -126,13 +149,13 @@ class MediaBackfiller: watch_dirs = self._get_watch_dirs() if watch_dirs: - self._observer = Observer() + self._observer = PollingObserver(timeout=5) handler = MediaFileHandler(self) for d in watch_dirs: self._observer.schedule(handler, d, recursive=True) log.info("[backfill] 监听目录: %s", d) self._observer.start() - log.info("[backfill] watchdog 已启动, 监听 %d 个目录", len(watch_dirs)) + log.info("[backfill] PollingObserver 已启动, 监听 %d 个目录, 轮询间隔 5s", len(watch_dirs)) else: log.warning("[backfill] 未找到可监听的微信媒体目录") @@ -178,11 +201,11 @@ class MediaBackfiller: return dirs - def try_backfill_file(self, filepath: str, media_type: str): - """尝试将新文件匹配到 DB 中的 pending 记录并上传""" + def try_backfill_file(self, filepath: str, media_type: str) -> bool: + """尝试将新文件匹配到 DB 中的 pending 记录并上传,返回是否匹配成功""" if not self._cos: log.debug("[backfill] COS 未配置, 跳过上传") - return + return False with self._upload_lock: pending = self._storage.get_pending_media_messages( @@ -191,6 +214,8 @@ class MediaBackfiller: matched = self._match_file_to_record(filepath, media_type, pending) if matched: self._do_upload_and_update(filepath, matched) + return True + return False def _match_file_to_record(self, filepath: str, media_type: str, records: list[dict]) -> dict | None: @@ -203,18 +228,34 @@ class MediaBackfiller: return self._match_file(filepath, records) def _match_image(self, filepath: str, records: list[dict]) -> dict | None: - """图片匹配: 计算文件内容 md5,匹配 DB content 中的 md5""" + """图片匹配: 优先按文件大小匹配,回退到内容 md5""" + try: + file_size = os.path.getsize(filepath) + except Exception: + return None + image_records = [r for r in records if r["msg_type"] == "image"] + + # 优先: 按文件大小匹配(新格式 content 含 size:NNN) + for rec in image_records: + rec_size = _extract_size_from_content(rec.get("content", "")) + if rec_size and rec_size == file_size: + log.info("[backfill] 图片文件大小匹配: %d bytes → record id=%d", file_size, rec["id"]) + return rec + + # 回退: 按内容 md5 匹配(兼容旧记录) try: file_hash = _file_md5(filepath) except Exception: - return None - for rec in records: - if rec["msg_type"] != "image": - continue - rec_md5 = _extract_md5_from_content(rec.get("content", "")) - if rec_md5 and rec_md5 == file_hash: - log.info("[backfill] 图片内容md5匹配: %s → record id=%d", file_hash, rec["id"]) - return rec + file_hash = None + if file_hash: + for rec in image_records: + rec_md5 = _extract_md5_from_content(rec.get("content", "")) + if rec_md5 and rec_md5 == file_hash: + log.info("[backfill] 图片内容md5匹配: %s → record id=%d", file_hash, rec["id"]) + return rec + + log.info("[backfill] 图片未匹配: size=%d, 文件=%s, DB中%d条image待匹配", + file_size, os.path.basename(filepath), len(image_records)) return None def _match_file(self, filepath: str, records: list[dict]) -> dict | None: @@ -338,9 +379,11 @@ class MediaBackfiller: return None def _resolve_image_path(self, date_prefix: str, record: dict) -> str | None: - """在 temp/RWTemp/YYYY-MM/ 中按文件内容 md5 查找图片""" - img_md5 = _extract_md5_from_content(record.get("content", "")) - if not img_md5: + """在 temp/RWTemp/YYYY-MM/ 中按文件大小或内容 md5 查找图片""" + content = record.get("content", "") + target_size = _extract_size_from_content(content) + img_md5 = _extract_md5_from_content(content) + if not target_size and not img_md5: return None rwtemp_dir = os.path.join(self._wechat_base, "temp", "RWTemp", date_prefix) @@ -354,8 +397,10 @@ class MediaBackfiller: continue if not os.path.isfile(fp): continue + if target_size and os.path.getsize(fp) == target_size: + return fp try: - if _file_md5(fp) == img_md5: + if img_md5 and _file_md5(fp) == img_md5: return fp except Exception: continue diff --git a/collector/config.py b/collector/config.py index 2603b14..5b84d52 100644 --- a/collector/config.py +++ b/collector/config.py @@ -29,14 +29,14 @@ class CollectorConfig: cos_base_path: str = os.environ.get("COS_BASE_PATH", "") # ---- 扫描策略 ---- - min_interval: float = 15.0 # hot 群扫描间隔(秒) - base_interval: float = 30.0 # warm 群扫描间隔 - max_interval: float = 120.0 # cold 群最大间隔(保证 ≤2min 入库) + min_interval: float = 5.0 # hot 群扫描间隔(秒) + base_interval: float = 10.0 # warm 群扫描间隔 + max_interval: float = 60.0 # cold 群最大间隔(保证 ≤1min 入库) backoff_factor: float = 1.2 # cold 退避系数 batch_size: int = 10 # 每轮最多扫描群数 messages_per_scan: int = 200 # 每群每次最多拉取消息数 jitter_max: float = 1.0 # 群间随机延迟上限(秒) - cycle_sleep: float = 5.0 # 轮次间休眠(秒) + cycle_sleep: float = 1.0 # 轮次间休眠(秒) discovery_interval: float = 180.0 # 群聊发现间隔(秒) hot_threshold: int = 300 # 最新消息 < N秒 算 hot warm_threshold: int = 3600 # 最新消息 < N秒 算 warm @@ -46,7 +46,7 @@ class CollectorConfig: blacklist: list = field(default_factory=list) # 跳过的群(支持正则) # ---- 回溯补录 ---- - backfill_interval: float = 600.0 # 定时扫描间隔(秒), 默认 10 分钟 + backfill_interval: float = 60.0 # 定时扫描间隔(秒), 默认 1 分钟 backfill_lookback_days: int = 7 # 回溯天数 backfill_enabled: bool = True # 是否启用回溯补录 diff --git a/collector/wechat_adapter.py b/collector/wechat_adapter.py index 5bceeeb..cd24bad 100644 --- a/collector/wechat_adapter.py +++ b/collector/wechat_adapter.py @@ -189,12 +189,18 @@ def _extract_chat_record(content_xml: str) -> str | None: return None -def _extract_image_md5(content_xml: str) -> str | None: - """从图片消息 XML 中提取 md5 属性(图片内容 hash)""" +def _extract_image_meta(content_xml: str) -> dict | None: + """从图片消息 XML 中提取 md5 和 length 属性""" if not content_xml: return None - m = re.search(r'\bmd5="([a-fA-F0-9]{32})"', content_xml) - return m.group(1).lower() if m else None + md5_m = re.search(r'\bmd5="([a-fA-F0-9]{32})"', content_xml) + len_m = re.search(r'\blength="(\d+)"', content_xml) + if not md5_m: + return None + return { + "md5": md5_m.group(1).lower(), + "length": int(len_m.group(1)) if len_m else 0, + } def _file_md5(filepath: str) -> str: @@ -388,8 +394,13 @@ class WeChatAdapter: elif base_type == 34: final_content = "[语音]" elif base_type == 3: - img_md5 = _extract_image_md5(xml_content) - final_content = f"[图片] {img_md5}" if img_md5 else "[图片]" + img_meta = _extract_image_meta(xml_content) + if img_meta: + final_content = f"[图片] {img_meta['md5']}" + if img_meta["length"]: + final_content += f" size:{img_meta['length']}" + else: + final_content = "[图片]" elif base_type == 47: final_content = "[表情]" elif base_type == 48: @@ -503,12 +514,13 @@ class WeChatAdapter: date_prefix = dt.strftime("%Y-%m") if base_type == 3: - img_md5 = _extract_image_md5(content) - if not img_md5: + img_meta = _extract_image_meta(content) + if not img_meta: return None rwtemp_dir = os.path.join(wechat_base, "temp", "RWTemp", date_prefix) if not os.path.isdir(rwtemp_dir): return None + target_size = img_meta["length"] for f in os.listdir(rwtemp_dir): fp = os.path.join(rwtemp_dir, f) ext = os.path.splitext(f)[1].lower() @@ -516,7 +528,9 @@ class WeChatAdapter: continue if not os.path.isfile(fp): continue - if _file_md5(fp) == img_md5: + if target_size and os.path.getsize(fp) == target_size: + return fp + if _file_md5(fp) == img_meta["md5"]: return fp return None else: diff --git a/project.md b/project.md index af9c1ae..7b33edf 100644 --- a/project.md +++ b/project.md @@ -35,9 +35,9 @@ v0.2.4 - [x] 详细扫描日志(每条消息类型/发送者/媒体状态) - [x] 聊天记录合并转发解析(app_type=19 → 多行纯文本) - [x] 引用/回复消息关联(app_type=57 → svr_msg_id + refer_msg_svrid,支持消息间关联查询) -- [x] 扫描频率优化(hot=15s, warm=30s, cold≤120s,保证 ≤2min 入库) +- [x] 扫描频率优化(hot=5s, warm=10s, cold≤60s,保证 ≤1min 入库) - [x] 默认 DEBUG 日志级别 -- [x] 媒体回溯补录(watchdog 监听 RWTemp + msg/file + 10min 定时兜底扫描,补录 7 天内记录) +- [x] 媒体回溯补录(PollingObserver 5s轮询监听 RWTemp + msg/file + msg/video,匹配失败30s自动重试 + 1min 定时兜底扫描,补录 7 天内记录) ## 待规划功能 - [ ] 采集数据 Web 查询界面