"""媒体回溯补录模块 — watchdog 实时监听 + 定时兜底扫描 当用户点开微信中的图片/视频/文件后,本地会生成可读文件。 本模块检测这些新文件,上传到 COS,并回填 media_url 到数据库中之前为 NULL 的记录。 图片: temp/RWTemp/YYYY-MM/{md5}.ext(微信下载/查看后自动生成) 文件/视频/音频: msg/file/YYYY-MM/filename(微信接收后自动生成) """ import hashlib import logging import os import re import threading import time from datetime import datetime from watchdog.observers.polling import PollingObserver from watchdog.events import FileSystemEventHandler, FileCreatedEvent from .config import CollectorConfig from .storage import MessageStorage log = logging.getLogger(__name__) IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"} VIDEO_EXTS = {".mp4", ".mov", ".avi"} FILE_EXTS_SKIP = {".tmp", ".downloading"} def _is_readable_media(filepath: str) -> str | None: """判断文件是否为可读媒体文件,返回 'image'/'video'/'file' 或 None""" ext = os.path.splitext(filepath)[1].lower() if ext in FILE_EXTS_SKIP: return None basename = os.path.basename(filepath) if "_thumb" in basename: return None if ext in IMAGE_EXTS: return "image" if ext in VIDEO_EXTS: return "video" if ext and ext not in FILE_EXTS_SKIP: return "file" return None def _is_in_rwtemp(filepath: str) -> bool: """判断文件是否在 RWTemp 目录内""" return "/temp/RWTemp/" in filepath def _file_md5(filepath: str) -> str: """计算文件内容的 md5""" h = hashlib.md5() with open(filepath, "rb") as f: for chunk in iter(lambda: f.read(8192), b""): h.update(chunk) return h.hexdigest() def _extract_md5_from_content(content: str) -> str | None: """从 DB content 字段提取图片 md5: '[图片] abc123...'""" if not content: return None m = re.search(r'\[图片\]\s+([a-f0-9]{32})', content) return m.group(1) if m else None def _extract_size_from_content(content: str) -> int | None: """从 DB content 字段提取图片文件大小: '[图片] {md5} size:110348'""" if not content: return None m = re.search(r'size:(\d+)', content) return int(m.group(1)) if m else None class MediaFileHandler(FileSystemEventHandler): """watchdog 事件处理器 — 新文件出现时触发回溯匹配""" def __init__(self, backfiller: "MediaBackfiller"): super().__init__() self._backfiller = backfiller def on_created(self, event): if event.is_directory: return if not isinstance(event, FileCreatedEvent): return threading.Timer(2.0, self._handle_new_file, args=[event.src_path]).start() def _handle_new_file(self, filepath: str): try: if not os.path.isfile(filepath): return if os.path.getsize(filepath) < 100: return media_type = _is_readable_media(filepath) if not media_type: return # RWTemp 中的都是图片 if _is_in_rwtemp(filepath): media_type = "image" log.info("[watchdog] 检测到新媒体文件: %s (type=%s)", filepath, media_type) matched = self._backfiller.try_backfill_file(filepath, media_type) if not matched: log.info("[watchdog] 未匹配到DB记录, 30s后重试: %s", os.path.basename(filepath)) threading.Timer(30.0, self._retry_backfill, args=[filepath, media_type]).start() except Exception as e: log.warning("[watchdog] 处理文件异常 %s: %s", filepath, e) def _retry_backfill(self, filepath: str, media_type: str): try: if not os.path.isfile(filepath): return matched = self._backfiller.try_backfill_file(filepath, media_type) if matched: log.info("[watchdog] 重试匹配成功: %s", os.path.basename(filepath)) else: log.info("[watchdog] 重试仍未匹配: %s, 等待定时扫描兜底", os.path.basename(filepath)) except Exception as e: log.warning("[watchdog] 重试异常 %s: %s", filepath, e) class MediaBackfiller: """媒体回溯补录器 两层保障: 1. watchdog 实时监听 RWTemp(图片) 和 msg/file(文件),新文件立即匹配上传 2. 定时扫描 DB 中 media_url=NULL 的记录,重新尝试路径解析 """ def __init__(self, storage: MessageStorage, config: CollectorConfig, cos_uploader=None, wechat_base_dir: str = ""): self._storage = storage self._cfg = config self._cos = cos_uploader self._wechat_base = wechat_base_dir self._observer: PollingObserver | None = None self._scan_thread: threading.Thread | None = None self._running = False self._upload_lock = threading.Lock() def start(self): """启动 watchdog 监听 + 定时扫描线程""" if self._running: return self._running = True watch_dirs = self._get_watch_dirs() if watch_dirs: self._observer = PollingObserver(timeout=5) handler = MediaFileHandler(self) for d in watch_dirs: self._observer.schedule(handler, d, recursive=True) log.info("[backfill] 监听目录: %s", d) self._observer.start() log.info("[backfill] PollingObserver 已启动, 监听 %d 个目录, 轮询间隔 5s", len(watch_dirs)) else: log.warning("[backfill] 未找到可监听的微信媒体目录") self._scan_thread = threading.Thread( target=self._periodic_scan_loop, daemon=True, name="backfill-scan" ) self._scan_thread.start() log.info("[backfill] 定时扫描已启动, 间隔 %ds", int(self._cfg.backfill_interval)) def stop(self): """停止所有后台线程""" self._running = False if self._observer: self._observer.stop() self._observer.join(timeout=5) self._observer = None if self._scan_thread: self._scan_thread.join(timeout=10) self._scan_thread = None log.info("[backfill] 已停止") def set_cos_uploader(self, cos): """延迟设置 COS 上传器""" self._cos = cos def _get_watch_dirs(self) -> list[str]: """获取需要监听的目录: temp/RWTemp(图片) + msg/file(文件) + msg/video(视频)""" if not self._wechat_base: return [] dirs = [] rwtemp = os.path.join(self._wechat_base, "temp", "RWTemp") if os.path.isdir(rwtemp): dirs.append(rwtemp) file_dir = os.path.join(self._wechat_base, "msg", "file") if os.path.isdir(file_dir): dirs.append(file_dir) video_dir = os.path.join(self._wechat_base, "msg", "video") if os.path.isdir(video_dir): dirs.append(video_dir) return dirs def try_backfill_file(self, filepath: str, media_type: str) -> bool: """尝试将新文件匹配到 DB 中的 pending 记录并上传,返回是否匹配成功""" if not self._cos: log.debug("[backfill] COS 未配置, 跳过上传") return False with self._upload_lock: pending = self._storage.get_pending_media_messages( lookback_days=self._cfg.backfill_lookback_days, limit=500 ) matched = self._match_file_to_record(filepath, media_type, pending) if matched: self._do_upload_and_update(filepath, matched) return True return False def _match_file_to_record(self, filepath: str, media_type: str, records: list[dict]) -> dict | None: """将文件路径匹配到数据库记录""" if media_type == "image": return self._match_image(filepath, records) elif media_type == "video": return self._match_video(filepath, records) else: return self._match_file(filepath, records) def _match_image(self, filepath: str, records: list[dict]) -> dict | None: """图片匹配: 优先按文件大小匹配,回退到内容 md5""" try: file_size = os.path.getsize(filepath) except Exception: return None image_records = [r for r in records if r["msg_type"] == "image"] # 优先: 按文件大小匹配(新格式 content 含 size:NNN) for rec in image_records: rec_size = _extract_size_from_content(rec.get("content", "")) if rec_size and rec_size == file_size: log.info("[backfill] 图片文件大小匹配: %d bytes → record id=%d", file_size, rec["id"]) return rec # 回退: 按内容 md5 匹配(兼容旧记录) try: file_hash = _file_md5(filepath) except Exception: file_hash = None if file_hash: for rec in image_records: rec_md5 = _extract_md5_from_content(rec.get("content", "")) if rec_md5 and rec_md5 == file_hash: log.info("[backfill] 图片内容md5匹配: %s → record id=%d", file_hash, rec["id"]) return rec log.info("[backfill] 图片未匹配: size=%d, 文件=%s, DB中%d条image待匹配", file_size, os.path.basename(filepath), len(image_records)) return None def _match_file(self, filepath: str, records: list[dict]) -> dict | None: """文件匹配: 通过文件名匹配 DB content 中的标题""" filename = os.path.basename(filepath) date_prefix = self._extract_date_from_path(filepath) for rec in records: if rec["msg_type"] != "file": continue if date_prefix: rec_date = datetime.fromtimestamp(rec["msg_timestamp"]).strftime("%Y-%m") if rec_date != date_prefix: continue content = rec.get("content", "") or "" title = content.split(" (")[0].strip() if content else "" if title and (title in filename or filename in title): log.info("[backfill] 文件匹配: %s → record id=%d", filename, rec["id"]) return rec return None def _match_video(self, filepath: str, records: list[dict]) -> dict | None: """视频匹配: 通过文件名中的 rawmd5 匹配同月的 video 记录""" filename = os.path.splitext(os.path.basename(filepath))[0].lower() date_prefix = self._extract_date_from_path(filepath) for rec in records: if rec["msg_type"] != "video": continue if date_prefix: rec_date = datetime.fromtimestamp(rec["msg_timestamp"]).strftime("%Y-%m") if rec_date != date_prefix: continue if re.fullmatch(r'[a-f0-9]{32}', filename): log.info("[backfill] 视频匹配: %s → record id=%d", filename, rec["id"]) return rec return None def _do_upload_and_update(self, filepath: str, record: dict): """上传文件到 COS 并更新 DB(需在锁内调用)""" if not self._cos or not os.path.isfile(filepath): return try: ext = os.path.splitext(filepath)[1] filename = os.path.basename(filepath) safe_name = re.sub(r'[^\w.\-]', '_', filename) dt = datetime.fromtimestamp(record["msg_timestamp"]) date_prefix = dt.strftime("%Y-%m") cos_key = f"{self._cfg.cos_base_path}/{record['msg_type']}/{date_prefix}/{safe_name}" url = self._cos.upload(filepath, cos_key) if url: updated = self._storage.update_media_url(record["id"], url) if updated: log.info("[backfill] 回溯成功: id=%d, type=%s → %s", record["id"], record["msg_type"], url) else: log.debug("[backfill] 记录已被更新(可能重复): id=%d", record["id"]) except Exception as e: log.warning("[backfill] 上传失败: %s → %s", filepath, e) def _periodic_scan_loop(self): """定时扫描兜底线程""" while self._running: try: self._do_periodic_scan() except Exception as e: log.error("[backfill] 定时扫描异常: %s", e) wait = self._cfg.backfill_interval elapsed = 0 while elapsed < wait and self._running: time.sleep(min(5.0, wait - elapsed)) elapsed += 5.0 def _do_periodic_scan(self): """执行一次全量回溯扫描""" if not self._cos: return with self._upload_lock: pending = self._storage.get_pending_media_messages( lookback_days=self._cfg.backfill_lookback_days ) if not pending: return log.info("[backfill] 定时扫描: %d 条待补录记录", len(pending)) filled = 0 for rec in pending: if not self._running: break filepath = self._try_resolve_path(rec) if filepath and os.path.isfile(filepath): with self._upload_lock: self._do_upload_and_update(filepath, rec) filled += 1 if filled > 0: log.info("[backfill] 定时扫描完成: 成功补录 %d 条", filled) def _try_resolve_path(self, record: dict) -> str | None: """尝试为一条 pending 记录解析本地文件路径""" if not self._wechat_base: return None msg_type = record["msg_type"] msg_ts = record["msg_timestamp"] dt = datetime.fromtimestamp(msg_ts) date_prefix = dt.strftime("%Y-%m") if msg_type == "image": return self._resolve_image_path(date_prefix, record) elif msg_type == "video": return self._resolve_video_path(date_prefix) elif msg_type == "file": return self._resolve_file_path(date_prefix, record) return None def _resolve_image_path(self, date_prefix: str, record: dict) -> str | None: """在 temp/RWTemp/YYYY-MM/ 中按文件大小或内容 md5 查找图片""" content = record.get("content", "") target_size = _extract_size_from_content(content) img_md5 = _extract_md5_from_content(content) if not target_size and not img_md5: return None rwtemp_dir = os.path.join(self._wechat_base, "temp", "RWTemp", date_prefix) if not os.path.isdir(rwtemp_dir): return None for f in os.listdir(rwtemp_dir): fp = os.path.join(rwtemp_dir, f) ext = os.path.splitext(f)[1].lower() if ext not in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"): continue if not os.path.isfile(fp): continue if target_size and os.path.getsize(fp) == target_size: return fp try: if img_md5 and _file_md5(fp) == img_md5: return fp except Exception: continue return None def _resolve_video_path(self, date_prefix: str) -> str | None: """在 msg/video/YYYY-MM/ 中查找 mp4""" video_dir = os.path.join(self._wechat_base, "msg", "video", date_prefix) if not os.path.isdir(video_dir): return None for f in os.listdir(video_dir): if f.endswith(".mp4") and os.path.isfile(os.path.join(video_dir, f)): return os.path.join(video_dir, f) return None def _resolve_file_path(self, date_prefix: str, record: dict) -> str | None: """在 msg/file/YYYY-MM/ 中按文件名查找""" content = record.get("content", "") or "" title = content.split(" (")[0].strip() if content else "" if not title: return None file_dir = os.path.join(self._wechat_base, "msg", "file", date_prefix) if not os.path.isdir(file_dir): return None target = os.path.join(file_dir, title) if os.path.isfile(target): return target for f in os.listdir(file_dir): if title in f or f in title: fp = os.path.join(file_dir, f) if os.path.isfile(fp): return fp return None @staticmethod def _extract_date_from_path(filepath: str) -> str | None: m = re.search(r'(\d{4}-\d{2})', filepath) return m.group(1) if m else None