250 lines
7.4 KiB
Python
250 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
微信群聊消息自动采集器
|
|
|
|
用法:
|
|
python3 collect_chats.py scan # 一次性扫描所有群聊
|
|
python3 collect_chats.py daemon # 持续运行(自适应频率)
|
|
python3 collect_chats.py status # 查看采集统计
|
|
python3 collect_chats.py groups # 列出所有群聊
|
|
python3 collect_chats.py enable "群名" # 启用某群
|
|
python3 collect_chats.py disable "群名" # 停止某群
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from collector.config import CollectorConfig
|
|
from collector.storage import MessageStorage
|
|
from collector.wechat_adapter import WeChatAdapter
|
|
from collector.scanner import GroupScanner, _init_cos_uploader
|
|
from collector.backfill import MediaBackfiller
|
|
|
|
# Module-wide logger named "collector"; its level/format come from the root
# logger that setup_logging() configures via logging.basicConfig.
log = logging.getLogger(name="collector")
|
|
|
|
|
|
def setup_logging(level="INFO"):
    """Configure root logging for the collector commands.

    Args:
        level: A level name in any case (e.g. "info", "DEBUG", " Warning ")
            or a numeric logging level. Unknown names and None fall back to
            INFO.

    Returns:
        The numeric level handed to ``logging.basicConfig``. Note that
        ``basicConfig`` is a no-op when the root logger already has handlers.
    """
    if isinstance(level, int):
        numeric = level
    else:
        # getattr(logging, "info") would yield the logging.info *function*,
        # which basicConfig rejects with TypeError, so normalise the name and
        # only accept attributes that are real integer levels.
        candidate = (getattr(logging, str(level).strip().upper(), None)
                     if level is not None else None)
        numeric = candidate if isinstance(candidate, int) else logging.INFO

    fmt = "%(asctime)s [%(levelname)s] %(message)s"
    logging.basicConfig(level=numeric,
                        format=fmt, datefmt="%Y-%m-%d %H:%M:%S")
    return numeric
|
|
|
|
|
|
def cmd_scan(args):
    """Run a one-off scan of all group chats, then print the scanner status.

    Loads the config, connects MessageStorage, discovers groups, and calls
    ``scanner.scan_next_batch()`` repeatedly until ``scanner.time_to_next()``
    is positive (no group is due any more). The per-group status from
    ``scanner.get_status()`` is printed as JSON. The storage connection is
    closed even if any step fails.
    """
    config = CollectorConfig.load(args.config)
    setup_logging(config.log_level)

    log.info("初始化微信数据适配器...")
    adapter = WeChatAdapter()

    log.info("连接 MySQL...")
    storage = MessageStorage(config)
    try:
        storage.ensure_table()

        scanner = GroupScanner(adapter, storage, config)

        log.info("发现群聊...")
        scanner.discover_groups()

        log.info("开始扫描...")
        total = 0
        rounds = 0
        # Loop until every due group has been visited once in this pass.
        # NOTE(review): if time_to_next() can stay <= 0 (e.g. with no groups
        # discovered) this loop never terminates; confirm GroupScanner's
        # contract.
        while True:
            total += scanner.scan_next_batch()
            rounds += 1
            # A positive wait means nothing is due now, so this pass is done.
            if scanner.time_to_next() > 0:
                break

        log.info("扫描完成: 共 %d 轮, 新增 %d 条消息", rounds, total)

        # Print the collection statistics for the operator.
        status = scanner.get_status()
        print(json.dumps(status, ensure_ascii=False, indent=2))
    finally:
        storage.close()
|
|
|
|
|
|
def _interruptible_sleep(seconds, is_running, step=1.0):
    """Sleep up to *seconds*, re-checking ``is_running()`` every *step* seconds.

    time.sleep() is transparently resumed after a signal handler returns
    (PEP 475). A single long sleep would therefore delay a SIGINT/SIGTERM
    shutdown by the whole remaining interval. Short slices bound that delay
    to about *step* seconds.
    """
    deadline = time.monotonic() + seconds
    while is_running():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(remaining, step))


def _start_backfiller(config, adapter):
    """Create and start a MediaBackfiller, or return None if backfill is disabled."""
    if not config.backfill_enabled:
        return None

    wechat_base = os.path.dirname(adapter.db_dir) if adapter.db_dir else ""
    # Dedicated MessageStorage so the backfiller does not share the scanner's
    # MySQL connection (avoids multi-threaded conflicts).
    # NOTE(review): this connection is never closed here; presumably
    # MediaBackfiller.stop() releases it. Confirm.
    backfill_storage = MessageStorage(config)
    backfill_storage.ensure_table()
    backfiller = MediaBackfiller(
        storage=backfill_storage, config=config,
        wechat_base_dir=wechat_base
    )
    # COS upload is optional: backfill still runs without it.
    try:
        cos = _init_cos_uploader(config)
        backfiller.set_cos_uploader(cos)
    except Exception as e:
        log.warning("COS 初始化失败,回溯补录上传功能不可用: %s", e)
    backfiller.start()
    return backfiller


def cmd_daemon(args):
    """Run the collector continuously until SIGINT or SIGTERM.

    Each cycle does the following:
    - re-discovers groups when ``scanner.should_discover()`` says so;
    - scans one batch;
    - sleeps for ``scanner.time_to_next()`` seconds.

    The sleep is sliced so a stop signal takes effect within about a second.
    The backfiller (if enabled) and the main storage connection are released
    on exit, including when an unexpected exception escapes the loop.
    """
    config = CollectorConfig.load(args.config)
    setup_logging(config.log_level)

    running = True

    def on_signal(signum, _frame):
        # Only flip the flag; the main loop notices it and shuts down cleanly.
        nonlocal running
        log.info("收到信号 %d, 准备退出...", signum)
        running = False

    signal.signal(signal.SIGINT, on_signal)
    signal.signal(signal.SIGTERM, on_signal)

    log.info("=" * 50)
    log.info("微信群聊采集器 - 守护模式启动")
    log.info("扫描策略: hot=%ds, warm=%ds, cold 退避至 %ds",
             int(config.min_interval), int(config.base_interval), int(config.max_interval))
    log.info("批次=%d, 每群最多=%d条/次", config.batch_size, config.messages_per_scan)
    log.info("=" * 50)

    adapter = WeChatAdapter()
    storage = MessageStorage(config)
    backfiller = None
    try:
        storage.ensure_table()
        scanner = GroupScanner(adapter, storage, config)

        backfiller = _start_backfiller(config, adapter)

        # Initial discovery
        scanner.discover_groups()

        cycle = 0
        while running:
            cycle += 1

            # Periodically re-discover group chats.
            if scanner.should_discover():
                try:
                    adapter.refresh_names()
                    scanner.discover_groups()
                except Exception as e:
                    log.error("群聊发现失败: %s", e, exc_info=True)

            # Scan one batch; a failure must not kill the daemon.
            try:
                n = scanner.scan_next_batch()
                if n > 0:
                    log.info("--- 第 %d 轮: 新增 %d 条消息 ---", cycle, n)
            except Exception as e:
                log.error("扫描异常: %s", e, exc_info=True)

            # Adaptive sleep until the next group is due.
            sleep_time = scanner.time_to_next()
            if sleep_time > 0 and running:
                _interruptible_sleep(sleep_time, lambda: running)
    finally:
        log.info("采集器已停止")
        try:
            if backfiller:
                backfiller.stop()
        finally:
            storage.close()
|
|
|
|
|
|
def cmd_status(args):
    """Print collection statistics from MessageStorage.

    Shows the total message count and the number of groups. For each group it
    then shows its message total and the time of its latest message ("无"
    when ``last_ts`` is empty). The storage connection is closed even if a
    query or print fails.
    """
    config = CollectorConfig.load(args.config)
    storage = MessageStorage(config)
    try:
        storage.ensure_table()

        total = storage.get_total_count()
        groups = storage.get_group_stats()

        print(f"\n消息总数: {total}")
        print(f"群聊数量: {len(groups)}")
        print("-" * 60)

        for g in groups:
            last_dt = (datetime.fromtimestamp(g["last_ts"]).strftime("%m-%d %H:%M")
                       if g["last_ts"] else "无")
            print(f" {g['group_name']:20s} {g['total']:>6d} 条 最新: {last_dt}")
    finally:
        storage.close()
|
|
|
|
|
|
def cmd_groups(args):
    """List group-chat sessions reported by the WeChat adapter.

    Calls ``adapter.list_group_sessions(limit=500)``. For each session it
    prints the last-activity time, display name, unread count (when non-zero)
    and username.
    """
    config = CollectorConfig.load(args.config)
    setup_logging(config.log_level)

    adapter = WeChatAdapter()
    sessions = adapter.list_group_sessions(limit=500)

    print(f"\n共发现 {len(sessions)} 个群聊:")
    print("-" * 60)
    for s in sessions:
        # Same guard as cmd_status: an empty/zero timestamp shows "无"
        # instead of raising on None or printing the 1970 epoch.
        last_ts = s["last_timestamp"]
        ts = datetime.fromtimestamp(last_ts).strftime("%m-%d %H:%M") if last_ts else "无"
        unread_tag = f" ({s['unread']}条未读)" if s["unread"] else ""
        print(f" [{ts}] {s['display_name']}{unread_tag}")
        print(f" {s['username']}")
|
|
|
|
|
|
def cmd_enable(args):
    """Re-enable collection for a group by removing it from the config blacklist.

    Every occurrence of ``args.group_name`` is removed. A hand-edited config
    may list a name more than once, and ``list.remove`` would drop only the
    first copy, leaving the group blocked. The config is saved only when the
    name was present.
    """
    config = CollectorConfig.load(args.config)
    group_name = args.group_name
    if group_name in config.blacklist:
        # Slice-assign to update the existing list object in place.
        config.blacklist[:] = [name for name in config.blacklist if name != group_name]
        config.save(args.config)
        print(f"已从黑名单移除: {group_name}")
    else:
        print(f"{group_name} 不在黑名单中")
|
|
|
|
|
|
def cmd_disable(args):
    """Stop collecting a group by appending it to the config blacklist.

    The config is written back only when the group was not already listed.
    """
    config = CollectorConfig.load(args.config)
    target = args.group_name

    # Already blacklisted: report and leave the config untouched.
    if target in config.blacklist:
        print(f"{target} 已在黑名单中")
        return

    config.blacklist.append(target)
    config.save(args.config)
    print(f"已加入黑名单: {target}")
|
|
|
|
|
|
def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Exits with status 1 after printing help when no subcommand is given.
    """
    parser = argparse.ArgumentParser(
        description="微信群聊消息自动采集器",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--config", default=None, help="配置文件路径")
    subparsers = parser.add_subparsers(dest="command")

    # (subcommand, help text, whether it takes a positional group_name)
    subcommand_specs = (
        ("scan", "一次性扫描所有群聊", False),
        ("daemon", "启动守护模式持续采集", False),
        ("status", "查看采集统计", False),
        ("groups", "列出所有群聊", False),
        ("enable", "启用某群采集", True),
        ("disable", "停止某群采集", True),
    )
    for name, help_text, needs_group in subcommand_specs:
        sub_parser = subparsers.add_parser(name, help=help_text)
        if needs_group:
            sub_parser.add_argument("group_name", help="群聊名称")

    args = parser.parse_args()

    # Guard clause: no subcommand given.
    if not args.command:
        parser.print_help()
        sys.exit(1)

    handler = {
        "scan": cmd_scan,
        "daemon": cmd_daemon,
        "status": cmd_status,
        "groups": cmd_groups,
        "enable": cmd_enable,
        "disable": cmd_disable,
    }[args.command]
    handler(args)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point; main() returns None, so the exit status stays 0.
    raise SystemExit(main())
|