wechat_msg_crawler/collector/backfill.py

400 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""媒体回溯补录模块 — watchdog 实时监听 + 定时兜底扫描
当用户点开微信中的图片/视频/文件后,本地会生成可读文件。
本模块检测这些新文件,上传到 COS并回填 media_url 到数据库中之前为 NULL 的记录。
图片: temp/RWTemp/YYYY-MM/{md5}.ext微信下载/查看后自动生成)
文件/视频/音频: msg/file/YYYY-MM/filename微信接收后自动生成
"""
import hashlib
import logging
import os
import re
import threading
import time
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent
from .config import CollectorConfig
from .storage import MessageStorage
log = logging.getLogger(__name__)
IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"}
VIDEO_EXTS = {".mp4", ".mov", ".avi"}
FILE_EXTS_SKIP = {".tmp", ".downloading"}
def _is_readable_media(filepath: str) -> str | None:
"""判断文件是否为可读媒体文件,返回 'image'/'video'/'file' 或 None"""
ext = os.path.splitext(filepath)[1].lower()
if ext in FILE_EXTS_SKIP:
return None
basename = os.path.basename(filepath)
if "_thumb" in basename:
return None
if ext in IMAGE_EXTS:
return "image"
if ext in VIDEO_EXTS:
return "video"
if ext and ext not in FILE_EXTS_SKIP:
return "file"
return None
def _is_in_rwtemp(filepath: str) -> bool:
"""判断文件是否在 RWTemp 目录内"""
return "/temp/RWTemp/" in filepath
def _file_md5(filepath: str) -> str:
"""计算文件内容的 md5"""
h = hashlib.md5()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def _extract_md5_from_content(content: str) -> str | None:
"""从 DB content 字段提取图片 md5: '[图片] abc123...'"""
if not content:
return None
m = re.search(r'\[图片\]\s+([a-f0-9]{32})', content)
return m.group(1) if m else None
class MediaFileHandler(FileSystemEventHandler):
"""watchdog 事件处理器 — 新文件出现时触发回溯匹配"""
def __init__(self, backfiller: "MediaBackfiller"):
super().__init__()
self._backfiller = backfiller
def on_created(self, event):
if event.is_directory:
return
if not isinstance(event, FileCreatedEvent):
return
threading.Timer(2.0, self._handle_new_file, args=[event.src_path]).start()
def _handle_new_file(self, filepath: str):
try:
if not os.path.isfile(filepath):
return
if os.path.getsize(filepath) < 100:
return
media_type = _is_readable_media(filepath)
if not media_type:
return
# RWTemp 中的都是图片
if _is_in_rwtemp(filepath):
media_type = "image"
log.info("[watchdog] 检测到新媒体文件: %s (type=%s)", filepath, media_type)
self._backfiller.try_backfill_file(filepath, media_type)
except Exception as e:
log.warning("[watchdog] 处理文件异常 %s: %s", filepath, e)
class MediaBackfiller:
"""媒体回溯补录器
两层保障:
1. watchdog 实时监听 RWTemp(图片) 和 msg/file(文件),新文件立即匹配上传
2. 定时扫描 DB 中 media_url=NULL 的记录,重新尝试路径解析
"""
def __init__(self, storage: MessageStorage, config: CollectorConfig,
cos_uploader=None, wechat_base_dir: str = ""):
self._storage = storage
self._cfg = config
self._cos = cos_uploader
self._wechat_base = wechat_base_dir
self._observer: Observer | None = None
self._scan_thread: threading.Thread | None = None
self._running = False
self._upload_lock = threading.Lock()
def start(self):
"""启动 watchdog 监听 + 定时扫描线程"""
if self._running:
return
self._running = True
watch_dirs = self._get_watch_dirs()
if watch_dirs:
self._observer = Observer()
handler = MediaFileHandler(self)
for d in watch_dirs:
self._observer.schedule(handler, d, recursive=True)
log.info("[backfill] 监听目录: %s", d)
self._observer.start()
log.info("[backfill] watchdog 已启动, 监听 %d 个目录", len(watch_dirs))
else:
log.warning("[backfill] 未找到可监听的微信媒体目录")
self._scan_thread = threading.Thread(
target=self._periodic_scan_loop, daemon=True, name="backfill-scan"
)
self._scan_thread.start()
log.info("[backfill] 定时扫描已启动, 间隔 %ds", int(self._cfg.backfill_interval))
def stop(self):
"""停止所有后台线程"""
self._running = False
if self._observer:
self._observer.stop()
self._observer.join(timeout=5)
self._observer = None
if self._scan_thread:
self._scan_thread.join(timeout=10)
self._scan_thread = None
log.info("[backfill] 已停止")
def set_cos_uploader(self, cos):
"""延迟设置 COS 上传器"""
self._cos = cos
def _get_watch_dirs(self) -> list[str]:
"""获取需要监听的目录: temp/RWTemp(图片) + msg/file(文件) + msg/video(视频)"""
if not self._wechat_base:
return []
dirs = []
rwtemp = os.path.join(self._wechat_base, "temp", "RWTemp")
if os.path.isdir(rwtemp):
dirs.append(rwtemp)
file_dir = os.path.join(self._wechat_base, "msg", "file")
if os.path.isdir(file_dir):
dirs.append(file_dir)
video_dir = os.path.join(self._wechat_base, "msg", "video")
if os.path.isdir(video_dir):
dirs.append(video_dir)
return dirs
def try_backfill_file(self, filepath: str, media_type: str):
"""尝试将新文件匹配到 DB 中的 pending 记录并上传"""
if not self._cos:
log.debug("[backfill] COS 未配置, 跳过上传")
return
with self._upload_lock:
pending = self._storage.get_pending_media_messages(
lookback_days=self._cfg.backfill_lookback_days, limit=500
)
matched = self._match_file_to_record(filepath, media_type, pending)
if matched:
self._do_upload_and_update(filepath, matched)
def _match_file_to_record(self, filepath: str, media_type: str,
records: list[dict]) -> dict | None:
"""将文件路径匹配到数据库记录"""
if media_type == "image":
return self._match_image(filepath, records)
elif media_type == "video":
return self._match_video(filepath, records)
else:
return self._match_file(filepath, records)
def _match_image(self, filepath: str, records: list[dict]) -> dict | None:
"""图片匹配: 计算文件内容 md5匹配 DB content 中的 md5"""
try:
file_hash = _file_md5(filepath)
except Exception:
return None
for rec in records:
if rec["msg_type"] != "image":
continue
rec_md5 = _extract_md5_from_content(rec.get("content", ""))
if rec_md5 and rec_md5 == file_hash:
log.info("[backfill] 图片内容md5匹配: %s → record id=%d", file_hash, rec["id"])
return rec
return None
def _match_file(self, filepath: str, records: list[dict]) -> dict | None:
"""文件匹配: 通过文件名匹配 DB content 中的标题"""
filename = os.path.basename(filepath)
date_prefix = self._extract_date_from_path(filepath)
for rec in records:
if rec["msg_type"] != "file":
continue
if date_prefix:
rec_date = datetime.fromtimestamp(rec["msg_timestamp"]).strftime("%Y-%m")
if rec_date != date_prefix:
continue
content = rec.get("content", "") or ""
title = content.split(" (")[0].strip() if content else ""
if title and (title in filename or filename in title):
log.info("[backfill] 文件匹配: %s → record id=%d", filename, rec["id"])
return rec
return None
def _match_video(self, filepath: str, records: list[dict]) -> dict | None:
"""视频匹配: 通过文件名中的 rawmd5 匹配同月的 video 记录"""
filename = os.path.splitext(os.path.basename(filepath))[0].lower()
date_prefix = self._extract_date_from_path(filepath)
for rec in records:
if rec["msg_type"] != "video":
continue
if date_prefix:
rec_date = datetime.fromtimestamp(rec["msg_timestamp"]).strftime("%Y-%m")
if rec_date != date_prefix:
continue
if re.fullmatch(r'[a-f0-9]{32}', filename):
log.info("[backfill] 视频匹配: %s → record id=%d", filename, rec["id"])
return rec
return None
def _do_upload_and_update(self, filepath: str, record: dict):
"""上传文件到 COS 并更新 DB需在锁内调用"""
if not self._cos or not os.path.isfile(filepath):
return
try:
ext = os.path.splitext(filepath)[1]
filename = os.path.basename(filepath)
safe_name = re.sub(r'[^\w.\-]', '_', filename)
dt = datetime.fromtimestamp(record["msg_timestamp"])
date_prefix = dt.strftime("%Y-%m")
cos_key = f"{self._cfg.cos_base_path}/{record['msg_type']}/{date_prefix}/{safe_name}"
url = self._cos.upload(filepath, cos_key)
if url:
updated = self._storage.update_media_url(record["id"], url)
if updated:
log.info("[backfill] 回溯成功: id=%d, type=%s%s",
record["id"], record["msg_type"], url)
else:
log.debug("[backfill] 记录已被更新(可能重复): id=%d", record["id"])
except Exception as e:
log.warning("[backfill] 上传失败: %s%s", filepath, e)
def _periodic_scan_loop(self):
"""定时扫描兜底线程"""
while self._running:
try:
self._do_periodic_scan()
except Exception as e:
log.error("[backfill] 定时扫描异常: %s", e)
wait = self._cfg.backfill_interval
elapsed = 0
while elapsed < wait and self._running:
time.sleep(min(5.0, wait - elapsed))
elapsed += 5.0
def _do_periodic_scan(self):
"""执行一次全量回溯扫描"""
if not self._cos:
return
with self._upload_lock:
pending = self._storage.get_pending_media_messages(
lookback_days=self._cfg.backfill_lookback_days
)
if not pending:
return
log.info("[backfill] 定时扫描: %d 条待补录记录", len(pending))
filled = 0
for rec in pending:
if not self._running:
break
filepath = self._try_resolve_path(rec)
if filepath and os.path.isfile(filepath):
with self._upload_lock:
self._do_upload_and_update(filepath, rec)
filled += 1
if filled > 0:
log.info("[backfill] 定时扫描完成: 成功补录 %d", filled)
def _try_resolve_path(self, record: dict) -> str | None:
"""尝试为一条 pending 记录解析本地文件路径"""
if not self._wechat_base:
return None
msg_type = record["msg_type"]
msg_ts = record["msg_timestamp"]
dt = datetime.fromtimestamp(msg_ts)
date_prefix = dt.strftime("%Y-%m")
if msg_type == "image":
return self._resolve_image_path(date_prefix, record)
elif msg_type == "video":
return self._resolve_video_path(date_prefix)
elif msg_type == "file":
return self._resolve_file_path(date_prefix, record)
return None
def _resolve_image_path(self, date_prefix: str, record: dict) -> str | None:
"""在 temp/RWTemp/YYYY-MM/ 中按文件内容 md5 查找图片"""
img_md5 = _extract_md5_from_content(record.get("content", ""))
if not img_md5:
return None
rwtemp_dir = os.path.join(self._wechat_base, "temp", "RWTemp", date_prefix)
if not os.path.isdir(rwtemp_dir):
return None
for f in os.listdir(rwtemp_dir):
fp = os.path.join(rwtemp_dir, f)
ext = os.path.splitext(f)[1].lower()
if ext not in (".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp"):
continue
if not os.path.isfile(fp):
continue
try:
if _file_md5(fp) == img_md5:
return fp
except Exception:
continue
return None
def _resolve_video_path(self, date_prefix: str) -> str | None:
"""在 msg/video/YYYY-MM/ 中查找 mp4"""
video_dir = os.path.join(self._wechat_base, "msg", "video", date_prefix)
if not os.path.isdir(video_dir):
return None
for f in os.listdir(video_dir):
if f.endswith(".mp4") and os.path.isfile(os.path.join(video_dir, f)):
return os.path.join(video_dir, f)
return None
def _resolve_file_path(self, date_prefix: str, record: dict) -> str | None:
"""在 msg/file/YYYY-MM/ 中按文件名查找"""
content = record.get("content", "") or ""
title = content.split(" (")[0].strip() if content else ""
if not title:
return None
file_dir = os.path.join(self._wechat_base, "msg", "file", date_prefix)
if not os.path.isdir(file_dir):
return None
target = os.path.join(file_dir, title)
if os.path.isfile(target):
return target
for f in os.listdir(file_dir):
if title in f or f in title:
fp = os.path.join(file_dir, f)
if os.path.isfile(fp):
return fp
return None
@staticmethod
def _extract_date_from_path(filepath: str) -> str | None:
m = re.search(r'(\d{4}-\d{2})', filepath)
return m.group(1) if m else None