wechat_msg_crawler/collect_chats.py

250 lines
7.4 KiB
Python

#!/usr/bin/env python3
"""
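Automatic collector for WeChat group-chat messages.

Usage:
    python3 collect_chats.py scan                  # scan every group chat once
    python3 collect_chats.py daemon                # run continuously (adaptive frequency)
    python3 collect_chats.py status                # show collection statistics
    python3 collect_chats.py groups                # list all group chats
    python3 collect_chats.py enable "GROUP_NAME"   # enable collection for a group
    python3 collect_chats.py disable "GROUP_NAME"  # stop collection for a group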
"""
import argparse
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime
from collector.config import CollectorConfig
from collector.storage import MessageStorage
from collector.wechat_adapter import WeChatAdapter
from collector.scanner import GroupScanner, _init_cos_uploader
from collector.backfill import MediaBackfiller
# Module-wide logger; every command in this script logs through the "collector" logger.
log = logging.getLogger("collector")
def setup_logging(level="INFO"):
    """Configure root logging with a timestamped line format.

    Args:
        level: Logging level, either a level name in any letter case
            (``"debug"``, ``"INFO"``, ...) or a numeric level. Anything that
            does not resolve to a numeric level falls back to ``logging.INFO``.
    """
    if isinstance(level, int) and not isinstance(level, bool):
        resolved = level
    else:
        # Look the name up in upper case and accept integers only: a plain
        # getattr() on "debug" would return the logging.debug *function*,
        # which basicConfig() rejects.
        resolved = getattr(logging, str(level).strip().upper(), None)
        if not isinstance(resolved, int) or isinstance(resolved, bool):
            resolved = logging.INFO

    fmt = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(level=resolved, format=fmt, datefmt="%Y-%m-%d %H:%M:%S")
def cmd_scan(args):
    """Run a single scan pass over the discovered group chats.

    Loads the collector configuration from ``args.config``, ensures the
    message table exists, discovers groups, then calls
    ``scanner.scan_next_batch()`` repeatedly until ``scanner.time_to_next()``
    reports a positive wait. The scanner status is printed as JSON at the end.

    Args:
        args: Parsed command-line namespace; only ``args.config`` is read.
    """
    config = CollectorConfig.load(args.config)
    setup_logging(config.log_level)

    log.info("初始化微信数据适配器...")
    adapter = WeChatAdapter()

    log.info("连接 MySQL...")
    storage = MessageStorage(config)
    try:
        storage.ensure_table()
        scanner = GroupScanner(adapter, storage, config)

        log.info("发现群聊...")
        scanner.discover_groups()

        log.info("开始扫描...")
        total = 0
        rounds = 0
        # Keep scanning batches until every group has been visited once.
        while True:
            total += scanner.scan_next_batch()
            rounds += 1
            # A positive wait is taken to mean no group is currently due,
            # i.e. this pass is complete.
            if scanner.time_to_next() > 0:
                break
        log.info("扫描完成: 共 %d 轮, 新增 %d 条消息", rounds, total)

        # Print the scanner statistics.
        status = scanner.get_status()
        print(json.dumps(status, ensure_ascii=False, indent=2))
    finally:
        # Release the storage connection even when discovery or scanning fails.
        storage.close()
def cmd_daemon(args):
    """Run the collector continuously until SIGINT or SIGTERM is received.

    Each cycle optionally re-discovers groups, scans the next batch, and then
    waits for the interval reported by ``scanner.time_to_next()``. When
    ``config.backfill_enabled`` is set, a ``MediaBackfiller`` with its own
    ``MessageStorage`` is started alongside the scan loop.

    Args:
        args: Parsed command-line namespace; only ``args.config`` is read.
    """
    config = CollectorConfig.load(args.config)
    setup_logging(config.log_level)

    running = True

    def on_signal(signum, _):
        nonlocal running
        log.info("收到信号 %d, 准备退出...", signum)
        running = False

    signal.signal(signal.SIGINT, on_signal)
    signal.signal(signal.SIGTERM, on_signal)

    log.info("=" * 50)
    log.info("微信群聊采集器 - 守护模式启动")
    log.info("扫描策略: hot=%ds, warm=%ds, cold 退避至 %ds",
             int(config.min_interval), int(config.base_interval), int(config.max_interval))
    log.info("批次=%d, 每群最多=%d条/次", config.batch_size, config.messages_per_scan)
    log.info("=" * 50)

    # Upper bound on one sleep slice. time.sleep() resumes after a signal
    # handler returns, so one long sleep would delay shutdown by up to the
    # whole interval.
    poll_seconds = 1.0

    adapter = WeChatAdapter()
    storage = MessageStorage(config)
    backfill_storage = None
    backfiller = None
    backfiller_started = False
    try:
        storage.ensure_table()
        scanner = GroupScanner(adapter, storage, config)

        # Start the backfiller on its own storage object (original note: a
        # separate MySQL connection avoids conflicts between threads).
        if config.backfill_enabled:
            wechat_base = os.path.dirname(adapter.db_dir) if adapter.db_dir else ""
            backfill_storage = MessageStorage(config)
            backfill_storage.ensure_table()
            backfiller = MediaBackfiller(
                storage=backfill_storage, config=config,
                wechat_base_dir=wechat_base
            )
            try:
                cos = _init_cos_uploader(config)
                backfiller.set_cos_uploader(cos)
            except Exception as e:
                # Best effort: the backfiller still runs, just without uploads.
                log.warning("COS 初始化失败,回溯补录上传功能不可用: %s", e)
            backfiller.start()
            backfiller_started = True

        # Initial discovery.
        scanner.discover_groups()

        cycle = 0
        while running:
            cycle += 1

            # Periodically re-discover group chats.
            if scanner.should_discover():
                try:
                    adapter.refresh_names()
                    scanner.discover_groups()
                except Exception as e:
                    log.error("群聊发现失败: %s", e)

            # Scan one batch; a failure must not take the daemon down.
            try:
                n = scanner.scan_next_batch()
                if n > 0:
                    log.info("--- 第 %d 轮: 新增 %d 条消息 ---", cycle, n)
            except Exception as e:
                log.error("扫描异常: %s", e)

            # Wait until the next group is due, re-checking the stop flag
            # between slices so a signal ends the wait promptly.
            deadline = time.monotonic() + scanner.time_to_next()
            while running:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(remaining, poll_seconds))

        log.info("采集器已停止")
    finally:
        try:
            if backfiller_started:
                backfiller.stop()
        finally:
            # NOTE(review): assumes backfiller.stop() has finished with its
            # storage by the time it returns - confirm against MediaBackfiller.
            if backfill_storage is not None:
                backfill_storage.close()
            storage.close()
def cmd_status(args):
    """Print the total stored message count and a per-group summary.

    Args:
        args: Parsed command-line namespace; only ``args.config`` is read.
    """
    config = CollectorConfig.load(args.config)
    storage = MessageStorage(config)
    try:
        storage.ensure_table()
        total = storage.get_total_count()
        groups = storage.get_group_stats()

        print(f"\n消息总数: {total}")
        print(f"群聊数量: {len(groups)}")
        print("-" * 60)
        for g in groups:
            # A falsy last_ts is shown as an empty "latest" column.
            last_dt = datetime.fromtimestamp(g["last_ts"]).strftime("%m-%d %H:%M") if g["last_ts"] else ""
            print(f" {g['group_name']:20s} {g['total']:>6d} 条 最新: {last_dt}")
    finally:
        # Release the storage connection even if a query or formatting fails.
        storage.close()
def cmd_groups(args):
    """List the group chat sessions reported by the WeChat adapter.

    Prints a header with the session count, then two lines per session: the
    last-activity time with display name (plus an unread marker when the
    unread value is truthy), followed by the session's username.

    Args:
        args: Parsed command-line namespace; only ``args.config`` is read.
    """
    config = CollectorConfig.load(args.config)
    setup_logging(config.log_level)

    sessions = WeChatAdapter().list_group_sessions(limit=500)

    print(f"\n共发现 {len(sessions)} 个群聊:")
    print("-" * 60)
    for session in sessions:
        last_seen = datetime.fromtimestamp(session["last_timestamp"])
        if session["unread"]:
            unread_tag = f" ({session['unread']}条未读)"
        else:
            unread_tag = ""
        print(f" [{last_seen:%m-%d %H:%M}] {session['display_name']}{unread_tag}")
        print(f" {session['username']}")
def cmd_enable(args):
    """Enable collection for a group by removing it from the config blacklist.

    The configuration is saved back to ``args.config`` only when the group was
    actually on the blacklist; otherwise a notice is printed and nothing is
    written.

    Args:
        args: Parsed command-line namespace; ``args.config`` and
            ``args.group_name`` are read.
    """
    cfg = CollectorConfig.load(args.config)
    target = args.group_name

    if target not in cfg.blacklist:
        print(f"{target} 不在黑名单中")
        return

    cfg.blacklist.remove(target)
    cfg.save(args.config)
    print(f"已从黑名单移除: {target}")
def cmd_disable(args):
    """Stop collection for a group by adding it to the config blacklist.

    The configuration is saved back to ``args.config`` only when the group was
    not yet on the blacklist; otherwise a notice is printed and nothing is
    written.

    Args:
        args: Parsed command-line namespace; ``args.config`` and
            ``args.group_name`` are read.
    """
    cfg = CollectorConfig.load(args.config)
    target = args.group_name

    if target in cfg.blacklist:
        print(f"{target} 已在黑名单中")
        return

    cfg.blacklist.append(target)
    cfg.save(args.config)
    print(f"已加入黑名单: {target}")
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    When no sub-command is given, the help text is printed and the process
    exits with status 1.
    """
    parser = argparse.ArgumentParser(
        description="微信群聊消息自动采集器",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--config", default=None, help="配置文件路径")
    subparsers = parser.add_subparsers(dest="command")

    # (sub-command name, help text, whether it takes a group_name positional)
    command_specs = (
        ("scan", "一次性扫描所有群聊", False),
        ("daemon", "启动守护模式持续采集", False),
        ("status", "查看采集统计", False),
        ("groups", "列出所有群聊", False),
        ("enable", "启用某群采集", True),
        ("disable", "停止某群采集", True),
    )
    for name, help_text, takes_group in command_specs:
        sub_parser = subparsers.add_parser(name, help=help_text)
        if takes_group:
            sub_parser.add_argument("group_name", help="群聊名称")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        sys.exit(1)

    handlers = {
        "scan": cmd_scan,
        "daemon": cmd_daemon,
        "status": cmd_status,
        "groups": cmd_groups,
        "enable": cmd_enable,
        "disable": cmd_disable,
    }
    handler = handlers[args.command]
    handler(args)
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()