diff --git a/CLAUDE.md b/CLAUDE.md index 7cf5af9..2d8157d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -38,8 +38,12 @@ - 媒体文件只采集原始可读文件,不做 .dat 解密 - 图片: 从 `temp/RWTemp/YYYY-MM/{md5}.ext` 获取(微信查看图片后自动生成) - 文件: 从 `msg/file/YYYY-MM/filename` 获取 - - 视频: 从 `msg/video/YYYY-MM/{rawmd5}.mp4` 获取 + - 视频: 从 `msg/video/YYYY-MM/{rawmd5}.mp4` 获取,或从 `temp/RWTemp/YYYY-MM/{rawmd5}.mp4` 获取(直接发送的视频点击预览后缓存于此) - 图片消息 content 格式为 `[图片] {md5} size:{length}`,md5 来自消息 XML 的 md5 属性,length 为 XML 的 length 属性(= RWTemp 中文件的精确字节数) - **重要**: XML md5 属性并非文件内容的 md5(可能是加密前或服务端 hash),文件名也是派生 hash。匹配图片的可靠方式是通过 XML length 与文件大小精确比对,md5 仅作为兼容旧记录的回退 +- 视频消息 content 格式为 `[视频] {playlength}秒 rawmd5:{rawmd5} size:{rawlength}`,rawmd5 是本地 mp4 文件名,rawlength 是文件字节数。匹配优先 rawmd5(文件名精确匹配),回退 rawlength 与文件大小比对 +- **注意**: 直接发送的视频 rawmd5 和 rawlength 为空/0,此时使用 XML 的 `length` 属性作为 size(即缓存到 RWTemp 中文件的精确字节数),content 为 `[视频] {playlength}秒 size:{length}` +- 语音消息 content 格式为 `[语音] {N}秒 size:{length}`,voicelength 为语音时长(ms→s四舍五入),length 为语音数据字节数。匹配通过文件大小精确比对 +- 文件消息 content 格式为 `{filename} ({human_size}) size:{exact_bytes}`,匹配优先文件名+精确size双重校验,回退纯文件名子串匹配 - 回溯补录: PollingObserver(5s轮询)监听 temp/RWTemp + msg/file + msg/video 目录,匹配失败 30s 后自动重试;每 1 分钟定时扫描 media_url=NULL 记录(7天内)兜底 - 默认日志级别 DEBUG,可通过配置文件 `~/.wechat-cli/collect-chats/config.json` 修改 diff --git a/collector/backfill.py b/collector/backfill.py index cdce404..ddfd1b5 100644 --- a/collector/backfill.py +++ b/collector/backfill.py @@ -75,6 +75,22 @@ def _extract_size_from_content(content: str) -> int | None: return int(m.group(1)) if m else None +def _extract_video_rawmd5(content: str) -> str | None: + """从 DB content 字段提取视频 rawmd5: '[视频] 15秒 rawmd5:abc123...'""" + if not content: + return None + m = re.search(r'rawmd5:([a-f0-9]+)', content) + return m.group(1) if m else None + + +def _extract_video_size(content: str) -> int | None: + """从 DB content 字段提取视频文件大小: '[视频] 15秒 rawmd5:xxx size:12345'""" + if not content: + return None + m = re.search(r'size:(\d+)', content) + return int(m.group(1)) if m else None + + class MediaFileHandler(FileSystemEventHandler): """watchdog 事件处理器 — 新文件出现时触发回溯匹配""" @@ -98,9 +114,11 @@ class MediaFileHandler(FileSystemEventHandler): media_type = _is_readable_media(filepath) if not media_type: return - # RWTemp 中的都是图片 + # RWTemp 中:视频格式保持 video,其余为 image if _is_in_rwtemp(filepath): - media_type = "image" + ext = os.path.splitext(filepath)[1].lower() + if ext not in VIDEO_EXTS: + media_type = "image" log.info("[watchdog] 检测到新媒体文件: %s (type=%s)", filepath, media_type) matched = self._backfiller.try_backfill_file(filepath, media_type) if not matched: @@ -259,10 +277,12 @@ class MediaBackfiller: return None def _match_file(self, filepath: str, records: list[dict]) -> dict | None: - """文件匹配: 通过文件名匹配 DB content 中的标题""" + """文件匹配: 文件名 + size 双重校验,回退纯文件名""" filename = os.path.basename(filepath) date_prefix = self._extract_date_from_path(filepath) + file_size = os.path.getsize(filepath) + name_matched = None for rec in records: if rec["msg_type"] != "file": continue @@ -272,15 +292,24 @@ class MediaBackfiller: continue content = rec.get("content", "") or "" title = content.split(" (")[0].strip() if content else "" - if title and (title in filename or filename in title): - log.info("[backfill] 文件匹配: %s → record id=%d", filename, rec["id"]) + if not title or (title not in filename and filename not in title): + continue + rec_size = _extract_video_size(content) + if rec_size and rec_size == file_size: + log.info("[backfill] 文件匹配(name+size): %s → record id=%d", filename, rec["id"]) return rec - return None + if name_matched is None: + name_matched = rec + + if name_matched: + log.info("[backfill] 文件匹配(name): %s → record id=%d", filename, name_matched["id"]) + return name_matched def _match_video(self, filepath: str, records: list[dict]) -> dict | None: - """视频匹配: 通过文件名中的 rawmd5 匹配同月的 video 记录""" + """视频匹配: 优先 rawmd5 精确匹配,回退 size 匹配(支持 msg/video 和 RWTemp)""" filename = os.path.splitext(os.path.basename(filepath))[0].lower() date_prefix = self._extract_date_from_path(filepath) + file_size = os.path.getsize(filepath) for rec in records: if rec["msg_type"] != "video": @@ -289,9 +318,26 @@ class MediaBackfiller: rec_date = datetime.fromtimestamp(rec["msg_timestamp"]).strftime("%Y-%m") if rec_date != date_prefix: continue - if re.fullmatch(r'[a-f0-9]{32}', filename): - log.info("[backfill] 视频匹配: %s → record id=%d", filename, rec["id"]) + content = rec.get("content", "") or "" + rec_rawmd5 = _extract_video_rawmd5(content) + if rec_rawmd5 and rec_rawmd5 in filename: + log.info("[backfill] 视频匹配(rawmd5): %s → record id=%d", filename, rec["id"]) return rec + + for rec in records: + if rec["msg_type"] != "video": + continue + if date_prefix: + rec_date = datetime.fromtimestamp(rec["msg_timestamp"]).strftime("%Y-%m") + if rec_date != date_prefix: + continue + content = rec.get("content", "") or "" + rec_size = _extract_video_size(content) + if rec_size and rec_size == file_size: + log.info("[backfill] 视频匹配(size): %s (%d bytes) → record id=%d", + filename, file_size, rec["id"]) + return rec + return None def _do_upload_and_update(self, filepath: str, record: dict): @@ -373,9 +419,11 @@ class MediaBackfiller: if msg_type == "image": return self._resolve_image_path(date_prefix, record) elif msg_type == "video": - return self._resolve_video_path(date_prefix) + return self._resolve_video_path(date_prefix, record) elif msg_type == "file": return self._resolve_file_path(date_prefix, record) + elif msg_type == "voice": + return self._resolve_voice_path(date_prefix, record) return None def _resolve_image_path(self, date_prefix: str, record: dict) -> str | None: @@ -406,35 +454,113 @@ class MediaBackfiller: continue return None - def _resolve_video_path(self, date_prefix: str) -> str | None: - """在 msg/video/YYYY-MM/ 中查找 mp4""" + def _resolve_video_path(self, date_prefix: str, record: dict) -> str | None: + """在 msg/video/YYYY-MM/ 和 temp/RWTemp/YYYY-MM/ 中按 rawmd5 或 size 查找视频""" + content = record.get("content", "") or "" + rawmd5 = _extract_video_rawmd5(content) + target_size = _extract_video_size(content) + + search_dirs = [] video_dir = os.path.join(self._wechat_base, "msg", "video", date_prefix) - if not os.path.isdir(video_dir): + if os.path.isdir(video_dir): + search_dirs.append(video_dir) + rwtemp_dir = os.path.join(self._wechat_base, "temp", "RWTemp", date_prefix) + if os.path.isdir(rwtemp_dir): + search_dirs.append(rwtemp_dir) + + if not search_dirs: return None - for f in os.listdir(video_dir): - if f.endswith(".mp4") and os.path.isfile(os.path.join(video_dir, f)): - return os.path.join(video_dir, f) + + for d in search_dirs: + if rawmd5: + for f in os.listdir(d): + ext = os.path.splitext(f)[1].lower() + if ext not in VIDEO_EXTS: + continue + if rawmd5 in f: + fp = os.path.join(d, f) + if os.path.isfile(fp): + return fp + + for d in search_dirs: + if target_size: + for f in os.listdir(d): + ext = os.path.splitext(f)[1].lower() + if ext not in VIDEO_EXTS: + continue + fp = os.path.join(d, f) + if os.path.isfile(fp) and os.path.getsize(fp) == target_size: + return fp + return None def _resolve_file_path(self, date_prefix: str, record: dict) -> str | None: - """在 msg/file/YYYY-MM/ 中按文件名查找""" + """在 msg/file/YYYY-MM/ 中按文件名+size查找""" content = record.get("content", "") or "" title = content.split(" (")[0].strip() if content else "" if not title: return None + target_size = _extract_video_size(content) + file_dir = os.path.join(self._wechat_base, "msg", "file", date_prefix) if not os.path.isdir(file_dir): return None target = os.path.join(file_dir, title) if os.path.isfile(target): - return target + if target_size and os.path.getsize(target) == target_size: + return target + if not target_size: + return target + name_match = None for f in os.listdir(file_dir): - if title in f or f in title: - fp = os.path.join(file_dir, f) - if os.path.isfile(fp): + if title not in f and f not in title: + continue + fp = os.path.join(file_dir, f) + if not os.path.isfile(fp): + continue + if target_size and os.path.getsize(fp) == target_size: + return fp + if name_match is None: + name_match = fp + return name_match + + def _resolve_voice_path(self, date_prefix: str, record: dict) -> str | None: + """在 msg/attach/{hash}/YYYY-MM/Voice/ 中按 size 查找语音""" + content = record.get("content", "") or "" + target_size = _extract_video_size(content) + + attach_dir = os.path.join(self._wechat_base, "msg", "attach") + if not os.path.isdir(attach_dir): + return None + + readable_exts = (".mp3", ".wav", ".amr", ".silk", ".m4a", ".ogg") + group_username = record.get("group_username", "") + search_hashes = [] + if group_username: + h = hashlib.md5(group_username.encode()).hexdigest() + candidate = os.path.join(attach_dir, h) + if os.path.isdir(candidate): + search_hashes.append(h) + if not search_hashes: + search_hashes = [ + d for d in os.listdir(attach_dir) + if os.path.isdir(os.path.join(attach_dir, d)) + ] + + for h in search_hashes: + voice_dir = os.path.join(attach_dir, h, date_prefix, "Voice") + if not os.path.isdir(voice_dir): + continue + for f in os.listdir(voice_dir): + fp = os.path.join(voice_dir, f) + if not os.path.isfile(fp) or f.endswith(".dat"): + continue + if not f.lower().endswith(readable_exts): + continue + if target_size and os.path.getsize(fp) == target_size: return fp return None diff --git a/collector/wechat_adapter.py b/collector/wechat_adapter.py index cd24bad..82e094a 100644 --- a/collector/wechat_adapter.py +++ b/collector/wechat_adapter.py @@ -111,11 +111,16 @@ def _format_file_content(meta: dict) -> str: parts.append(f"({size / 1024:.1f}KB)") else: parts.append(f"({size}B)") + parts.append(f"size:{size}") return " ".join(parts) def _extract_video_meta(content_xml: str) -> str: - """从视频消息 XML 中提取描述信息""" + """从视频消息 XML 中提取描述信息,包含匹配所需的 rawmd5 和 size + + 直接发送的视频: rawmd5 为空, rawlength=0, 但 length 为实际文件大小 + 文件方式发送的视频: rawmd5 和 rawlength 有值 + """ if not content_xml: return "[视频]" try: @@ -124,16 +129,49 @@ def _extract_video_meta(content_xml: str) -> str: return "[视频]" video = root.find(".//videomsg") if video is not None: - length = video.get("length", "") - raw_length = video.get("rawlength", "") - dur = length or raw_length - if dur: - return f"[视频] {dur}秒" + playlength = video.get("playlength", "") + rawmd5 = video.get("rawmd5", "") + rawlength = video.get("rawlength", "0") + length = video.get("length", "0") + parts = ["[视频]"] + if playlength: + parts.append(f"{playlength}秒") + if rawmd5: + parts.append(f"rawmd5:{rawmd5}") + # 优先使用 rawlength(文件发送),回退到 length(直接发送) + effective_size = rawlength if rawlength and rawlength != "0" else length + if effective_size and effective_size != "0": + parts.append(f"size:{effective_size}") + return " ".join(parts) return "[视频]" except Exception: return "[视频]" +def _extract_voice_meta(content_xml: str) -> str: + """从语音消息 XML 中提取时长和数据大小""" + if not content_xml: + return "[语音]" + try: + root = ET.fromstring(content_xml) if content_xml.lstrip().startswith("<") else None + if root is None: + return "[语音]" + voice = root.find(".//voicemsg") + if voice is not None: + voicelength = voice.get("voicelength", "") + length = voice.get("length", "") + parts = ["[语音]"] + if voicelength: + dur_ms = int(voicelength) + parts.append(f"{(dur_ms + 500) // 1000}秒") + if length: + parts.append(f"size:{length}") + return " ".join(parts) + return "[语音]" + except Exception: + return "[语音]" + + def _extract_chat_record(content_xml: str) -> str | None: """从 type=49, app_type=19 的聊天记录消息中提取纯文本 @@ -392,7 +430,7 @@ class WeChatAdapter: elif base_type == 43: final_content = _extract_video_meta(xml_content) elif base_type == 34: - final_content = "[语音]" + final_content = _extract_voice_meta(xml_content) elif base_type == 3: img_meta = _extract_image_meta(xml_content) if img_meta: @@ -534,7 +572,13 @@ class WeChatAdapter: return fp return None else: - # 语音:只返回已解密的可读文件 + # 语音:按文件大小匹配已解密的可读文件 + voice_meta = _extract_voice_meta(content) + target_size = None + m = re.search(r'size:(\d+)', voice_meta) + if m: + target_size = int(m.group(1)) + msg_dir = os.path.join(wechat_base, "msg") attach_dir = os.path.join(msg_dir, "attach") readable_exts = (".mp3", ".wav", ".amr", ".silk", ".m4a", ".ogg") @@ -551,6 +595,7 @@ class WeChatAdapter: if os.path.isdir(os.path.join(attach_dir, d)) ] + size_match = None for h in search_hashes: sub = os.path.join(attach_dir, h, date_prefix, "Voice") if not os.path.isdir(sub): @@ -561,37 +606,66 @@ class WeChatAdapter: continue if f.endswith(".dat"): continue - if f.lower().endswith(readable_exts): + if not f.lower().endswith(readable_exts): + continue + if target_size and os.path.getsize(fp) == target_size: return fp + if size_match is None: + size_match = fp - return None + return size_match def _resolve_video_path(self, content: str, create_time: int) -> str | None: - """在 msg/video/YYYY-MM/ 中按 rawmd5 查找 mp4""" + """在 msg/video/YYYY-MM/ 和 temp/RWTemp/YYYY-MM/ 中按 rawmd5 或 size 查找视频""" wechat_base = os.path.dirname(self._app.db_dir) - video_dir = os.path.join(wechat_base, "msg", "video") - if not os.path.isdir(video_dir): - return None rawmd5 = None md5_m = re.search(r'rawmd5="([a-f0-9]+)"', content or "") if md5_m: rawmd5 = md5_m.group(1) - if not rawmd5: + + rawlength = None + len_m = re.search(r'rawlength="(\d+)"', content or "") + if len_m: + rawlength = int(len_m.group(1)) + + if not rawmd5 and not rawlength: return None dt = datetime.fromtimestamp(create_time) date_prefix = dt.strftime("%Y-%m") - month_dir = os.path.join(video_dir, date_prefix) - if not os.path.isdir(month_dir): + video_exts = (".mp4", ".mov", ".avi") + search_dirs = [] + video_dir = os.path.join(wechat_base, "msg", "video", date_prefix) + if os.path.isdir(video_dir): + search_dirs.append(video_dir) + rwtemp_dir = os.path.join(wechat_base, "temp", "RWTemp", date_prefix) + if os.path.isdir(rwtemp_dir): + search_dirs.append(rwtemp_dir) + + if not search_dirs: return None - for f in os.listdir(month_dir): - if rawmd5 not in f: - continue - fp = os.path.join(month_dir, f) - if f.endswith(".mp4") and os.path.isfile(fp): - return fp + for d in search_dirs: + if rawmd5: + for f in os.listdir(d): + ext = os.path.splitext(f)[1].lower() + if ext not in video_exts: + continue + if rawmd5 in f: + fp = os.path.join(d, f) + if os.path.isfile(fp): + return fp + + for d in search_dirs: + if rawlength: + for f in os.listdir(d): + ext = os.path.splitext(f)[1].lower() + if ext not in video_exts: + continue + fp = os.path.join(d, f) + if os.path.isfile(fp) and os.path.getsize(fp) == rawlength: + return fp return None diff --git a/project.md b/project.md index 7b33edf..6e5e646 100644 --- a/project.md +++ b/project.md @@ -26,8 +26,10 @@ v0.2.4 - [x] 增量消息采集(基于 create_time 水位线) - [x] MySQL 持久化(wechat_group_message 表) - [x] 媒体文件上传腾讯 COS(只采集原始可读文件,不做 .dat 解密) - - 图片: temp/RWTemp/YYYY-MM/{md5}.ext(查看后自动生成) - - 文件/视频/音频: msg/file/YYYY-MM/filename + - 图片: temp/RWTemp/YYYY-MM/{md5}.ext(查看后自动生成),通过 size/md5 精确匹配 + - 视频: msg/video/YYYY-MM/{rawmd5}.mp4(文件方式发送)或 temp/RWTemp/YYYY-MM/{rawmd5}.mp4(直接发送后预览缓存),通过 rawmd5/size 精确匹配 + - 语音: msg/attach/{hash}/YYYY-MM/Voice/(自动生成),通过 size 精确匹配 + - 文件: msg/file/YYYY-MM/filename,通过文件名+精确size双重校验 - [x] 自适应扫描频率(hot/warm/cold 三级,优先级队列调度) - [x] 白名单/黑名单过滤(支持正则) - [x] 守护模式 + 一次性扫描模式