From 86a47822adb3b9feca77331b9ae374193267de05 Mon Sep 17 00:00:00 2001 From: crislee Date: Wed, 22 Apr 2026 19:28:54 +0800 Subject: [PATCH] v0.1 --- .gitignore | 29 +++ CLAUDE.md | 64 ++++++ README.md | 156 +++++++++++++ config.example.yaml | 53 +++++ main.py | 74 ++++++ project.md | 67 ++++++ requirements.txt | 4 + wechat_clicker/__init__.py | 1 + wechat_clicker/automator.py | 334 +++++++++++++++++++++++++++ wechat_clicker/ax_bridge.py | 300 ++++++++++++++++++++++++ wechat_clicker/config.py | 201 ++++++++++++++++ wechat_clicker/human_like.py | 91 ++++++++ wechat_clicker/logger_setup.py | 40 ++++ wechat_clicker/state_machine.py | 201 ++++++++++++++++ wechat_clicker/wechat_ui.py | 394 ++++++++++++++++++++++++++++++++ 15 files changed, 2009 insertions(+) create mode 100644 .gitignore create mode 100644 CLAUDE.md create mode 100644 README.md create mode 100644 config.example.yaml create mode 100644 main.py create mode 100644 project.md create mode 100644 requirements.txt create mode 100644 wechat_clicker/__init__.py create mode 100644 wechat_clicker/automator.py create mode 100644 wechat_clicker/ax_bridge.py create mode 100644 wechat_clicker/config.py create mode 100644 wechat_clicker/human_like.py create mode 100644 wechat_clicker/logger_setup.py create mode 100644 wechat_clicker/state_machine.py create mode 100644 wechat_clicker/wechat_ui.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0bcb105 --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# 用户配置(含个人设置) +config.yaml + +# 日志 +*.log +*.log.* + +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ + +# 虚拟环境 +venv/ +.venv/ + +# macOS +.DS_Store + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Claude Code +.claude/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..d411ea4 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,64 @@ +# WeChat 消息自动点击器 + +自动点击微信桌面端未读消息中的图片和文件,触发原始文件下载到本地目录。 + +## 项目背景 + +配合另一个信息收集项目使用:信息收集脚本负责将微信消息入库,但图片和文件需要被点击预览后微信才会下载原始文件到本地。本工具自动完成这个"点击"操作。 + +## 技术架构 + +- **语言**: Python 3.11+ +- **核心技术**: macOS Accessibility API (AXUIElement) via pyobjc +- **目标应用**: 微信桌面端 v4.1.9+ (bundle ID: com.tencent.xinWeChat) +- **运行平台**: macOS Sonoma 14+ + +### 模块结构 + +``` +wechat_clicker/ +├── ax_bridge.py # AXUIElement 底层封装(属性读取、操作执行、键盘事件) +├── wechat_ui.py # 微信 UI 导航(找聊天列表、消息列表、解析 title 分类消息类型) +├── state_machine.py # UI 状态机(基于窗口数量判断状态、状态恢复) +├── automator.py # 主自动化逻辑(扫描→点击→预览→关闭→循环) +├── human_like.py # 拟人行为(高斯分布延迟、长休息、工作时间) +├── config.py # YAML 配置加载 +└── logger_setup.py # 日志配置 +``` + +### 关键设计决策 + +- 微信 v4.1.9 点击聊天会打开**独立窗口**(非页内导航),状态检测基于窗口计数 +- 元素查找使用 role+name 搜索(非硬编码索引),适应 UI 变化 +- 消息类型通过 title 内容判断:`"图片"` → 图片,`"文件\n..."` → 文件 +- 预览通过 Escape 键关闭 + +## 使用方法 + +```bash +# 前置条件 +pip install -r requirements.txt +# 系统设置 > 隐私与安全 > 辅助功能 → 添加终端/Python + +# 复制配置 +cp config.example.yaml config.yaml + +# 运行 +python main.py # 持续运行 +python main.py --once # 单次扫描 +python main.py --dry-run # 只扫描不点击 +python main.py --dump-ui # 输出 UI 元素树 +python main.py --debug # 详细日志 +``` + +## 配置重点 + +- `config.yaml` 中可设置扫描间隔、延迟范围、白/黑名单、工作时间、媒体类型开关 +- 默认不处理视频(`media.click_videos: false`) +- 默认黑名单包含微信系统账号 + +## 注意事项 + +- 微信窗口需要保持可见(不能最小化) +- 运行时会占用微信前台操作 +- 建议在专用电脑上运行 diff --git a/README.md b/README.md new file mode 100644 index 0000000..24ffb20 --- /dev/null +++ b/README.md @@ -0,0 +1,156 @@ +# WeChat 消息自动点击器 + +自动点击微信桌面端未读消息中的图片和文件,触发原始文件下载到本地目录。 + +## 背景 + +微信桌面端不会自动下载图片/文件的原始数据,需要用户手动点击预览后才会触发下载。本工具通过 macOS 辅助功能 API 自动完成这个操作,配合信息收集系统实现群聊数据的自动化入库。 + +## 系统要求 + +- macOS Sonoma 14+ +- Python 3.11+ +- 微信桌面端 v4.1.9+(已登录) + +## 安装 + +### 1. 安装 Python 依赖 + +```bash +cd wechat_msg_clicker +pip install -r requirements.txt +``` + +依赖包: +- `pyobjc-framework-Quartz` — macOS 图形框架绑定 +- `pyobjc-framework-ApplicationServices` — 辅助功能 API 绑定 +- `pyobjc-framework-Cocoa` — macOS 应用框架绑定 +- `PyYAML` — 配置文件解析 + +### 2. 授权辅助功能权限 + +脚本需要 macOS 辅助功能权限才能操作微信 UI 元素: + +1. 打开 **系统设置** > **隐私与安全性** > **辅助功能** +2. 点击左下角 **+** 号 +3. 添加你使用的终端应用(如 Terminal.app 或 iTerm2) +4. 如果通过 Python 直接运行,还需要添加 Python 解释器路径(如 `/Users/你的用户名/miniconda3/bin/python3`) +5. 确保添加后开关为 **开启** 状态 + +> 首次运行时如果未授权,脚本会弹出系统提示框引导你授权。 + +### 3. 准备配置文件 + +```bash +cp config.example.yaml config.yaml +``` + +按需编辑 `config.yaml`,主要配置项: + +| 配置项 | 默认值 | 说明 | +|---|---|---| +| `scan.interval_seconds` | 30 | 扫描间隔(秒) | +| `scan.max_chats_per_scan` | 5 | 每次最多处理几个聊天 | +| `schedule.start_hour` | 8 | 工作开始时间 | +| `schedule.end_hour` | 23 | 工作结束时间 | +| `filter.mode` | all | 过滤模式:all / whitelist / blacklist | +| `filter.blacklist` | 微信系统号 | 排除的聊天名称 | +| `media.click_images` | true | 是否点击图片 | +| `media.click_files` | true | 是否点击文件 | +| `media.click_videos` | false | 是否点击视频(默认关闭) | +| `media.max_media_per_chat` | 20 | 每个聊天最多点击几个媒体 | + +### 4. 确保微信就绪 + +- 微信桌面端已启动并登录 +- 微信窗口保持可见(不能最小化到 Dock) +- 建议在专用电脑上运行,脚本运行时会操作微信前台窗口 + +## 验证流程 + +建议按以下顺序逐步验证,确认每步正常后再进入下一步: + +### Step 1:验证 UI 访问 + +```bash +python main.py --dump-ui +``` + +预期输出:微信的 UI 元素树,包括窗口、按钮、聊天列表等信息。 + +**如果报错**: +- `辅助功能权限未授予` → 检查系统设置中的辅助功能授权 +- `微信未运行` → 确保微信桌面端已启动 +- `未找到微信主窗口` → 确保微信已登录,窗口未最小化 + +### Step 2:试运行扫描 + +```bash +python main.py --dry-run --once --debug +``` + +预期输出:扫描聊天列表,显示有未读消息的聊天和媒体数量,但**不会实际点击**。 + +检查日志确认: +- 能正确识别未读聊天 +- 聊天名称解析正确 +- 未读数量解析正确 + +### Step 3:单次完整执行 + +```bash +python main.py --once --debug +``` + +这会执行一次完整的循环:扫描 → 点击聊天 → 点击图片/文件 → 关闭预览 → 关闭会话。 + +观察过程中: +- 微信窗口是否正常被操作 +- 图片预览是否正常打开和关闭 +- 是否能正确返回聊天列表 + +### Step 4:正式运行 + +```bash +python main.py +``` + +脚本会持续运行,按配置的间隔和工作时间自动处理新消息。 + +后台运行建议使用 `nohup` 或 `tmux`: + +```bash +# 方式一:nohup +nohup python main.py > /dev/null 2>&1 & + +# 方式二:tmux(推荐,方便查看日志) +tmux new -s wechat +python main.py +# Ctrl+B D 分离会话 +# tmux attach -t wechat 重新连接 +``` + +## 命令参考 + +| 命令 | 说明 | +|---|---| +| `python main.py` | 持续运行 | +| `python main.py --once` | 单次扫描后退出 | +| `python main.py --dry-run` | 只扫描不点击 | +| `python main.py --dump-ui` | 输出微信 UI 元素树 | +| `python main.py --debug` | 开启 DEBUG 级别日志 | +| `python main.py --config path/to/config.yaml` | 指定配置文件 | + +参数可组合使用,如 `python main.py --dry-run --once --debug`。 + +## 日志 + +日志同时输出到控制台和文件 `wechat_clicker.log`(可在配置中修改)。 + +日志文件自动轮转,默认最大 10MB,保留 5 个备份。 + +## 注意事项 + +- **封号风险**:工具内置了多种拟人策略(随机延迟、长休息、工作时间限制),但自动化操作始终存在被检测的可能,请自行评估风险 +- **微信更新**:微信版本更新可能改变 UI 结构,导致脚本失效。如遇到问题,先用 `--dump-ui` 检查 UI 变化 +- **前台占用**:脚本运行时会操作微信窗口,不建议同时手动使用微信 diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..b8a4c12 --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,53 @@ +# WeChat 消息自动点击器 配置文件 +# 复制为 config.yaml 后根据需要修改 + +wechat: + bundle_id: "com.tencent.xinWeChat" + process_name: "WeChat" + +# 扫描行为 +scan: + interval_seconds: 30 # 每次扫描间隔(秒) + max_chats_per_scan: 5 # 每次扫描最多处理几个聊天 + scroll_chat_list: false # 是否滚动聊天列表查找更多聊天 + +# 延迟范围(秒),模拟真人操作节奏 +delays: + before_click_chat: [2, 5] # 点击聊天前 + after_open_chat: [1, 3] # 打开会话后等待 + before_click_media: [1, 4] # 点击图片/文件前 + after_click_media: [3, 8] # 点击图片/文件后(等待下载) + before_close_preview: [1, 3] # 关闭预览前 + before_close_chat: [1, 2] # 关闭会话窗口前 + between_messages: [0.5, 2] # 处理每条消息之间 + +# 工作时间(24小时制) +schedule: + enabled: true + start_hour: 8 + end_hour: 23 + pause_on_weekends: false + +# 聊天过滤 +filter: + mode: "all" # "all": 全部, "whitelist": 仅白名单, "blacklist": 排除黑名单 + whitelist: [] # 白名单模式下,仅处理这些聊天 + blacklist: # 黑名单模式下,排除这些聊天 + - "腾讯新闻" + - "微信支付" + - "微信团队" + +# 媒体处理 +media: + click_images: true # 是否点击图片 + click_files: true # 是否点击文件 + click_videos: false # 是否点击视频(视频可能很大,默认关闭) + max_media_per_chat: 20 # 每个聊天最多点击几个媒体 + +# 日志配置 +logging: + level: "INFO" # DEBUG, INFO, WARNING, ERROR + file: "wechat_clicker.log" + max_bytes: 10485760 # 10MB + backup_count: 5 + console: true diff --git a/main.py b/main.py new file mode 100644 index 0000000..dd3dc4a --- /dev/null +++ b/main.py @@ -0,0 +1,74 @@ +"""WeChat 消息自动点击器 — 入口 + +自动点击微信桌面端未读消息中的图片和文件,触发原始文件下载。 +""" + +import argparse +import signal +import sys + +from wechat_clicker.config import Config +from wechat_clicker.automator import WeChatAutomator +from wechat_clicker.logger_setup import setup_logging + + +def main(): + parser = argparse.ArgumentParser( + description="微信消息自动点击器 — 自动点击图片和文件触发下载" + ) + parser.add_argument( + "--config", default="config.yaml", + help="配置文件路径 (默认: config.yaml)" + ) + parser.add_argument( + "--dry-run", action="store_true", + help="试运行模式:只扫描不点击" + ) + parser.add_argument( + "--once", action="store_true", + help="只执行一次扫描循环后退出" + ) + parser.add_argument( + "--debug", action="store_true", + help="开启详细日志 (DEBUG 级别)" + ) + parser.add_argument( + "--dump-ui", action="store_true", + help="输出微信 UI 元素树后退出 (调试用)" + ) + args = parser.parse_args() + + # 加载配置 + config = Config(args.config) + if args.debug: + config.override("logging.level", "DEBUG") + + # 初始化日志 + logger = setup_logging(config) + logger.info("微信自动点击器启动") + + if args.dry_run: + logger.info("*** 试运行模式:只扫描不点击 ***") + + # 优雅退出 + def signal_handler(sig, frame): + logger.info("收到退出信号,正在关闭...") + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # 创建自动化器 + automator = WeChatAutomator(config, dry_run=args.dry_run) + + # 执行 + if args.dump_ui: + automator.dump_ui_tree() + elif args.once: + automator.run_once() + else: + automator.run() + + +if __name__ == "__main__": + main() diff --git a/project.md b/project.md new file mode 100644 index 0000000..4852598 --- /dev/null +++ b/project.md @@ -0,0 +1,67 @@ +# 项目规划:WeChat 消息自动点击器 + +## 项目目标 + +创建一个长期稳定运行的自动化工具,自动点击微信桌面端中未读消息的图片和文件,触发微信下载原始文件到本地目录,配合信息收集系统完成数据入库流程。 + +## 项目状态 + +### 已完成 (v0.1.0 - 2026/04/22) + +- [x] 项目结构搭建 +- [x] 配置系统(YAML 配置文件、默认值、白/黑名单) +- [x] AXUIElement 底层封装(ax_bridge.py) +- [x] 微信 UI 导航(wechat_ui.py)— 聊天列表解析、消息类型分类 +- [x] UI 状态机(state_machine.py)— 基于窗口计数的状态检测与恢复 +- [x] 拟人行为引擎(human_like.py)— 高斯分布延迟、长休息、工作时间 +- [x] 主自动化逻辑(automator.py)— 完整扫描-点击-关闭循环 +- [x] 入口脚本(main.py)— 参数解析、信号处理 +- [x] 调试工具(--dump-ui, --dry-run) + +### 待验证 + +- [ ] 在真实环境中测试 AXUIElement 对微信的访问能力 +- [ ] 验证聊天列表 title 解析的准确性 +- [ ] 验证图片点击后预览关闭的可靠性 +- [ ] 长时间运行稳定性测试 + +### 未来可能的改进 + +- [ ] 聊天列表滚动支持(处理不在可见区域的聊天) +- [ ] 消息滚动支持(处理更早的图片消息) +- [ ] 已处理消息去重(记录已点击的媒体,避免重复) +- [ ] 微信版本适配(检测 UI 结构变化并自动调整) +- [ ] 运行状态 Web 面板(远程监控) +- [ ] 与信息收集系统直接集成 + +## 技术细节 + +### 微信 UI 结构 (v4.1.9) + +- 主窗口 "微信" 包含侧边栏 + 聊天列表 +- 聊天列表:`AXList name="会话"`,子元素为 `AXStaticText` +- 点击聊天项打开独立会话窗口 +- 消息列表:`AXList name="消息"` +- 图片消息 title = "图片",文件消息 title 以 "文件\n" 开头 + +### 防封策略 + +- 高斯分布随机延迟(非均匀) +- ±20% 扫描间隔抖动 +- 5% 概率长休息(30-120 秒) +- 工作时间限制 +- 每次最多 5 个聊天 / 每聊天最多 20 个媒体 + +### 依赖 + +- pyobjc-framework-Quartz +- pyobjc-framework-ApplicationServices +- pyobjc-framework-Cocoa +- PyYAML + +### 系统要求 + +- macOS Sonoma 14+ +- Python 3.11+ +- 辅助功能权限(终端/Python 需要授权) +- 微信桌面端 v4.1.9+ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..79d3e5f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +pyobjc-framework-Quartz>=10.0 +pyobjc-framework-ApplicationServices>=10.0 +pyobjc-framework-Cocoa>=10.0 +PyYAML>=6.0 diff --git a/wechat_clicker/__init__.py b/wechat_clicker/__init__.py new file mode 100644 index 0000000..6927735 --- /dev/null +++ b/wechat_clicker/__init__.py @@ -0,0 +1 @@ +"""WeChat 消息自动点击器""" diff --git a/wechat_clicker/automator.py b/wechat_clicker/automator.py new file mode 100644 index 0000000..2dcf824 --- /dev/null +++ b/wechat_clicker/automator.py @@ -0,0 +1,334 @@ +"""主自动化逻辑 + +编排整个工作流程:扫描未读聊天 → 点击进入 → 点击图片/文件 → 关闭预览 → 关闭会话 → 循环 +""" + +import logging +import time + +from .ax_bridge import AXBridge +from .config import Config +from .human_like import HumanBehavior +from .state_machine import StateMachine, UIState +from .wechat_ui import WeChatUI + +logger = logging.getLogger("wechat_clicker.automator") + + +class WeChatAutomator: + """微信消息自动点击器""" + + def __init__(self, config: Config, dry_run: bool = False): + self.config = config + self.dry_run = dry_run + + # 初始化各组件 + self.ax = AXBridge() + self.ui = WeChatUI(self.ax, config.bundle_id) + self.state = StateMachine(self.ax, self.ui) + self.human = HumanBehavior(config) + + # 统计 + self._scan_count = 0 + self._total_chats_processed = 0 + self._total_media_clicked = 0 + + # ---------------------------------------------------------------- + # 启动检查 + # ---------------------------------------------------------------- + + def verify_setup(self) -> bool: + """验证环境和权限。""" + # 检查辅助功能权限 + if not self.ax.check_accessibility(): + logger.error("缺少辅助功能权限,无法继续") + return False + + # 检查微信是否运行 + app_ref = self.ui.get_app_ref() + if app_ref is None: + logger.error("微信未运行,请先启动微信桌面端") + return False + + # 检查主窗口 + main_win = self.ui.get_main_window() + if main_win is None: + logger.error("未找到微信主窗口,请确保微信已登录并可见") + return False + + logger.info("环境检查通过") + return True + + # ---------------------------------------------------------------- + # 主循环 + # ---------------------------------------------------------------- + + def run(self): + """永久运行的主循环。""" + logger.info("微信自动点击器启动") + + if not self.verify_setup(): + return + + while True: + try: + self._run_one_cycle() + except KeyboardInterrupt: + logger.info("用户中断,正在退出...") + break + except Exception as e: + logger.error(f"主循环异常: {e}", exc_info=True) + # 尝试恢复状态 + try: + self.state.recover_to_chat_list() + except Exception: + pass + time.sleep(10) + + self._print_stats() + + def run_once(self): + """运行一次扫描循环。""" + logger.info("执行单次扫描") + + if not self.verify_setup(): + return + + self._run_one_cycle() + self._print_stats() + + def _run_one_cycle(self): + """执行一个完整的扫描-处理循环。""" + # 检查工作时间 + if self.human.is_off_hours(): + logger.info("当前为非工作时间,等待中...") + time.sleep(300) + return + + # 偶尔触发长休息 + if self.human.should_take_break(): + self.human.long_break() + return + + # 确保微信在前台 + self.ui.ensure_wechat_frontmost() + time.sleep(0.5) + + # 恢复到聊天列表 + if not self.state.recover_to_chat_list(): + logger.warning("无法恢复到聊天列表,跳过本次循环") + time.sleep(10) + return + + self._scan_count += 1 + + # 检查是否有未读消息 + global_unread = self.ui.get_global_unread_count() + if global_unread == 0: + logger.debug(f"[扫描#{self._scan_count}] 没有未读消息") + sleep_time = self.human.scan_interval_with_jitter() + time.sleep(sleep_time) + return + + logger.info(f"[扫描#{self._scan_count}] 全局未读: {global_unread}") + + # 获取未读聊天列表 + unread_chats = self.ui.get_unread_chats() + if not unread_chats: + logger.debug("聊天列表中未发现未读项(可能需要滚动)") + sleep_time = self.human.scan_interval_with_jitter() + time.sleep(sleep_time) + return + + # 过滤和限制数量 + chats_to_process = [] + for chat in unread_chats: + if self.config.should_process_chat(chat.name): + chats_to_process.append(chat) + + count = self.human.random_subset_count( + len(chats_to_process), self.config.max_chats_per_scan + ) + chats_to_process = chats_to_process[:count] + + logger.info( + f"将处理 {len(chats_to_process)} 个聊天 " + f"(共 {len(unread_chats)} 个未读)" + ) + + # 逐个处理 + for chat in chats_to_process: + self.human.delay("before_click_chat") + self._process_chat(chat) + self.human.delay("before_close_chat") + + # 等待下次扫描 + sleep_time = self.human.scan_interval_with_jitter() + logger.debug(f"等待 {sleep_time:.0f}s 后进行下次扫描") + time.sleep(sleep_time) + + # ---------------------------------------------------------------- + # 处理单个聊天 + # ---------------------------------------------------------------- + + def _process_chat(self, chat): + """打开一个聊天,处理其中的媒体消息,然后关闭。""" + logger.info(f"处理聊天: {chat.name} (未读: {chat.unread_count})") + + if self.dry_run: + logger.info(f" [DRY-RUN] 跳过点击: {chat.name}") + return + + # 点击聊天项打开会话 + if not self.ax.press(chat.element): + logger.warning(f" 点击聊天项失败: {chat.name}") + return + + self.human.delay("after_open_chat") + + # 验证会话窗口已打开 + state = self.state.detect_state() + if state != UIState.CONVERSATION_OPEN: + logger.warning( + f" 会话窗口未打开 (状态: {state.value}),尝试恢复" + ) + self.state.recover_to_chat_list() + return + + # 处理会话中的媒体 + conv_window = self.ui.get_conversation_window() + if conv_window: + media_count = self._process_media(conv_window, chat.name) + self._total_media_clicked += media_count + + self._total_chats_processed += 1 + + # 关闭会话窗口 + self.human.delay("before_close_chat") + if not self.state.close_current_conversation(): + logger.warning(" 关闭会话失败,尝试恢复") + self.state.recover_to_chat_list() + + # ---------------------------------------------------------------- + # 处理媒体消息 + # ---------------------------------------------------------------- + + def _process_media(self, conv_window, chat_name: str) -> int: + """在打开的会话中查找并点击媒体消息。 + + Returns: + 点击的媒体数量。 + """ + media_messages = self.ui.get_media_messages(conv_window) + if not media_messages: + logger.debug(f" {chat_name}: 未发现可见媒体消息") + return 0 + + # 过滤需要点击的类型 + targets = [] + for msg in media_messages: + if msg.msg_type == "image" and self.config.click_images: + targets.append(msg) + elif msg.msg_type == "file" and self.config.click_files: + targets.append(msg) + elif msg.msg_type == "video" and self.config.click_videos: + targets.append(msg) + + if not targets: + logger.debug(f" {chat_name}: 无需处理的媒体") + return 0 + + # 限制数量,从最新的开始(列表末尾 = 最新) + max_count = self.config.max_media_per_chat + targets = targets[-max_count:] if len(targets) > max_count else targets + # 反转,从最新的开始处理 + targets = list(reversed(targets)) + + logger.info(f" {chat_name}: 发现 {len(targets)} 个媒体消息待处理") + + clicked = 0 + for msg in targets: + self.human.delay("before_click_media") + + short_title = msg.title.replace("\n", " ")[:50] + logger.info(f" 点击{msg.msg_type}: {short_title}") + + # 点击媒体 + if not self.ax.press(msg.element): + logger.warning(f" 点击失败: {short_title}") + continue + + self.human.delay("after_click_media") + + # 关闭可能出现的预览 + self._dismiss_preview_safe() + + clicked += 1 + self.human.delay("between_messages") + + # 检查连续错误 + if self.ax.should_backoff(): + logger.warning("连续错误过多,暂停处理") + self.ax.reset_error_count() + break + + return clicked + + def _dismiss_preview_safe(self): + """安全地关闭可能出现的媒体预览。""" + # 等一小会儿让预览可能出现 + self.human.micro_jitter() + + state = self.state.detect_state() + if state == UIState.MEDIA_PREVIEW: + self.human.delay("before_close_preview") + self.state.dismiss_preview() + else: + # 保守策略:即使未检测到预览也发送 Escape + # 因为某些预览可能不会创建新窗口 + self.ax.send_escape_key() + time.sleep(0.3) + + # ---------------------------------------------------------------- + # 调试工具 + # ---------------------------------------------------------------- + + def dump_ui_tree(self): + """输出微信 UI 元素树(调试用)。""" + if not self.verify_setup(): + return + + self.ui.ensure_wechat_frontmost() + time.sleep(0.5) + + # 输出主窗口 + main_win = self.ui.get_main_window() + if main_win: + print("=== 主窗口 (微信) ===") + print(self.ax.dump_element(main_win, max_depth=5)) + + # 输出会话窗口 + conv_windows = self.ui.get_conversation_windows() + for win in conv_windows: + title = self.ax.get_title(win) + print(f"\n=== 会话窗口 ({title}) ===") + print(self.ax.dump_element(win, max_depth=5)) + + # 输出聊天列表解析结果 + print("\n=== 聊天列表解析 ===") + items = self.ui.get_chat_items() + for item in items: + status = f"[未读:{item.unread_count}]" if item.unread_count > 0 else "" + print(f" {item.name} {status} | {item.preview} | {item.timestamp}") + + # ---------------------------------------------------------------- + # 统计 + # ---------------------------------------------------------------- + + def _print_stats(self): + """输出运行统计。""" + logger.info( + f"运行统计: 扫描={self._scan_count}, " + f"处理聊天={self._total_chats_processed}, " + f"点击媒体={self._total_media_clicked}" + ) diff --git a/wechat_clicker/ax_bridge.py b/wechat_clicker/ax_bridge.py new file mode 100644 index 0000000..eb9f42b --- /dev/null +++ b/wechat_clicker/ax_bridge.py @@ -0,0 +1,300 @@ +"""macOS Accessibility API (AXUIElement) 底层封装 + +通过 pyobjc 调用 ApplicationServices 框架,提供对 UI 元素的读取和操作能力。 +""" + +import logging +import time + +from ApplicationServices import ( + AXIsProcessTrusted, + AXIsProcessTrustedWithOptions, + AXUIElementCreateApplication, + AXUIElementCopyAttributeValue, + AXUIElementCopyAttributeNames, + AXUIElementCopyActionNames, + AXUIElementPerformAction, +) +from Cocoa import ( + NSRunningApplication, + NSWorkspace, + NSApplicationActivateIgnoringOtherApps, +) +from Quartz import ( + CGEventCreateKeyboardEvent, + CGEventPost, + CGEventSetFlags, + kCGHIDEventTap, + kCGEventFlagMaskCommand, +) +from CoreFoundation import kCFBooleanTrue + +logger = logging.getLogger("wechat_clicker.ax_bridge") + +# AXError 常量 +kAXErrorSuccess = 0 +kAXErrorAttributeUnsupported = -25205 +kAXErrorNoValue = -25212 +kAXErrorInvalidUIElement = -25202 + +# 键码 +kVK_Escape = 0x35 +kVK_W = 0x0D + + +class AXBridge: + """AXUIElement 底层封装""" + + def __init__(self): + self._error_count = 0 + self._max_errors_before_backoff = 10 + self._backoff_seconds = 60 + + # ---------------------------------------------------------------- + # 辅助功能权限检查 + # ---------------------------------------------------------------- + + def check_accessibility(self) -> bool: + """检查当前进程是否拥有辅助功能权限。""" + trusted = AXIsProcessTrusted() + if not trusted: + logger.error( + "辅助功能权限未授予!请前往:系统设置 > 隐私与安全 > 辅助功能," + "添加当前终端应用或 Python 解释器。" + ) + # 弹出系统提示框 + AXIsProcessTrustedWithOptions( + { + "AXTrustedCheckOptionPrompt": kCFBooleanTrue + } + ) + return trusted + + # ---------------------------------------------------------------- + # 获取应用引用 + # ---------------------------------------------------------------- + + def get_app_ref(self, bundle_id: str): + """根据 bundle ID 获取应用的 AXUIElement 引用。返回 None 表示应用未运行。""" + apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id) + if not apps or len(apps) == 0: + logger.error(f"未找到运行中的应用: {bundle_id}") + return None + app = apps[0] + pid = app.processIdentifier() + logger.debug(f"找到应用 {bundle_id}, PID={pid}") + return AXUIElementCreateApplication(pid) + + def bring_to_front(self, bundle_id: str) -> bool: + """将指定应用带到最前台。""" + apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id) + if not apps or len(apps) == 0: + return False + app = apps[0] + return app.activateWithOptions_(NSApplicationActivateIgnoringOtherApps) + + # ---------------------------------------------------------------- + # 属性读取 + # ---------------------------------------------------------------- + + def get_attribute(self, element, attr_name: str): + """读取 UI 元素的指定属性。失败返回 None。""" + try: + err, value = AXUIElementCopyAttributeValue(element, attr_name, None) + if err == kAXErrorSuccess: + self._error_count = 0 + return value + if err not in (kAXErrorNoValue, kAXErrorAttributeUnsupported): + logger.debug(f"读取属性 {attr_name} 失败, 错误码: {err}") + return None + except Exception as e: + logger.debug(f"读取属性 {attr_name} 异常: {e}") + return None + + def get_attribute_names(self, element) -> list: + """获取元素支持的所有属性名。""" + try: + err, names = AXUIElementCopyAttributeNames(element, None) + if err == kAXErrorSuccess: + return list(names) if names else [] + return [] + except Exception: + return [] + + def get_action_names(self, element) -> list: + """获取元素支持的所有操作名。""" + try: + err, names = AXUIElementCopyActionNames(element, None) + if err == kAXErrorSuccess: + return list(names) if names else [] + return [] + except Exception: + return [] + + def get_children(self, element) -> list: + """获取子元素列表。""" + children = self.get_attribute(element, "AXChildren") + if children is None: + return [] + return list(children) + + def get_role(self, element) -> str: + """获取元素角色(AXRole)。""" + return self.get_attribute(element, "AXRole") or "" + + def get_title(self, element) -> str: + """获取元素标题(AXTitle)。""" + return self.get_attribute(element, "AXTitle") or "" + + def get_value(self, element) -> str: + """获取元素值(AXValue)。""" + return self.get_attribute(element, "AXValue") or "" + + def get_name(self, element) -> str: + """获取元素名称,优先 AXTitle,其次 AXValue。""" + title = self.get_attribute(element, "AXTitle") + if title: + return title + value = self.get_attribute(element, "AXValue") + if value: + return value + return "" + + def get_description(self, element) -> str: + """获取元素描述(AXDescription)。""" + return self.get_attribute(element, "AXDescription") or "" + + def get_size(self, element) -> tuple: + """获取元素尺寸 (width, height)。返回 (0, 0) 表示不可见或失败。""" + size = self.get_attribute(element, "AXSize") + if size is None: + return (0, 0) + try: + return (int(size.width), int(size.height)) + except (AttributeError, TypeError): + return (0, 0) + + def get_position(self, element) -> tuple: + """获取元素位置 (x, y)。""" + pos = self.get_attribute(element, "AXPosition") + if pos is None: + return (0, 0) + try: + return (int(pos.x), int(pos.y)) + except (AttributeError, TypeError): + return (0, 0) + + def get_windows(self, app_ref) -> list: + """获取应用的所有窗口。""" + windows = self.get_attribute(app_ref, "AXWindows") + if windows is None: + return [] + return list(windows) + + def get_focused_window(self, app_ref): + """获取当前聚焦的窗口。""" + return self.get_attribute(app_ref, "AXFocusedWindow") + + # ---------------------------------------------------------------- + # 操作执行 + # ---------------------------------------------------------------- + + def perform_action(self, element, action: str) -> bool: + """对元素执行指定操作。""" + try: + err = AXUIElementPerformAction(element, action) + if err == kAXErrorSuccess: + self._error_count = 0 + return True + logger.debug(f"执行操作 {action} 失败, 错误码: {err}") + self._error_count += 1 + return False + except Exception as e: + logger.debug(f"执行操作 {action} 异常: {e}") + self._error_count += 1 + return False + + def press(self, element) -> bool: + """对元素执行 AXPress(等效点击)。""" + return self.perform_action(element, "AXPress") + + # ---------------------------------------------------------------- + # 键盘事件 + # ---------------------------------------------------------------- + + def send_escape_key(self): + """发送 Escape 键事件。""" + self._send_key(kVK_Escape) + + def send_cmd_w(self): + """发送 Cmd+W 键事件(关闭窗口)。""" + self._send_key(kVK_W, flags=kCGEventFlagMaskCommand) + + def _send_key(self, key_code: int, flags: int = 0): + """发送键盘事件(按下+抬起)。""" + try: + # Key down + event_down = CGEventCreateKeyboardEvent(None, key_code, True) + if flags: + CGEventSetFlags(event_down, flags) + CGEventPost(kCGHIDEventTap, event_down) + + time.sleep(0.05) + + # Key up + event_up = CGEventCreateKeyboardEvent(None, key_code, False) + if flags: + CGEventSetFlags(event_up, flags) + CGEventPost(kCGHIDEventTap, event_up) + except Exception as e: + logger.error(f"发送键盘事件失败: {e}") + + # ---------------------------------------------------------------- + # 错误管理 + # ---------------------------------------------------------------- + + def should_backoff(self) -> bool: + """连续错误过多时应暂停一段时间。""" + return self._error_count >= self._max_errors_before_backoff + + def reset_error_count(self): + """重置错误计数。""" + self._error_count = 0 + + # ---------------------------------------------------------------- + # 调试工具 + # ---------------------------------------------------------------- + + def dump_element(self, element, indent: int = 0, max_depth: int = 5) -> str: + """递归输出元素树(调试用)。""" + if indent >= max_depth: + return "" + + lines = [] + role = self.get_role(element) + title = self.get_title(element) + name_attr = self.get_attribute(element, "AXValue") or "" + desc = self.get_description(element) + size = self.get_size(element) + + prefix = " " * indent + info_parts = [f"role={role}"] + if title: + # 截断过长的 title,用单行表示 + short_title = title.replace("\n", "\\n")[:80] + info_parts.append(f'title="{short_title}"') + if name_attr: + short_name = str(name_attr).replace("\n", "\\n")[:80] + info_parts.append(f'value="{short_name}"') + if desc: + info_parts.append(f'desc="{desc}"') + if size != (0, 0): + info_parts.append(f"size={size[0]}x{size[1]}") + + lines.append(f"{prefix}[{', '.join(info_parts)}]") + + children = self.get_children(element) + for child in children: + lines.append(self.dump_element(child, indent + 1, max_depth)) + + return "\n".join(lines) diff --git a/wechat_clicker/config.py b/wechat_clicker/config.py new file mode 100644 index 0000000..69b9271 --- /dev/null +++ b/wechat_clicker/config.py @@ -0,0 +1,201 @@ +"""配置加载与管理""" + +import os +import random +import yaml +from datetime import datetime + + +# 默认配置 +DEFAULTS = { + "wechat": { + "bundle_id": "com.tencent.xinWeChat", + "process_name": "WeChat", + }, + "scan": { + "interval_seconds": 30, + "max_chats_per_scan": 5, + "scroll_chat_list": False, + }, + "delays": { + "before_click_chat": [2, 5], + "after_open_chat": [1, 3], + "before_click_media": [1, 4], + "after_click_media": [3, 8], + "before_close_preview": [1, 3], + "before_close_chat": [1, 2], + "between_messages": [0.5, 2], + }, + "schedule": { + "enabled": True, + "start_hour": 8, + "end_hour": 23, + "pause_on_weekends": False, + }, + "filter": { + "mode": "all", + "whitelist": [], + "blacklist": ["腾讯新闻", "微信支付", "微信团队"], + }, + "media": { + "click_images": True, + "click_files": True, + "click_videos": False, + "max_media_per_chat": 20, + }, + "logging": { + "level": "INFO", + "file": "wechat_clicker.log", + "max_bytes": 10485760, + "backup_count": 5, + "console": True, + }, +} + + +def _deep_merge(base: dict, override: dict) -> dict: + """深度合并两个字典,override 中的值覆盖 base 中的值。""" + result = base.copy() + for key, value in override.items(): + if key in result and isinstance(result[key], dict) and isinstance(value, dict): + result[key] = _deep_merge(result[key], value) + else: + result[key] = value + return result + + +class Config: + """配置管理器""" + + def __init__(self, config_path: str = "config.yaml"): + self._config_path = config_path + self._data = self._load() + + def _load(self) -> dict: + """加载配置文件,与默认值合并。""" + if os.path.exists(self._config_path): + with open(self._config_path, "r", encoding="utf-8") as f: + user_config = yaml.safe_load(f) or {} + return _deep_merge(DEFAULTS, user_config) + return DEFAULTS.copy() + + def _get(self, dotted_key: str, default=None): + """通过点号分隔的 key 获取配置值。例如 'scan.interval_seconds'""" + keys = dotted_key.split(".") + value = self._data + for key in keys: + if isinstance(value, dict) and key in value: + value = value[key] + else: + return default + return value + + def override(self, dotted_key: str, value): + """运行时覆盖某个配置值。""" + keys = dotted_key.split(".") + target = self._data + for key in keys[:-1]: + target = target.setdefault(key, {}) + target[keys[-1]] = value + + # --- 便捷属性 --- + + @property + def bundle_id(self) -> str: + return self._get("wechat.bundle_id") + + @property + def process_name(self) -> str: + return self._get("wechat.process_name") + + @property + def scan_interval(self) -> int: + return self._get("scan.interval_seconds") + + @property + def max_chats_per_scan(self) -> int: + return self._get("scan.max_chats_per_scan") + + @property + def scroll_chat_list(self) -> bool: + return self._get("scan.scroll_chat_list") + + @property + def click_images(self) -> bool: + return self._get("media.click_images") + + @property + def click_files(self) -> bool: + return self._get("media.click_files") + + @property + def click_videos(self) -> bool: + return self._get("media.click_videos") + + @property + def max_media_per_chat(self) -> int: + return self._get("media.max_media_per_chat") + + @property + def log_level(self) -> str: + return self._get("logging.level") + + @property + def log_file(self) -> str: + return self._get("logging.file") + + @property + def log_max_bytes(self) -> int: + return self._get("logging.max_bytes") + + @property + def log_backup_count(self) -> int: + return self._get("logging.backup_count") + + @property + def log_console(self) -> bool: + return self._get("logging.console") + + # --- 延迟 --- + + def get_delay(self, delay_name: str) -> float: + """获取指定名称的随机延迟时间(秒),使用截断正态分布。""" + delay_range = self._get(f"delays.{delay_name}", [1, 3]) + min_val, max_val = delay_range[0], delay_range[1] + mid = (min_val + max_val) / 2 + std = (max_val - min_val) / 4 # 大部分值落在范围内 + value = random.gauss(mid, std) + return max(min_val, min(max_val, value)) + + # --- 工作时间 --- + + def is_within_working_hours(self) -> bool: + """检查当前时间是否在配置的工作时间内。""" + if not self._get("schedule.enabled"): + return True + + now = datetime.now() + # 周末检查 + if self._get("schedule.pause_on_weekends") and now.weekday() >= 5: + return False + + start_hour = self._get("schedule.start_hour") + end_hour = self._get("schedule.end_hour") + return start_hour <= now.hour < end_hour + + # --- 聊天过滤 --- + + def should_process_chat(self, chat_name: str) -> bool: + """根据白名单/黑名单判断是否处理此聊天。""" + mode = self._get("filter.mode", "all") + if mode == "all": + # 即使在 all 模式下也检查黑名单 + blacklist = self._get("filter.blacklist", []) + return chat_name not in blacklist + elif mode == "whitelist": + whitelist = self._get("filter.whitelist", []) + return chat_name in whitelist + elif mode == "blacklist": + blacklist = self._get("filter.blacklist", []) + return chat_name not in blacklist + return True diff --git a/wechat_clicker/human_like.py b/wechat_clicker/human_like.py new file mode 100644 index 0000000..36849b7 --- /dev/null +++ b/wechat_clicker/human_like.py @@ -0,0 +1,91 @@ +"""拟人行为模拟 + +使用高斯分布随机延迟、偶尔长休息、工作时间限制等策略, +模拟真人操作节奏,降低被检测风险。 +""" + +import logging +import random +import time + +from .config import Config + +logger = logging.getLogger("wechat_clicker.human_like") + + +class HumanBehavior: + """拟人行为引擎""" + + def __init__(self, config: Config): + self.config = config + self._cycle_count = 0 + self._break_probability = 0.05 # 5% 概率触发长休息 + self._break_min = 30 + self._break_max = 120 + + def delay(self, delay_name: str): + """执行指定名称的随机延迟。""" + seconds = self.config.get_delay(delay_name) + logger.debug(f"延迟 {delay_name}: {seconds:.1f}s") + time.sleep(seconds) + + def random_delay(self, min_s: float, max_s: float): + """执行指定范围内的随机延迟(高斯分布)。""" + mid = (min_s + max_s) / 2 + std = (max_s - min_s) / 4 + value = random.gauss(mid, std) + value = max(min_s, min(max_s, value)) + time.sleep(value) + + def micro_jitter(self): + """极短延迟(0.1-0.5 秒),用于连续操作之间。""" + time.sleep(random.uniform(0.1, 0.5)) + + def scan_interval_with_jitter(self) -> float: + """返回带 ±20% 抖动的扫描间隔时间(秒)。""" + base = self.config.scan_interval + jitter = base * 0.2 + return random.uniform(base - jitter, base + jitter) + + # ---------------------------------------------------------------- + # 长休息 + # ---------------------------------------------------------------- + + def should_take_break(self) -> bool: + """判断是否应该触发一次长休息。 + + 每个循环有固定概率触发,模拟真人偶尔离开电脑。 + """ + self._cycle_count += 1 + return random.random() < self._break_probability + + def long_break(self): + """执行一次长休息(30-120 秒)。""" + duration = random.uniform(self._break_min, self._break_max) + logger.info(f"模拟长休息: {duration:.0f}s") + time.sleep(duration) + + # ---------------------------------------------------------------- + # 工作时间 + # ---------------------------------------------------------------- + + def is_off_hours(self) -> bool: + """检查是否在非工作时间(应暂停操作)。""" + return not self.config.is_within_working_hours() + + # ---------------------------------------------------------------- + # 随机子集 + # ---------------------------------------------------------------- + + def random_subset_count(self, total: int, max_count: int) -> int: + """决定本次处理多少个项目。 + + 不总是处理 max_count 个,偶尔少处理一些,模拟真人不会每次都处理所有内容。 + """ + if total <= 0: + return 0 + upper = min(total, max_count) + # 80% 概率处理全部(上限内),20% 概率减少 1-2 个 + if random.random() < 0.8: + return upper + return max(1, upper - random.randint(1, 2)) diff --git a/wechat_clicker/logger_setup.py b/wechat_clicker/logger_setup.py new file mode 100644 index 0000000..01638bc --- /dev/null +++ b/wechat_clicker/logger_setup.py @@ -0,0 +1,40 @@ +"""日志配置""" + +import logging +from logging.handlers import RotatingFileHandler + + +def setup_logging(config) -> logging.Logger: + """配置日志系统,返回 logger 实例。""" + logger = logging.getLogger("wechat_clicker") + logger.setLevel(getattr(logging, config.log_level, logging.INFO)) + + # 避免重复添加 handler + if logger.handlers: + return logger + + formatter = logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + # 文件 handler(自动轮转) + file_handler = RotatingFileHandler( + config.log_file, + maxBytes=config.log_max_bytes, + backupCount=config.log_backup_count, + encoding="utf-8", + ) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + # 控制台 handler + if config.log_console: + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter( + "%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", + )) + logger.addHandler(console_handler) + + return logger diff --git a/wechat_clicker/state_machine.py b/wechat_clicker/state_machine.py new file mode 100644 index 0000000..1ad74ad --- /dev/null +++ b/wechat_clicker/state_machine.py @@ -0,0 +1,201 @@ +"""UI 状态机 + +基于微信窗口数量和名称判断当前 UI 状态,提供状态恢复能力。 + +微信 v4.1.9 窗口模型: +- 1 个窗口("微信"):主聊天列表 +- 2 个窗口("微信" + 聊天名):会话已打开 +- 3+ 个窗口:可能有图片预览等额外窗口 +""" + +import logging +import time +from enum import Enum + +from .ax_bridge import AXBridge +from .wechat_ui import WeChatUI + +logger = logging.getLogger("wechat_clicker.state_machine") + + +class UIState(Enum): + UNKNOWN = "unknown" + MAIN_CHAT_LIST = "main_chat_list" + CONVERSATION_OPEN = "conversation_open" + MEDIA_PREVIEW = "media_preview" + WECHAT_NOT_RUNNING = "wechat_not_running" + + +class StateMachine: + """微信 UI 状态机""" + + def __init__(self, ax: AXBridge, wechat_ui: WeChatUI): + self.ax = ax + self.ui = wechat_ui + self._current_state = UIState.UNKNOWN + self._conversation_name = None # 当前打开的会话名称 + + @property + def current_state(self) -> UIState: + return self._current_state + + @property + def conversation_name(self) -> str: + return self._conversation_name or "" + + # ---------------------------------------------------------------- + # 状态检测 + # ---------------------------------------------------------------- + + def detect_state(self) -> UIState: + """检测当前微信 UI 状态。""" + windows = self.ui.get_all_windows() + + if not windows: + self._current_state = UIState.WECHAT_NOT_RUNNING + self._conversation_name = None + logger.debug("未检测到微信窗口") + return self._current_state + + # 分析窗口 + main_window = None + other_windows = [] + for win in windows: + title = self.ax.get_title(win) + if title == "微信": + main_window = win + elif title: + other_windows.append((win, title)) + + if main_window is None: + # 微信可能最小化了,或者窗口结构变化 + self._current_state = UIState.UNKNOWN + logger.debug("未找到微信主窗口") + return self._current_state + + window_count = len(windows) + + if window_count == 1: + # 只有主窗口 = 聊天列表 + self._current_state = UIState.MAIN_CHAT_LIST + self._conversation_name = None + elif window_count == 2 and len(other_windows) == 1: + # 主窗口 + 一个会话窗口 + self._current_state = UIState.CONVERSATION_OPEN + self._conversation_name = other_windows[0][1] + elif window_count >= 3: + # 可能有预览窗口 + self._current_state = UIState.MEDIA_PREVIEW + # 第一个非主窗口通常是会话 + if other_windows: + self._conversation_name = other_windows[0][1] + else: + self._current_state = UIState.UNKNOWN + + logger.debug( + f"状态检测: {self._current_state.value}, " + f"窗口数={window_count}, 会话={self._conversation_name}" + ) + return self._current_state + + # ---------------------------------------------------------------- + # 状态恢复 + # ---------------------------------------------------------------- + + def recover_to_chat_list(self, max_retries: int = 3) -> bool: + """恢复到主聊天列表状态。关闭所有会话和预览窗口。 + + Returns: + True 如果成功恢复到聊天列表。 + """ + for attempt in range(max_retries): + state = self.detect_state() + + if state == UIState.MAIN_CHAT_LIST: + return True + + if state == UIState.WECHAT_NOT_RUNNING: + logger.error("微信未运行,无法恢复") + return False + + if state == UIState.MEDIA_PREVIEW: + # 先关闭预览(Escape) + logger.info("关闭媒体预览...") + self.ax.send_escape_key() + time.sleep(0.5) + + if state in (UIState.CONVERSATION_OPEN, UIState.MEDIA_PREVIEW): + # 关闭会话窗口 + self._close_all_conversations() + time.sleep(0.5) + + if state == UIState.UNKNOWN: + # 尝试激活微信并点击侧边栏聊天按钮 + self.ui.ensure_wechat_frontmost() + time.sleep(0.5) + self.ui.click_sidebar_chat_tab() + time.sleep(0.5) + + logger.debug(f"恢复尝试 {attempt + 1}/{max_retries}") + + # 最终检查 + final_state = self.detect_state() + if final_state == UIState.MAIN_CHAT_LIST: + return True + + logger.error(f"恢复失败,当前状态: {final_state.value}") + return False + + def _close_all_conversations(self): + """关闭所有会话窗口。""" + conv_windows = self.ui.get_conversation_windows() + for win in conv_windows: + title = self.ax.get_title(win) + close_btn = self.ui.get_close_button(win) + if close_btn: + logger.debug(f"关闭会话窗口: {title}") + self.ax.press(close_btn) + time.sleep(0.3) + else: + # 后备方案:Cmd+W + logger.debug(f"使用 Cmd+W 关闭窗口: {title}") + self.ax.send_cmd_w() + time.sleep(0.3) + + def close_current_conversation(self) -> bool: + """关闭当前打开的会话窗口。""" + conv_window = self.ui.get_conversation_window() + if conv_window is None: + return True # 没有会话窗口,视为成功 + + close_btn = self.ui.get_close_button(conv_window) + if close_btn: + self.ax.press(close_btn) + else: + self.ax.send_cmd_w() + + time.sleep(0.5) + + # 验证 + state = self.detect_state() + return state == UIState.MAIN_CHAT_LIST + + def dismiss_preview(self) -> bool: + """关闭媒体预览。""" + state = self.detect_state() + if state != UIState.MEDIA_PREVIEW: + return True # 没有预览打开 + + # 发送 Escape 关闭预览 + self.ax.send_escape_key() + time.sleep(0.5) + + # 验证 + state = self.detect_state() + if state == UIState.MEDIA_PREVIEW: + # 再试一次 + self.ax.send_escape_key() + time.sleep(0.5) + state = self.detect_state() + + return state != UIState.MEDIA_PREVIEW diff --git a/wechat_clicker/wechat_ui.py b/wechat_clicker/wechat_ui.py new file mode 100644 index 0000000..0ac3f4e --- /dev/null +++ b/wechat_clicker/wechat_ui.py @@ -0,0 +1,394 @@ +"""微信桌面端 UI 导航与元素查找 + +基于 macOS Accessibility API 探测到的微信 v4.1.9 UI 结构: +- 主窗口 "微信" 包含侧边栏和聊天列表 +- 点击聊天项会打开独立的会话窗口(以聊天名命名) +- 聊天列表:AXList name="会话" +- 消息列表:AXList name="消息" +- 每个聊天项的 title 是多行文本,包含聊天名、未读数、最近消息、时间 +""" + +import logging +import re + +from .ax_bridge import AXBridge + +logger = logging.getLogger("wechat_clicker.wechat_ui") + + +class ChatItem: + """解析后的聊天列表项""" + + def __init__(self, element, name: str, unread_count: int, preview: str, timestamp: str): + self.element = element + self.name = name + self.unread_count = unread_count + self.preview = preview + self.timestamp = timestamp + + def __repr__(self): + return f"ChatItem(name={self.name!r}, unread={self.unread_count})" + + +class MessageItem: + """解析后的消息""" + + def __init__(self, element, msg_type: str, title: str, size: tuple): + self.element = element + self.msg_type = msg_type # "image", "file", "video", "text", "timestamp", "other" + self.title = title + self.size = size + + @property + def is_visible(self) -> bool: + """判断消息是否在可见区域(不可见的消息 size 为 0x0)""" + return self.size[0] > 0 and self.size[1] > 0 + + def __repr__(self): + short = self.title.replace("\n", "\\n")[:40] + return f"MessageItem(type={self.msg_type}, title={short!r})" + + +class WeChatUI: + """微信 UI 导航器""" + + def __init__(self, ax: AXBridge, bundle_id: str = "com.tencent.xinWeChat"): + self.ax = ax + self.bundle_id = bundle_id + self._app_ref = None + + # ---------------------------------------------------------------- + # 应用引用管理 + # ---------------------------------------------------------------- + + def get_app_ref(self): + """获取或缓存微信应用的 AXUIElement 引用。""" + if self._app_ref is None: + self._app_ref = self.ax.get_app_ref(self.bundle_id) + return self._app_ref + + def invalidate_app_ref(self): + """清除缓存的应用引用(微信重启后需要)。""" + self._app_ref = None + + def ensure_wechat_frontmost(self) -> bool: + """确保微信在最前台。""" + return self.ax.bring_to_front(self.bundle_id) + + # ---------------------------------------------------------------- + # 窗口查找 + # ---------------------------------------------------------------- + + def get_all_windows(self) -> list: + """获取微信所有窗口。""" + app_ref = self.get_app_ref() + if app_ref is None: + return [] + return self.ax.get_windows(app_ref) + + def get_main_window(self): + """找到主窗口(名为"微信")。""" + for win in self.get_all_windows(): + title = self.ax.get_title(win) + if title == "微信": + return win + logger.warning("未找到微信主窗口") + return None + + def get_conversation_windows(self) -> list: + """获取所有非主窗口(即会话窗口)。""" + result = [] + for win in self.get_all_windows(): + title = self.ax.get_title(win) + # 排除主窗口和一些系统窗口 + if title and title != "微信": + result.append(win) + return result + + def get_conversation_window(self): + """获取第一个会话窗口(如果存在)。""" + conv_windows = self.get_conversation_windows() + return conv_windows[0] if conv_windows else None + + # ---------------------------------------------------------------- + # 元素查找工具 + # ---------------------------------------------------------------- + + def _find_child_by_role_and_name(self, parent, role: str, name: str = None, desc: str = None): + """在子元素中按 role + name/desc 查找。""" + children = self.ax.get_children(parent) + for child in children: + child_role = self.ax.get_role(child) + if child_role != role: + continue + if name is not None: + child_title = self.ax.get_title(child) + if child_title == name: + return child + # 也检查 AXDescription + child_desc = self.ax.get_description(child) + if child_desc == name: + return child + elif desc is not None: + child_desc = self.ax.get_description(child) + if child_desc == desc: + return child + else: + # 只匹配 role + return child + return None + + def _find_child_recursive(self, parent, role: str, name: str = None, max_depth: int = 6): + """递归查找子元素(广度优先)。""" + if max_depth <= 0: + return None + + children = self.ax.get_children(parent) + # 先在当前层查找 + for child in children: + child_role = self.ax.get_role(child) + if child_role == role: + if name is None: + return child + child_title = self.ax.get_title(child) + if child_title == name: + return child + # 也用 AXDescription 匹配 + child_desc = self.ax.get_description(child) + if child_desc == name: + return child + + # 递归到下一层 + for child in children: + result = self._find_child_recursive(child, role, name, max_depth - 1) + if result is not None: + return result + return None + + # ---------------------------------------------------------------- + # 聊天列表 + # ---------------------------------------------------------------- + + def get_chat_list(self): + """找到聊天列表元素(AXList name="会话")。""" + main_win = self.get_main_window() + if main_win is None: + return None + return self._find_child_recursive(main_win, "AXList", "会话") + + def get_sidebar_chat_button(self): + """找到侧边栏的"微信"按钮(显示全局未读数)。""" + main_win = self.get_main_window() + if main_win is None: + return None + return self._find_child_recursive(main_win, "AXButton", "微信") + + def get_global_unread_count(self) -> int: + """从侧边栏"微信"按钮获取全局未读消息数。""" + btn = self.get_sidebar_chat_button() + if btn is None: + return 0 + desc = self.ax.get_description(btn) + if not desc: + return 0 + match = re.search(r"(\d+)条新消息", desc) + if match: + return int(match.group(1)) + return 0 + + def get_chat_items(self) -> list: + """获取聊天列表中所有可见聊天项,返回 ChatItem 列表。""" + chat_list = self.get_chat_list() + if chat_list is None: + logger.warning("未找到聊天列表") + return [] + + items = [] + children = self.ax.get_children(chat_list) + for child in children: + role = self.ax.get_role(child) + if role != "AXStaticText": + continue + + title = self.ax.get_title(child) + if not title: + continue + + parsed = self._parse_chat_title(title) + if parsed["name"]: + items.append(ChatItem( + element=child, + name=parsed["name"], + unread_count=parsed["unread_count"], + preview=parsed["preview"], + timestamp=parsed["timestamp"], + )) + + return items + + def get_unread_chats(self) -> list: + """获取有未读消息的聊天项,按未读数降序排列。""" + items = self.get_chat_items() + unread = [item for item in items if item.unread_count > 0] + unread.sort(key=lambda x: x.unread_count, reverse=True) + return unread + + # ---------------------------------------------------------------- + # 会话消息 + # ---------------------------------------------------------------- + + def get_message_list(self, conv_window): + """在会话窗口中找到消息列表(AXList name="消息")。""" + if conv_window is None: + return None + return self._find_child_recursive(conv_window, "AXList", "消息") + + def get_messages(self, msg_list) -> list: + """获取消息列表中的所有消息,返回 MessageItem 列表。""" + if msg_list is None: + return [] + + items = [] + children = self.ax.get_children(msg_list) + for child in children: + role = self.ax.get_role(child) + if role != "AXStaticText": + continue + + title = self.ax.get_title(child) + if not title: + continue + + size = self.ax.get_size(child) + msg_type = self._classify_message(title, size) + items.append(MessageItem( + element=child, + msg_type=msg_type, + title=title, + size=size, + )) + + return items + + def get_media_messages(self, conv_window) -> list: + """获取会话中所有可见的图片/文件/视频消息。""" + msg_list = self.get_message_list(conv_window) + if msg_list is None: + return [] + + messages = self.get_messages(msg_list) + media = [ + msg for msg in messages + if msg.msg_type in ("image", "file", "video") and msg.is_visible + ] + return media + + # ---------------------------------------------------------------- + # 窗口操作 + # ---------------------------------------------------------------- + + def get_close_button(self, window): + """找到窗口的关闭按钮(AXButton desc="关闭按钮")。""" + if window is None: + return None + + children = self.ax.get_children(window) + for child in children: + role = self.ax.get_role(child) + if role == "AXButton": + desc = self.ax.get_description(child) + if desc == "关闭按钮": + return child + return None + + def click_sidebar_chat_tab(self) -> bool: + """点击侧边栏"微信"按钮,确保显示聊天列表。""" + btn = self.get_sidebar_chat_button() + if btn is None: + logger.warning("未找到侧边栏微信按钮") + return False + return self.ax.press(btn) + + # ---------------------------------------------------------------- + # 解析工具 + # ---------------------------------------------------------------- + + @staticmethod + def _parse_chat_title(title_text: str) -> dict: + """解析聊天项的多行 title 文本。 + + 格式(有未读时): + 聊天名 + [N条] + 最近消息预览 + 时间 + + 格式(无未读时): + 聊天名 + 最近消息预览 + 时间 + """ + lines = title_text.strip().split("\n") + result = { + "name": "", + "unread_count": 0, + "preview": "", + "timestamp": "", + } + + if not lines: + return result + + result["name"] = lines[0].strip() + + if len(lines) >= 2: + # 检查第二行是否是 [N条] 格式 + unread_match = re.match(r"\[(\d+)条\]", lines[1].strip()) + if unread_match: + result["unread_count"] = int(unread_match.group(1)) + if len(lines) >= 3: + result["preview"] = lines[2].strip() + if len(lines) >= 4: + result["timestamp"] = lines[-1].strip() + else: + result["preview"] = lines[1].strip() + if len(lines) >= 3: + result["timestamp"] = lines[-1].strip() + + return result + + @staticmethod + def _classify_message(title: str, size: tuple) -> str: + """根据消息 title 和尺寸分类消息类型。 + + 已知模式(微信 v4.1.9): + - 图片: title="图片" + - 视频: title 以 "视频" 开头 + - 文件: title 以 "文件\\n" 开头 + - 时间戳: 高度约 41px,内容为时间格式 + - 其他: 文本消息或其他类型 + """ + stripped = title.strip() + + # 图片 + if stripped == "图片": + return "image" + + # 视频 + if stripped.startswith("视频"): + return "video" + + # 文件(多行格式:文件\n文件名\n大小\n微信电脑版) + if stripped.startswith("文件\n") or stripped.startswith("[文件]"): + return "file" + + # 时间戳分隔符(高度约 41px,内容为时间格式) + if size[1] > 0 and size[1] <= 45: + time_pattern = re.match( + r"^(\d{1,2}:\d{2}|\d{1,2}/\d{1,2}|星期[一二三四五六日]|昨天|前天)$", + stripped, + ) + if time_pattern: + return "timestamp" + + return "text"