commit 86a47822adb3b9feca77331b9ae374193267de05 Author: crislee Date: Wed Apr 22 19:28:54 2026 +0800 v0.1 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0bcb105 --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# 用户配置(含个人设置) +config.yaml + +# 日志 +*.log +*.log.* + +# Python +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +build/ + +# 虚拟环境 +venv/ +.venv/ + +# macOS +.DS_Store + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Claude Code +.claude/ diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..d411ea4 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,64 @@ +# WeChat 消息自动点击器 + +自动点击微信桌面端未读消息中的图片和文件,触发原始文件下载到本地目录。 + +## 项目背景 + +配合另一个信息收集项目使用:信息收集脚本负责将微信消息入库,但图片和文件需要被点击预览后微信才会下载原始文件到本地。本工具自动完成这个"点击"操作。 + +## 技术架构 + +- **语言**: Python 3.11+ +- **核心技术**: macOS Accessibility API (AXUIElement) via pyobjc +- **目标应用**: 微信桌面端 v4.1.9+ (bundle ID: com.tencent.xinWeChat) +- **运行平台**: macOS Sonoma 14+ + +### 模块结构 + +``` +wechat_clicker/ +├── ax_bridge.py # AXUIElement 底层封装(属性读取、操作执行、键盘事件) +├── wechat_ui.py # 微信 UI 导航(找聊天列表、消息列表、解析 title 分类消息类型) +├── state_machine.py # UI 状态机(基于窗口数量判断状态、状态恢复) +├── automator.py # 主自动化逻辑(扫描→点击→预览→关闭→循环) +├── human_like.py # 拟人行为(高斯分布延迟、长休息、工作时间) +├── config.py # YAML 配置加载 +└── logger_setup.py # 日志配置 +``` + +### 关键设计决策 + +- 微信 v4.1.9 点击聊天会打开**独立窗口**(非页内导航),状态检测基于窗口计数 +- 元素查找使用 role+name 搜索(非硬编码索引),适应 UI 变化 +- 消息类型通过 title 内容判断:`"图片"` → 图片,`"文件\n..."` → 文件 +- 预览通过 Escape 键关闭 + +## 使用方法 + +```bash +# 前置条件 +pip install -r requirements.txt +# 系统设置 > 隐私与安全 > 辅助功能 → 添加终端/Python + +# 复制配置 +cp config.example.yaml config.yaml + +# 运行 +python main.py # 持续运行 +python main.py --once # 单次扫描 +python main.py --dry-run # 只扫描不点击 +python main.py --dump-ui # 输出 UI 元素树 +python main.py --debug # 详细日志 +``` + +## 配置重点 + +- `config.yaml` 中可设置扫描间隔、延迟范围、白/黑名单、工作时间、媒体类型开关 +- 默认不处理视频(`media.click_videos: false`) +- 默认黑名单包含微信系统账号 + +## 注意事项 + +- 微信窗口需要保持可见(不能最小化) +- 运行时会占用微信前台操作 +- 建议在专用电脑上运行 diff --git a/README.md b/README.md new file mode 100644 index 0000000..24ffb20 --- /dev/null +++ b/README.md @@ -0,0 +1,156 @@ +# WeChat 消息自动点击器 + +自动点击微信桌面端未读消息中的图片和文件,触发原始文件下载到本地目录。 + +## 背景 + +微信桌面端不会自动下载图片/文件的原始数据,需要用户手动点击预览后才会触发下载。本工具通过 macOS 辅助功能 API 自动完成这个操作,配合信息收集系统实现群聊数据的自动化入库。 + +## 系统要求 + +- macOS Sonoma 14+ +- Python 3.11+ +- 微信桌面端 v4.1.9+(已登录) + +## 安装 + +### 1. 安装 Python 依赖 + +```bash +cd wechat_msg_clicker +pip install -r requirements.txt +``` + +依赖包: +- `pyobjc-framework-Quartz` — macOS 图形框架绑定 +- `pyobjc-framework-ApplicationServices` — 辅助功能 API 绑定 +- `pyobjc-framework-Cocoa` — macOS 应用框架绑定 +- `PyYAML` — 配置文件解析 + +### 2. 授权辅助功能权限 + +脚本需要 macOS 辅助功能权限才能操作微信 UI 元素: + +1. 打开 **系统设置** > **隐私与安全性** > **辅助功能** +2. 点击左下角 **+** 号 +3. 添加你使用的终端应用(如 Terminal.app 或 iTerm2) +4. 如果通过 Python 直接运行,还需要添加 Python 解释器路径(如 `/Users/你的用户名/miniconda3/bin/python3`) +5. 确保添加后开关为 **开启** 状态 + +> 首次运行时如果未授权,脚本会弹出系统提示框引导你授权。 + +### 3. 准备配置文件 + +```bash +cp config.example.yaml config.yaml +``` + +按需编辑 `config.yaml`,主要配置项: + +| 配置项 | 默认值 | 说明 | +|---|---|---| +| `scan.interval_seconds` | 30 | 扫描间隔(秒) | +| `scan.max_chats_per_scan` | 5 | 每次最多处理几个聊天 | +| `schedule.start_hour` | 8 | 工作开始时间 | +| `schedule.end_hour` | 23 | 工作结束时间 | +| `filter.mode` | all | 过滤模式:all / whitelist / blacklist | +| `filter.blacklist` | 微信系统号 | 排除的聊天名称 | +| `media.click_images` | true | 是否点击图片 | +| `media.click_files` | true | 是否点击文件 | +| `media.click_videos` | false | 是否点击视频(默认关闭) | +| `media.max_media_per_chat` | 20 | 每个聊天最多点击几个媒体 | + +### 4. 确保微信就绪 + +- 微信桌面端已启动并登录 +- 微信窗口保持可见(不能最小化到 Dock) +- 建议在专用电脑上运行,脚本运行时会操作微信前台窗口 + +## 验证流程 + +建议按以下顺序逐步验证,确认每步正常后再进入下一步: + +### Step 1:验证 UI 访问 + +```bash +python main.py --dump-ui +``` + +预期输出:微信的 UI 元素树,包括窗口、按钮、聊天列表等信息。 + +**如果报错**: +- `辅助功能权限未授予` → 检查系统设置中的辅助功能授权 +- `微信未运行` → 确保微信桌面端已启动 +- `未找到微信主窗口` → 确保微信已登录,窗口未最小化 + +### Step 2:试运行扫描 + +```bash +python main.py --dry-run --once --debug +``` + +预期输出:扫描聊天列表,显示有未读消息的聊天和媒体数量,但**不会实际点击**。 + +检查日志确认: +- 能正确识别未读聊天 +- 聊天名称解析正确 +- 未读数量解析正确 + +### Step 3:单次完整执行 + +```bash +python main.py --once --debug +``` + +这会执行一次完整的循环:扫描 → 点击聊天 → 点击图片/文件 → 关闭预览 → 关闭会话。 + +观察过程中: +- 微信窗口是否正常被操作 +- 图片预览是否正常打开和关闭 +- 是否能正确返回聊天列表 + +### Step 4:正式运行 + +```bash +python main.py +``` + +脚本会持续运行,按配置的间隔和工作时间自动处理新消息。 + +后台运行建议使用 `nohup` 或 `tmux`: + +```bash +# 方式一:nohup +nohup python main.py > /dev/null 2>&1 & + +# 方式二:tmux(推荐,方便查看日志) +tmux new -s wechat +python main.py +# Ctrl+B D 分离会话 +# tmux attach -t wechat 重新连接 +``` + +## 命令参考 + +| 命令 | 说明 | +|---|---| +| `python main.py` | 持续运行 | +| `python main.py --once` | 单次扫描后退出 | +| `python main.py --dry-run` | 只扫描不点击 | +| `python main.py --dump-ui` | 输出微信 UI 元素树 | +| `python main.py --debug` | 开启 DEBUG 级别日志 | +| `python main.py --config path/to/config.yaml` | 指定配置文件 | + +参数可组合使用,如 `python main.py --dry-run --once --debug`。 + +## 日志 + +日志同时输出到控制台和文件 `wechat_clicker.log`(可在配置中修改)。 + +日志文件自动轮转,默认最大 10MB,保留 5 个备份。 + +## 注意事项 + +- **封号风险**:工具内置了多种拟人策略(随机延迟、长休息、工作时间限制),但自动化操作始终存在被检测的可能,请自行评估风险 +- **微信更新**:微信版本更新可能改变 UI 结构,导致脚本失效。如遇到问题,先用 `--dump-ui` 检查 UI 变化 +- **前台占用**:脚本运行时会操作微信窗口,不建议同时手动使用微信 diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..b8a4c12 --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,53 @@ +# WeChat 消息自动点击器 配置文件 +# 复制为 config.yaml 后根据需要修改 + +wechat: + bundle_id: "com.tencent.xinWeChat" + process_name: "WeChat" + +# 扫描行为 +scan: + interval_seconds: 30 # 每次扫描间隔(秒) + max_chats_per_scan: 5 # 每次扫描最多处理几个聊天 + scroll_chat_list: false # 是否滚动聊天列表查找更多聊天 + +# 延迟范围(秒),模拟真人操作节奏 +delays: + before_click_chat: [2, 5] # 点击聊天前 + after_open_chat: [1, 3] # 打开会话后等待 + before_click_media: [1, 4] # 点击图片/文件前 + after_click_media: [3, 8] # 点击图片/文件后(等待下载) + before_close_preview: [1, 3] # 关闭预览前 + before_close_chat: [1, 2] # 关闭会话窗口前 + between_messages: [0.5, 2] # 处理每条消息之间 + +# 工作时间(24小时制) +schedule: + enabled: true + start_hour: 8 + end_hour: 23 + pause_on_weekends: false + +# 聊天过滤 +filter: + mode: "all" # "all": 全部, "whitelist": 仅白名单, "blacklist": 排除黑名单 + whitelist: [] # 白名单模式下,仅处理这些聊天 + blacklist: # 黑名单模式下,排除这些聊天 + - "腾讯新闻" + - "微信支付" + - "微信团队" + +# 媒体处理 +media: + click_images: true # 是否点击图片 + click_files: true # 是否点击文件 + click_videos: false # 是否点击视频(视频可能很大,默认关闭) + max_media_per_chat: 20 # 每个聊天最多点击几个媒体 + +# 日志配置 +logging: + level: "INFO" # DEBUG, INFO, WARNING, ERROR + file: "wechat_clicker.log" + max_bytes: 10485760 # 10MB + backup_count: 5 + console: true diff --git a/main.py b/main.py new file mode 100644 index 0000000..dd3dc4a --- /dev/null +++ b/main.py @@ -0,0 +1,74 @@ +"""WeChat 消息自动点击器 — 入口 + +自动点击微信桌面端未读消息中的图片和文件,触发原始文件下载。 +""" + +import argparse +import signal +import sys + +from wechat_clicker.config import Config +from wechat_clicker.automator import WeChatAutomator +from wechat_clicker.logger_setup import setup_logging + + +def main(): + parser = argparse.ArgumentParser( + description="微信消息自动点击器 — 自动点击图片和文件触发下载" + ) + parser.add_argument( + "--config", default="config.yaml", + help="配置文件路径 (默认: config.yaml)" + ) + parser.add_argument( + "--dry-run", action="store_true", + help="试运行模式:只扫描不点击" + ) + parser.add_argument( + "--once", action="store_true", + help="只执行一次扫描循环后退出" + ) + parser.add_argument( + "--debug", action="store_true", + help="开启详细日志 (DEBUG 级别)" + ) + parser.add_argument( + "--dump-ui", action="store_true", + help="输出微信 UI 元素树后退出 (调试用)" + ) + args = parser.parse_args() + + # 加载配置 + config = Config(args.config) + if args.debug: + config.override("logging.level", "DEBUG") + + # 初始化日志 + logger = setup_logging(config) + logger.info("微信自动点击器启动") + + if args.dry_run: + logger.info("*** 试运行模式:只扫描不点击 ***") + + # 优雅退出 + def signal_handler(sig, frame): + logger.info("收到退出信号,正在关闭...") + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + # 创建自动化器 + automator = WeChatAutomator(config, dry_run=args.dry_run) + + # 执行 + if args.dump_ui: + automator.dump_ui_tree() + elif args.once: + automator.run_once() + else: + automator.run() + + +if __name__ == "__main__": + main() diff --git a/project.md b/project.md new file mode 100644 index 0000000..4852598 --- /dev/null +++ b/project.md @@ -0,0 +1,67 @@ +# 项目规划:WeChat 消息自动点击器 + +## 项目目标 + +创建一个长期稳定运行的自动化工具,自动点击微信桌面端中未读消息的图片和文件,触发微信下载原始文件到本地目录,配合信息收集系统完成数据入库流程。 + +## 项目状态 + +### 已完成 (v0.1.0 - 2026/04/22) + +- [x] 项目结构搭建 +- [x] 配置系统(YAML 配置文件、默认值、白/黑名单) +- [x] AXUIElement 底层封装(ax_bridge.py) +- [x] 微信 UI 导航(wechat_ui.py)— 聊天列表解析、消息类型分类 +- [x] UI 状态机(state_machine.py)— 基于窗口计数的状态检测与恢复 +- [x] 拟人行为引擎(human_like.py)— 高斯分布延迟、长休息、工作时间 +- [x] 主自动化逻辑(automator.py)— 完整扫描-点击-关闭循环 +- [x] 入口脚本(main.py)— 参数解析、信号处理 +- [x] 调试工具(--dump-ui, --dry-run) + +### 待验证 + +- [ ] 在真实环境中测试 AXUIElement 对微信的访问能力 +- [ ] 验证聊天列表 title 解析的准确性 +- [ ] 验证图片点击后预览关闭的可靠性 +- [ ] 长时间运行稳定性测试 + +### 未来可能的改进 + +- [ ] 聊天列表滚动支持(处理不在可见区域的聊天) +- [ ] 消息滚动支持(处理更早的图片消息) +- [ ] 已处理消息去重(记录已点击的媒体,避免重复) +- [ ] 微信版本适配(检测 UI 结构变化并自动调整) +- [ ] 运行状态 Web 面板(远程监控) +- [ ] 与信息收集系统直接集成 + +## 技术细节 + +### 微信 UI 结构 (v4.1.9) + +- 主窗口 "微信" 包含侧边栏 + 聊天列表 +- 聊天列表:`AXList name="会话"`,子元素为 `AXStaticText` +- 点击聊天项打开独立会话窗口 +- 消息列表:`AXList name="消息"` +- 图片消息 title = "图片",文件消息 title 以 "文件\n" 开头 + +### 防封策略 + +- 高斯分布随机延迟(非均匀) +- ±20% 扫描间隔抖动 +- 5% 概率长休息(30-120 秒) +- 工作时间限制 +- 每次最多 5 个聊天 / 每聊天最多 20 个媒体 + +### 依赖 + +- pyobjc-framework-Quartz +- pyobjc-framework-ApplicationServices +- pyobjc-framework-Cocoa +- PyYAML + +### 系统要求 + +- macOS Sonoma 14+ +- Python 3.11+ +- 辅助功能权限(终端/Python 需要授权) +- 微信桌面端 v4.1.9+ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..79d3e5f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +pyobjc-framework-Quartz>=10.0 +pyobjc-framework-ApplicationServices>=10.0 +pyobjc-framework-Cocoa>=10.0 +PyYAML>=6.0 diff --git a/wechat_clicker/__init__.py b/wechat_clicker/__init__.py new file mode 100644 index 0000000..6927735 --- /dev/null +++ b/wechat_clicker/__init__.py @@ -0,0 +1 @@ +"""WeChat 消息自动点击器""" diff --git a/wechat_clicker/automator.py b/wechat_clicker/automator.py new file mode 100644 index 0000000..2dcf824 --- /dev/null +++ b/wechat_clicker/automator.py @@ -0,0 +1,334 @@ +"""主自动化逻辑 + +编排整个工作流程:扫描未读聊天 → 点击进入 → 点击图片/文件 → 关闭预览 → 关闭会话 → 循环 +""" + +import logging +import time + +from .ax_bridge import AXBridge +from .config import Config +from .human_like import HumanBehavior +from .state_machine import StateMachine, UIState +from .wechat_ui import WeChatUI + +logger = logging.getLogger("wechat_clicker.automator") + + +class WeChatAutomator: + """微信消息自动点击器""" + + def __init__(self, config: Config, dry_run: bool = False): + self.config = config + self.dry_run = dry_run + + # 初始化各组件 + self.ax = AXBridge() + self.ui = WeChatUI(self.ax, config.bundle_id) + self.state = StateMachine(self.ax, self.ui) + self.human = HumanBehavior(config) + + # 统计 + self._scan_count = 0 + self._total_chats_processed = 0 + self._total_media_clicked = 0 + + # ---------------------------------------------------------------- + # 启动检查 + # ---------------------------------------------------------------- + + def verify_setup(self) -> bool: + """验证环境和权限。""" + # 检查辅助功能权限 + if not self.ax.check_accessibility(): + logger.error("缺少辅助功能权限,无法继续") + return False + + # 检查微信是否运行 + app_ref = self.ui.get_app_ref() + if app_ref is None: + logger.error("微信未运行,请先启动微信桌面端") + return False + + # 检查主窗口 + main_win = self.ui.get_main_window() + if main_win is None: + logger.error("未找到微信主窗口,请确保微信已登录并可见") + return False + + logger.info("环境检查通过") + return True + + # ---------------------------------------------------------------- + # 主循环 + # ---------------------------------------------------------------- + + def run(self): + """永久运行的主循环。""" + logger.info("微信自动点击器启动") + + if not self.verify_setup(): + return + + while True: + try: + self._run_one_cycle() + except KeyboardInterrupt: + logger.info("用户中断,正在退出...") + break + except Exception as e: + logger.error(f"主循环异常: {e}", exc_info=True) + # 尝试恢复状态 + try: + self.state.recover_to_chat_list() + except Exception: + pass + time.sleep(10) + + self._print_stats() + + def run_once(self): + """运行一次扫描循环。""" + logger.info("执行单次扫描") + + if not self.verify_setup(): + return + + self._run_one_cycle() + self._print_stats() + + def _run_one_cycle(self): + """执行一个完整的扫描-处理循环。""" + # 检查工作时间 + if self.human.is_off_hours(): + logger.info("当前为非工作时间,等待中...") + time.sleep(300) + return + + # 偶尔触发长休息 + if self.human.should_take_break(): + self.human.long_break() + return + + # 确保微信在前台 + self.ui.ensure_wechat_frontmost() + time.sleep(0.5) + + # 恢复到聊天列表 + if not self.state.recover_to_chat_list(): + logger.warning("无法恢复到聊天列表,跳过本次循环") + time.sleep(10) + return + + self._scan_count += 1 + + # 检查是否有未读消息 + global_unread = self.ui.get_global_unread_count() + if global_unread == 0: + logger.debug(f"[扫描#{self._scan_count}] 没有未读消息") + sleep_time = self.human.scan_interval_with_jitter() + time.sleep(sleep_time) + return + + logger.info(f"[扫描#{self._scan_count}] 全局未读: {global_unread}") + + # 获取未读聊天列表 + unread_chats = self.ui.get_unread_chats() + if not unread_chats: + logger.debug("聊天列表中未发现未读项(可能需要滚动)") + sleep_time = self.human.scan_interval_with_jitter() + time.sleep(sleep_time) + return + + # 过滤和限制数量 + chats_to_process = [] + for chat in unread_chats: + if self.config.should_process_chat(chat.name): + chats_to_process.append(chat) + + count = self.human.random_subset_count( + len(chats_to_process), self.config.max_chats_per_scan + ) + chats_to_process = chats_to_process[:count] + + logger.info( + f"将处理 {len(chats_to_process)} 个聊天 " + f"(共 {len(unread_chats)} 个未读)" + ) + + # 逐个处理 + for chat in chats_to_process: + self.human.delay("before_click_chat") + self._process_chat(chat) + self.human.delay("before_close_chat") + + # 等待下次扫描 + sleep_time = self.human.scan_interval_with_jitter() + logger.debug(f"等待 {sleep_time:.0f}s 后进行下次扫描") + time.sleep(sleep_time) + + # ---------------------------------------------------------------- + # 处理单个聊天 + # ---------------------------------------------------------------- + + def _process_chat(self, chat): + """打开一个聊天,处理其中的媒体消息,然后关闭。""" + logger.info(f"处理聊天: {chat.name} (未读: {chat.unread_count})") + + if self.dry_run: + logger.info(f" [DRY-RUN] 跳过点击: {chat.name}") + return + + # 点击聊天项打开会话 + if not self.ax.press(chat.element): + logger.warning(f" 点击聊天项失败: {chat.name}") + return + + self.human.delay("after_open_chat") + + # 验证会话窗口已打开 + state = self.state.detect_state() + if state != UIState.CONVERSATION_OPEN: + logger.warning( + f" 会话窗口未打开 (状态: {state.value}),尝试恢复" + ) + self.state.recover_to_chat_list() + return + + # 处理会话中的媒体 + conv_window = self.ui.get_conversation_window() + if conv_window: + media_count = self._process_media(conv_window, chat.name) + self._total_media_clicked += media_count + + self._total_chats_processed += 1 + + # 关闭会话窗口 + self.human.delay("before_close_chat") + if not self.state.close_current_conversation(): + logger.warning(" 关闭会话失败,尝试恢复") + self.state.recover_to_chat_list() + + # ---------------------------------------------------------------- + # 处理媒体消息 + # ---------------------------------------------------------------- + + def _process_media(self, conv_window, chat_name: str) -> int: + """在打开的会话中查找并点击媒体消息。 + + Returns: + 点击的媒体数量。 + """ + media_messages = self.ui.get_media_messages(conv_window) + if not media_messages: + logger.debug(f" {chat_name}: 未发现可见媒体消息") + return 0 + + # 过滤需要点击的类型 + targets = [] + for msg in media_messages: + if msg.msg_type == "image" and self.config.click_images: + targets.append(msg) + elif msg.msg_type == "file" and self.config.click_files: + targets.append(msg) + elif msg.msg_type == "video" and self.config.click_videos: + targets.append(msg) + + if not targets: + logger.debug(f" {chat_name}: 无需处理的媒体") + return 0 + + # 限制数量,从最新的开始(列表末尾 = 最新) + max_count = self.config.max_media_per_chat + targets = targets[-max_count:] if len(targets) > max_count else targets + # 反转,从最新的开始处理 + targets = list(reversed(targets)) + + logger.info(f" {chat_name}: 发现 {len(targets)} 个媒体消息待处理") + + clicked = 0 + for msg in targets: + self.human.delay("before_click_media") + + short_title = msg.title.replace("\n", " ")[:50] + logger.info(f" 点击{msg.msg_type}: {short_title}") + + # 点击媒体 + if not self.ax.press(msg.element): + logger.warning(f" 点击失败: {short_title}") + continue + + self.human.delay("after_click_media") + + # 关闭可能出现的预览 + self._dismiss_preview_safe() + + clicked += 1 + self.human.delay("between_messages") + + # 检查连续错误 + if self.ax.should_backoff(): + logger.warning("连续错误过多,暂停处理") + self.ax.reset_error_count() + break + + return clicked + + def _dismiss_preview_safe(self): + """安全地关闭可能出现的媒体预览。""" + # 等一小会儿让预览可能出现 + self.human.micro_jitter() + + state = self.state.detect_state() + if state == UIState.MEDIA_PREVIEW: + self.human.delay("before_close_preview") + self.state.dismiss_preview() + else: + # 保守策略:即使未检测到预览也发送 Escape + # 因为某些预览可能不会创建新窗口 + self.ax.send_escape_key() + time.sleep(0.3) + + # ---------------------------------------------------------------- + # 调试工具 + # ---------------------------------------------------------------- + + def dump_ui_tree(self): + """输出微信 UI 元素树(调试用)。""" + if not self.verify_setup(): + return + + self.ui.ensure_wechat_frontmost() + time.sleep(0.5) + + # 输出主窗口 + main_win = self.ui.get_main_window() + if main_win: + print("=== 主窗口 (微信) ===") + print(self.ax.dump_element(main_win, max_depth=5)) + + # 输出会话窗口 + conv_windows = self.ui.get_conversation_windows() + for win in conv_windows: + title = self.ax.get_title(win) + print(f"\n=== 会话窗口 ({title}) ===") + print(self.ax.dump_element(win, max_depth=5)) + + # 输出聊天列表解析结果 + print("\n=== 聊天列表解析 ===") + items = self.ui.get_chat_items() + for item in items: + status = f"[未读:{item.unread_count}]" if item.unread_count > 0 else "" + print(f" {item.name} {status} | {item.preview} | {item.timestamp}") + + # ---------------------------------------------------------------- + # 统计 + # ---------------------------------------------------------------- + + def _print_stats(self): + """输出运行统计。""" + logger.info( + f"运行统计: 扫描={self._scan_count}, " + f"处理聊天={self._total_chats_processed}, " + f"点击媒体={self._total_media_clicked}" + ) diff --git a/wechat_clicker/ax_bridge.py b/wechat_clicker/ax_bridge.py new file mode 100644 index 0000000..eb9f42b --- /dev/null +++ b/wechat_clicker/ax_bridge.py @@ -0,0 +1,300 @@ +"""macOS Accessibility API (AXUIElement) 底层封装 + +通过 pyobjc 调用 ApplicationServices 框架,提供对 UI 元素的读取和操作能力。 +""" + +import logging +import time + +from ApplicationServices import ( + AXIsProcessTrusted, + AXIsProcessTrustedWithOptions, + AXUIElementCreateApplication, + AXUIElementCopyAttributeValue, + AXUIElementCopyAttributeNames, + AXUIElementCopyActionNames, + AXUIElementPerformAction, +) +from Cocoa import ( + NSRunningApplication, + NSWorkspace, + NSApplicationActivateIgnoringOtherApps, +) +from Quartz import ( + CGEventCreateKeyboardEvent, + CGEventPost, + CGEventSetFlags, + kCGHIDEventTap, + kCGEventFlagMaskCommand, +) +from CoreFoundation import kCFBooleanTrue + +logger = logging.getLogger("wechat_clicker.ax_bridge") + +# AXError 常量 +kAXErrorSuccess = 0 +kAXErrorAttributeUnsupported = -25205 +kAXErrorNoValue = -25212 +kAXErrorInvalidUIElement = -25202 + +# 键码 +kVK_Escape = 0x35 +kVK_W = 0x0D + + +class AXBridge: + """AXUIElement 底层封装""" + + def __init__(self): + self._error_count = 0 + self._max_errors_before_backoff = 10 + self._backoff_seconds = 60 + + # ---------------------------------------------------------------- + # 辅助功能权限检查 + # ---------------------------------------------------------------- + + def check_accessibility(self) -> bool: + """检查当前进程是否拥有辅助功能权限。""" + trusted = AXIsProcessTrusted() + if not trusted: + logger.error( + "辅助功能权限未授予!请前往:系统设置 > 隐私与安全 > 辅助功能," + "添加当前终端应用或 Python 解释器。" + ) + # 弹出系统提示框 + AXIsProcessTrustedWithOptions( + { + "AXTrustedCheckOptionPrompt": kCFBooleanTrue + } + ) + return trusted + + # ---------------------------------------------------------------- + # 获取应用引用 + # ---------------------------------------------------------------- + + def get_app_ref(self, bundle_id: str): + """根据 bundle ID 获取应用的 AXUIElement 引用。返回 None 表示应用未运行。""" + apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id) + if not apps or len(apps) == 0: + logger.error(f"未找到运行中的应用: {bundle_id}") + return None + app = apps[0] + pid = app.processIdentifier() + logger.debug(f"找到应用 {bundle_id}, PID={pid}") + return AXUIElementCreateApplication(pid) + + def bring_to_front(self, bundle_id: str) -> bool: + """将指定应用带到最前台。""" + apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id) + if not apps or len(apps) == 0: + return False + app = apps[0] + return app.activateWithOptions_(NSApplicationActivateIgnoringOtherApps) + + # ---------------------------------------------------------------- + # 属性读取 + # ---------------------------------------------------------------- + + def get_attribute(self, element, attr_name: str): + """读取 UI 元素的指定属性。失败返回 None。""" + try: + err, value = AXUIElementCopyAttributeValue(element, attr_name, None) + if err == kAXErrorSuccess: + self._error_count = 0 + return value + if err not in (kAXErrorNoValue, kAXErrorAttributeUnsupported): + logger.debug(f"读取属性 {attr_name} 失败, 错误码: {err}") + return None + except Exception as e: + logger.debug(f"读取属性 {attr_name} 异常: {e}") + return None + + def get_attribute_names(self, element) -> list: + """获取元素支持的所有属性名。""" + try: + err, names = AXUIElementCopyAttributeNames(element, None) + if err == kAXErrorSuccess: + return list(names) if names else [] + return [] + except Exception: + return [] + + def get_action_names(self, element) -> list: + """获取元素支持的所有操作名。""" + try: + err, names = AXUIElementCopyActionNames(element, None) + if err == kAXErrorSuccess: + return list(names) if names else [] + return [] + except Exception: + return [] + + def get_children(self, element) -> list: + """获取子元素列表。""" + children = self.get_attribute(element, "AXChildren") + if children is None: + return [] + return list(children) + + def get_role(self, element) -> str: + """获取元素角色(AXRole)。""" + return self.get_attribute(element, "AXRole") or "" + + def get_title(self, element) -> str: + """获取元素标题(AXTitle)。""" + return self.get_attribute(element, "AXTitle") or "" + + def get_value(self, element) -> str: + """获取元素值(AXValue)。""" + return self.get_attribute(element, "AXValue") or "" + + def get_name(self, element) -> str: + """获取元素名称,优先 AXTitle,其次 AXValue。""" + title = self.get_attribute(element, "AXTitle") + if title: + return title + value = self.get_attribute(element, "AXValue") + if value: + return value + return "" + + def get_description(self, element) -> str: + """获取元素描述(AXDescription)。""" + return self.get_attribute(element, "AXDescription") or "" + + def get_size(self, element) -> tuple: + """获取元素尺寸 (width, height)。返回 (0, 0) 表示不可见或失败。""" + size = self.get_attribute(element, "AXSize") + if size is None: + return (0, 0) + try: + return (int(size.width), int(size.height)) + except (AttributeError, TypeError): + return (0, 0) + + def get_position(self, element) -> tuple: + """获取元素位置 (x, y)。""" + pos = self.get_attribute(element, "AXPosition") + if pos is None: + return (0, 0) + try: + return (int(pos.x), int(pos.y)) + except (AttributeError, TypeError): + return (0, 0) + + def get_windows(self, app_ref) -> list: + """获取应用的所有窗口。""" + windows = self.get_attribute(app_ref, "AXWindows") + if windows is None: + return [] + return list(windows) + + def get_focused_window(self, app_ref): + """获取当前聚焦的窗口。""" + return self.get_attribute(app_ref, "AXFocusedWindow") + + # ---------------------------------------------------------------- + # 操作执行 + # ---------------------------------------------------------------- + + def perform_action(self, element, action: str) -> bool: + """对元素执行指定操作。""" + try: + err = AXUIElementPerformAction(element, action) + if err == kAXErrorSuccess: + self._error_count = 0 + return True + logger.debug(f"执行操作 {action} 失败, 错误码: {err}") + self._error_count += 1 + return False + except Exception as e: + logger.debug(f"执行操作 {action} 异常: {e}") + self._error_count += 1 + return False + + def press(self, element) -> bool: + """对元素执行 AXPress(等效点击)。""" + return self.perform_action(element, "AXPress") + + # ---------------------------------------------------------------- + # 键盘事件 + # ---------------------------------------------------------------- + + def send_escape_key(self): + """发送 Escape 键事件。""" + self._send_key(kVK_Escape) + + def send_cmd_w(self): + """发送 Cmd+W 键事件(关闭窗口)。""" + self._send_key(kVK_W, flags=kCGEventFlagMaskCommand) + + def _send_key(self, key_code: int, flags: int = 0): + """发送键盘事件(按下+抬起)。""" + try: + # Key down + event_down = CGEventCreateKeyboardEvent(None, key_code, True) + if flags: + CGEventSetFlags(event_down, flags) + CGEventPost(kCGHIDEventTap, event_down) + + time.sleep(0.05) + + # Key up + event_up = CGEventCreateKeyboardEvent(None, key_code, False) + if flags: + CGEventSetFlags(event_up, flags) + CGEventPost(kCGHIDEventTap, event_up) + except Exception as e: + logger.error(f"发送键盘事件失败: {e}") + + # ---------------------------------------------------------------- + # 错误管理 + # ---------------------------------------------------------------- + + def should_backoff(self) -> bool: + """连续错误过多时应暂停一段时间。""" + return self._error_count >= self._max_errors_before_backoff + + def reset_error_count(self): + """重置错误计数。""" + self._error_count = 0 + + # ---------------------------------------------------------------- + # 调试工具 + # ---------------------------------------------------------------- + + def dump_element(self, element, indent: int = 0, max_depth: int = 5) -> str: + """递归输出元素树(调试用)。""" + if indent >= max_depth: + return "" + + lines = [] + role = self.get_role(element) + title = self.get_title(element) + name_attr = self.get_attribute(element, "AXValue") or "" + desc = self.get_description(element) + size = self.get_size(element) + + prefix = " " * indent + info_parts = [f"role={role}"] + if title: + # 截断过长的 title,用单行表示 + short_title = title.replace("\n", "\\n")[:80] + info_parts.append(f'title="{short_title}"') + if name_attr: + short_name = str(name_attr).replace("\n", "\\n")[:80] + info_parts.append(f'value="{short_name}"') + if desc: + info_parts.append(f'desc="{desc}"') + if size != (0, 0): + info_parts.append(f"size={size[0]}x{size[1]}") + + lines.append(f"{prefix}[{', '.join(info_parts)}]") + + children = self.get_children(element) + for child in children: + lines.append(self.dump_element(child, indent + 1, max_depth)) + + return "\n".join(lines) diff --git a/wechat_clicker/config.py b/wechat_clicker/config.py new file mode 100644 index 0000000..69b9271 --- /dev/null +++ b/wechat_clicker/config.py @@ -0,0 +1,201 @@ +"""配置加载与管理""" + +import os +import random +import yaml +from datetime import datetime + + +# 默认配置 +DEFAULTS = { + "wechat": { + "bundle_id": "com.tencent.xinWeChat", + "process_name": "WeChat", + }, + "scan": { + "interval_seconds": 30, + "max_chats_per_scan": 5, + "scroll_chat_list": False, + }, + "delays": { + "before_click_chat": [2, 5], + "after_open_chat": [1, 3], + "before_click_media": [1, 4], + "after_click_media": [3, 8], + "before_close_preview": [1, 3], + "before_close_chat": [1, 2], + "between_messages": [0.5, 2], + }, + "schedule": { + "enabled": True, + "start_hour": 8, + "end_hour": 23, + "pause_on_weekends": False, + }, + "filter": { + "mode": "all", + "whitelist": [], + "blacklist": ["腾讯新闻", "微信支付", "微信团队"], + }, + "media": { + "click_images": True, + "click_files": True, + "click_videos": False, + "max_media_per_chat": 20, + }, + "logging": { + "level": "INFO", + "file": "wechat_clicker.log", + "max_bytes": 10485760, + "backup_count": 5, + "console": True, + }, +} + + +def _deep_merge(base: dict, override: dict) -> dict: + """深度合并两个字典,override 中的值覆盖 base 中的值。""" + result = base.copy() + for key, value in override.items(): + if key in result and isinstance(result[key], dict) and isinstance(value, dict): + result[key] = _deep_merge(result[key], value) + else: + result[key] = value + return result + + +class Config: + """配置管理器""" + + def __init__(self, config_path: str = "config.yaml"): + self._config_path = config_path + self._data = self._load() + + def _load(self) -> dict: + """加载配置文件,与默认值合并。""" + if os.path.exists(self._config_path): + with open(self._config_path, "r", encoding="utf-8") as f: + user_config = yaml.safe_load(f) or {} + return _deep_merge(DEFAULTS, user_config) + return DEFAULTS.copy() + + def _get(self, dotted_key: str, default=None): + """通过点号分隔的 key 获取配置值。例如 'scan.interval_seconds'""" + keys = dotted_key.split(".") + value = self._data + for key in keys: + if isinstance(value, dict) and key in value: + value = value[key] + else: + return default + return value + + def override(self, dotted_key: str, value): + """运行时覆盖某个配置值。""" + keys = dotted_key.split(".") + target = self._data + for key in keys[:-1]: + target = target.setdefault(key, {}) + target[keys[-1]] = value + + # --- 便捷属性 --- + + @property + def bundle_id(self) -> str: + return self._get("wechat.bundle_id") + + @property + def process_name(self) -> str: + return self._get("wechat.process_name") + + @property + def scan_interval(self) -> int: + return self._get("scan.interval_seconds") + + @property + def max_chats_per_scan(self) -> int: + return self._get("scan.max_chats_per_scan") + + @property + def scroll_chat_list(self) -> bool: + return self._get("scan.scroll_chat_list") + + @property + def click_images(self) -> bool: + return self._get("media.click_images") + + @property + def click_files(self) -> bool: + return self._get("media.click_files") + + @property + def click_videos(self) -> bool: + return self._get("media.click_videos") + + @property + def max_media_per_chat(self) -> int: + return self._get("media.max_media_per_chat") + + @property + def log_level(self) -> str: + return self._get("logging.level") + + @property + def log_file(self) -> str: + return self._get("logging.file") + + @property + def log_max_bytes(self) -> int: + return self._get("logging.max_bytes") + + @property + def log_backup_count(self) -> int: + return self._get("logging.backup_count") + + @property + def log_console(self) -> bool: + return self._get("logging.console") + + # --- 延迟 --- + + def get_delay(self, delay_name: str) -> float: + """获取指定名称的随机延迟时间(秒),使用截断正态分布。""" + delay_range = self._get(f"delays.{delay_name}", [1, 3]) + min_val, max_val = delay_range[0], delay_range[1] + mid = (min_val + max_val) / 2 + std = (max_val - min_val) / 4 # 大部分值落在范围内 + value = random.gauss(mid, std) + return max(min_val, min(max_val, value)) + + # --- 工作时间 --- + + def is_within_working_hours(self) -> bool: + """检查当前时间是否在配置的工作时间内。""" + if not self._get("schedule.enabled"): + return True + + now = datetime.now() + # 周末检查 + if self._get("schedule.pause_on_weekends") and now.weekday() >= 5: + return False + + start_hour = self._get("schedule.start_hour") + end_hour = self._get("schedule.end_hour") + return start_hour <= now.hour < end_hour + + # --- 聊天过滤 --- + + def should_process_chat(self, chat_name: str) -> bool: + """根据白名单/黑名单判断是否处理此聊天。""" + mode = self._get("filter.mode", "all") + if mode == "all": + # 即使在 all 模式下也检查黑名单 + blacklist = self._get("filter.blacklist", []) + return chat_name not in blacklist + elif mode == "whitelist": + whitelist = self._get("filter.whitelist", []) + return chat_name in whitelist + elif mode == "blacklist": + blacklist = self._get("filter.blacklist", []) + return chat_name not in blacklist + return True diff --git a/wechat_clicker/human_like.py b/wechat_clicker/human_like.py new file mode 100644 index 0000000..36849b7 --- /dev/null +++ b/wechat_clicker/human_like.py @@ -0,0 +1,91 @@ +"""拟人行为模拟 + +使用高斯分布随机延迟、偶尔长休息、工作时间限制等策略, +模拟真人操作节奏,降低被检测风险。 +""" + +import logging +import random +import time + +from .config import Config + +logger = logging.getLogger("wechat_clicker.human_like") + + +class HumanBehavior: + """拟人行为引擎""" + + def __init__(self, config: Config): + self.config = config + self._cycle_count = 0 + self._break_probability = 0.05 # 5% 概率触发长休息 + self._break_min = 30 + self._break_max = 120 + + def delay(self, delay_name: str): + """执行指定名称的随机延迟。""" + seconds = self.config.get_delay(delay_name) + logger.debug(f"延迟 {delay_name}: {seconds:.1f}s") + time.sleep(seconds) + + def random_delay(self, min_s: float, max_s: float): + """执行指定范围内的随机延迟(高斯分布)。""" + mid = (min_s + max_s) / 2 + std = (max_s - min_s) / 4 + value = random.gauss(mid, std) + value = max(min_s, min(max_s, value)) + time.sleep(value) + + def micro_jitter(self): + """极短延迟(0.1-0.5 秒),用于连续操作之间。""" + time.sleep(random.uniform(0.1, 0.5)) + + def scan_interval_with_jitter(self) -> float: + """返回带 ±20% 抖动的扫描间隔时间(秒)。""" + base = self.config.scan_interval + jitter = base * 0.2 + return random.uniform(base - jitter, base + jitter) + + # ---------------------------------------------------------------- + # 长休息 + # ---------------------------------------------------------------- + + def should_take_break(self) -> bool: + """判断是否应该触发一次长休息。 + + 每个循环有固定概率触发,模拟真人偶尔离开电脑。 + """ + self._cycle_count += 1 + return random.random() < self._break_probability + + def long_break(self): + """执行一次长休息(30-120 秒)。""" + duration = random.uniform(self._break_min, self._break_max) + logger.info(f"模拟长休息: {duration:.0f}s") + time.sleep(duration) + + # ---------------------------------------------------------------- + # 工作时间 + # ---------------------------------------------------------------- + + def is_off_hours(self) -> bool: + """检查是否在非工作时间(应暂停操作)。""" + return not self.config.is_within_working_hours() + + # ---------------------------------------------------------------- + # 随机子集 + # ---------------------------------------------------------------- + + def random_subset_count(self, total: int, max_count: int) -> int: + """决定本次处理多少个项目。 + + 不总是处理 max_count 个,偶尔少处理一些,模拟真人不会每次都处理所有内容。 + """ + if total <= 0: + return 0 + upper = min(total, max_count) + # 80% 概率处理全部(上限内),20% 概率减少 1-2 个 + if random.random() < 0.8: + return upper + return max(1, upper - random.randint(1, 2)) diff --git a/wechat_clicker/logger_setup.py b/wechat_clicker/logger_setup.py new file mode 100644 index 0000000..01638bc --- /dev/null +++ b/wechat_clicker/logger_setup.py @@ -0,0 +1,40 @@ +"""日志配置""" + +import logging +from logging.handlers import RotatingFileHandler + + +def setup_logging(config) -> logging.Logger: + """配置日志系统,返回 logger 实例。""" + logger = logging.getLogger("wechat_clicker") + logger.setLevel(getattr(logging, config.log_level, logging.INFO)) + + # 避免重复添加 handler + if logger.handlers: + return logger + + formatter = logging.Formatter( + "%(asctime)s [%(levelname)s] %(name)s: %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + + # 文件 handler(自动轮转) + file_handler = RotatingFileHandler( + config.log_file, + maxBytes=config.log_max_bytes, + backupCount=config.log_backup_count, + encoding="utf-8", + ) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + # 控制台 handler + if config.log_console: + console_handler = logging.StreamHandler() + console_handler.setFormatter(logging.Formatter( + "%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", + )) + logger.addHandler(console_handler) + + return logger diff --git a/wechat_clicker/state_machine.py b/wechat_clicker/state_machine.py new file mode 100644 index 0000000..1ad74ad --- /dev/null +++ b/wechat_clicker/state_machine.py @@ -0,0 +1,201 @@ +"""UI 状态机 + +基于微信窗口数量和名称判断当前 UI 状态,提供状态恢复能力。 + +微信 v4.1.9 窗口模型: +- 1 个窗口("微信"):主聊天列表 +- 2 个窗口("微信" + 聊天名):会话已打开 +- 3+ 个窗口:可能有图片预览等额外窗口 +""" + +import logging +import time +from enum import Enum + +from .ax_bridge import AXBridge +from .wechat_ui import WeChatUI + +logger = logging.getLogger("wechat_clicker.state_machine") + + +class UIState(Enum): + UNKNOWN = "unknown" + MAIN_CHAT_LIST = "main_chat_list" + CONVERSATION_OPEN = "conversation_open" + MEDIA_PREVIEW = "media_preview" + WECHAT_NOT_RUNNING = "wechat_not_running" + + +class StateMachine: + """微信 UI 状态机""" + + def __init__(self, ax: AXBridge, wechat_ui: WeChatUI): + self.ax = ax + self.ui = wechat_ui + self._current_state = UIState.UNKNOWN + self._conversation_name = None # 当前打开的会话名称 + + @property + def current_state(self) -> UIState: + return self._current_state + + @property + def conversation_name(self) -> str: + return self._conversation_name or "" + + # ---------------------------------------------------------------- + # 状态检测 + # ---------------------------------------------------------------- + + def detect_state(self) -> UIState: + """检测当前微信 UI 状态。""" + windows = self.ui.get_all_windows() + + if not windows: + self._current_state = UIState.WECHAT_NOT_RUNNING + self._conversation_name = None + logger.debug("未检测到微信窗口") + return self._current_state + + # 分析窗口 + main_window = None + other_windows = [] + for win in windows: + title = self.ax.get_title(win) + if title == "微信": + main_window = win + elif title: + other_windows.append((win, title)) + + if main_window is None: + # 微信可能最小化了,或者窗口结构变化 + self._current_state = UIState.UNKNOWN + logger.debug("未找到微信主窗口") + return self._current_state + + window_count = len(windows) + + if window_count == 1: + # 只有主窗口 = 聊天列表 + self._current_state = UIState.MAIN_CHAT_LIST + self._conversation_name = None + elif window_count == 2 and len(other_windows) == 1: + # 主窗口 + 一个会话窗口 + self._current_state = UIState.CONVERSATION_OPEN + self._conversation_name = other_windows[0][1] + elif window_count >= 3: + # 可能有预览窗口 + self._current_state = UIState.MEDIA_PREVIEW + # 第一个非主窗口通常是会话 + if other_windows: + self._conversation_name = other_windows[0][1] + else: + self._current_state = UIState.UNKNOWN + + logger.debug( + f"状态检测: {self._current_state.value}, " + f"窗口数={window_count}, 会话={self._conversation_name}" + ) + return self._current_state + + # ---------------------------------------------------------------- + # 状态恢复 + # ---------------------------------------------------------------- + + def recover_to_chat_list(self, max_retries: int = 3) -> bool: + """恢复到主聊天列表状态。关闭所有会话和预览窗口。 + + Returns: + True 如果成功恢复到聊天列表。 + """ + for attempt in range(max_retries): + state = self.detect_state() + + if state == UIState.MAIN_CHAT_LIST: + return True + + if state == UIState.WECHAT_NOT_RUNNING: + logger.error("微信未运行,无法恢复") + return False + + if state == UIState.MEDIA_PREVIEW: + # 先关闭预览(Escape) + logger.info("关闭媒体预览...") + self.ax.send_escape_key() + time.sleep(0.5) + + if state in (UIState.CONVERSATION_OPEN, UIState.MEDIA_PREVIEW): + # 关闭会话窗口 + self._close_all_conversations() + time.sleep(0.5) + + if state == UIState.UNKNOWN: + # 尝试激活微信并点击侧边栏聊天按钮 + self.ui.ensure_wechat_frontmost() + time.sleep(0.5) + self.ui.click_sidebar_chat_tab() + time.sleep(0.5) + + logger.debug(f"恢复尝试 {attempt + 1}/{max_retries}") + + # 最终检查 + final_state = self.detect_state() + if final_state == UIState.MAIN_CHAT_LIST: + return True + + logger.error(f"恢复失败,当前状态: {final_state.value}") + return False + + def _close_all_conversations(self): + """关闭所有会话窗口。""" + conv_windows = self.ui.get_conversation_windows() + for win in conv_windows: + title = self.ax.get_title(win) + close_btn = self.ui.get_close_button(win) + if close_btn: + logger.debug(f"关闭会话窗口: {title}") + self.ax.press(close_btn) + time.sleep(0.3) + else: + # 后备方案:Cmd+W + logger.debug(f"使用 Cmd+W 关闭窗口: {title}") + self.ax.send_cmd_w() + time.sleep(0.3) + + def close_current_conversation(self) -> bool: + """关闭当前打开的会话窗口。""" + conv_window = self.ui.get_conversation_window() + if conv_window is None: + return True # 没有会话窗口,视为成功 + + close_btn = self.ui.get_close_button(conv_window) + if close_btn: + self.ax.press(close_btn) + else: + self.ax.send_cmd_w() + + time.sleep(0.5) + + # 验证 + state = self.detect_state() + return state == UIState.MAIN_CHAT_LIST + + def dismiss_preview(self) -> bool: + """关闭媒体预览。""" + state = self.detect_state() + if state != UIState.MEDIA_PREVIEW: + return True # 没有预览打开 + + # 发送 Escape 关闭预览 + self.ax.send_escape_key() + time.sleep(0.5) + + # 验证 + state = self.detect_state() + if state == UIState.MEDIA_PREVIEW: + # 再试一次 + self.ax.send_escape_key() + time.sleep(0.5) + state = self.detect_state() + + return state != UIState.MEDIA_PREVIEW diff --git a/wechat_clicker/wechat_ui.py b/wechat_clicker/wechat_ui.py new file mode 100644 index 0000000..0ac3f4e --- /dev/null +++ b/wechat_clicker/wechat_ui.py @@ -0,0 +1,394 @@ +"""微信桌面端 UI 导航与元素查找 + +基于 macOS Accessibility API 探测到的微信 v4.1.9 UI 结构: +- 主窗口 "微信" 包含侧边栏和聊天列表 +- 点击聊天项会打开独立的会话窗口(以聊天名命名) +- 聊天列表:AXList name="会话" +- 消息列表:AXList name="消息" +- 每个聊天项的 title 是多行文本,包含聊天名、未读数、最近消息、时间 +""" + +import logging +import re + +from .ax_bridge import AXBridge + +logger = logging.getLogger("wechat_clicker.wechat_ui") + + +class ChatItem: + """解析后的聊天列表项""" + + def __init__(self, element, name: str, unread_count: int, preview: str, timestamp: str): + self.element = element + self.name = name + self.unread_count = unread_count + self.preview = preview + self.timestamp = timestamp + + def __repr__(self): + return f"ChatItem(name={self.name!r}, unread={self.unread_count})" + + +class MessageItem: + """解析后的消息""" + + def __init__(self, element, msg_type: str, title: str, size: tuple): + self.element = element + self.msg_type = msg_type # "image", "file", "video", "text", "timestamp", "other" + self.title = title + self.size = size + + @property + def is_visible(self) -> bool: + """判断消息是否在可见区域(不可见的消息 size 为 0x0)""" + return self.size[0] > 0 and self.size[1] > 0 + + def __repr__(self): + short = self.title.replace("\n", "\\n")[:40] + return f"MessageItem(type={self.msg_type}, title={short!r})" + + +class WeChatUI: + """微信 UI 导航器""" + + def __init__(self, ax: AXBridge, bundle_id: str = "com.tencent.xinWeChat"): + self.ax = ax + self.bundle_id = bundle_id + self._app_ref = None + + # ---------------------------------------------------------------- + # 应用引用管理 + # ---------------------------------------------------------------- + + def get_app_ref(self): + """获取或缓存微信应用的 AXUIElement 引用。""" + if self._app_ref is None: + self._app_ref = self.ax.get_app_ref(self.bundle_id) + return self._app_ref + + def invalidate_app_ref(self): + """清除缓存的应用引用(微信重启后需要)。""" + self._app_ref = None + + def ensure_wechat_frontmost(self) -> bool: + """确保微信在最前台。""" + return self.ax.bring_to_front(self.bundle_id) + + # ---------------------------------------------------------------- + # 窗口查找 + # ---------------------------------------------------------------- + + def get_all_windows(self) -> list: + """获取微信所有窗口。""" + app_ref = self.get_app_ref() + if app_ref is None: + return [] + return self.ax.get_windows(app_ref) + + def get_main_window(self): + """找到主窗口(名为"微信")。""" + for win in self.get_all_windows(): + title = self.ax.get_title(win) + if title == "微信": + return win + logger.warning("未找到微信主窗口") + return None + + def get_conversation_windows(self) -> list: + """获取所有非主窗口(即会话窗口)。""" + result = [] + for win in self.get_all_windows(): + title = self.ax.get_title(win) + # 排除主窗口和一些系统窗口 + if title and title != "微信": + result.append(win) + return result + + def get_conversation_window(self): + """获取第一个会话窗口(如果存在)。""" + conv_windows = self.get_conversation_windows() + return conv_windows[0] if conv_windows else None + + # ---------------------------------------------------------------- + # 元素查找工具 + # ---------------------------------------------------------------- + + def _find_child_by_role_and_name(self, parent, role: str, name: str = None, desc: str = None): + """在子元素中按 role + name/desc 查找。""" + children = self.ax.get_children(parent) + for child in children: + child_role = self.ax.get_role(child) + if child_role != role: + continue + if name is not None: + child_title = self.ax.get_title(child) + if child_title == name: + return child + # 也检查 AXDescription + child_desc = self.ax.get_description(child) + if child_desc == name: + return child + elif desc is not None: + child_desc = self.ax.get_description(child) + if child_desc == desc: + return child + else: + # 只匹配 role + return child + return None + + def _find_child_recursive(self, parent, role: str, name: str = None, max_depth: int = 6): + """递归查找子元素(广度优先)。""" + if max_depth <= 0: + return None + + children = self.ax.get_children(parent) + # 先在当前层查找 + for child in children: + child_role = self.ax.get_role(child) + if child_role == role: + if name is None: + return child + child_title = self.ax.get_title(child) + if child_title == name: + return child + # 也用 AXDescription 匹配 + child_desc = self.ax.get_description(child) + if child_desc == name: + return child + + # 递归到下一层 + for child in children: + result = self._find_child_recursive(child, role, name, max_depth - 1) + if result is not None: + return result + return None + + # ---------------------------------------------------------------- + # 聊天列表 + # ---------------------------------------------------------------- + + def get_chat_list(self): + """找到聊天列表元素(AXList name="会话")。""" + main_win = self.get_main_window() + if main_win is None: + return None + return self._find_child_recursive(main_win, "AXList", "会话") + + def get_sidebar_chat_button(self): + """找到侧边栏的"微信"按钮(显示全局未读数)。""" + main_win = self.get_main_window() + if main_win is None: + return None + return self._find_child_recursive(main_win, "AXButton", "微信") + + def get_global_unread_count(self) -> int: + """从侧边栏"微信"按钮获取全局未读消息数。""" + btn = self.get_sidebar_chat_button() + if btn is None: + return 0 + desc = self.ax.get_description(btn) + if not desc: + return 0 + match = re.search(r"(\d+)条新消息", desc) + if match: + return int(match.group(1)) + return 0 + + def get_chat_items(self) -> list: + """获取聊天列表中所有可见聊天项,返回 ChatItem 列表。""" + chat_list = self.get_chat_list() + if chat_list is None: + logger.warning("未找到聊天列表") + return [] + + items = [] + children = self.ax.get_children(chat_list) + for child in children: + role = self.ax.get_role(child) + if role != "AXStaticText": + continue + + title = self.ax.get_title(child) + if not title: + continue + + parsed = self._parse_chat_title(title) + if parsed["name"]: + items.append(ChatItem( + element=child, + name=parsed["name"], + unread_count=parsed["unread_count"], + preview=parsed["preview"], + timestamp=parsed["timestamp"], + )) + + return items + + def get_unread_chats(self) -> list: + """获取有未读消息的聊天项,按未读数降序排列。""" + items = self.get_chat_items() + unread = [item for item in items if item.unread_count > 0] + unread.sort(key=lambda x: x.unread_count, reverse=True) + return unread + + # ---------------------------------------------------------------- + # 会话消息 + # ---------------------------------------------------------------- + + def get_message_list(self, conv_window): + """在会话窗口中找到消息列表(AXList name="消息")。""" + if conv_window is None: + return None + return self._find_child_recursive(conv_window, "AXList", "消息") + + def get_messages(self, msg_list) -> list: + """获取消息列表中的所有消息,返回 MessageItem 列表。""" + if msg_list is None: + return [] + + items = [] + children = self.ax.get_children(msg_list) + for child in children: + role = self.ax.get_role(child) + if role != "AXStaticText": + continue + + title = self.ax.get_title(child) + if not title: + continue + + size = self.ax.get_size(child) + msg_type = self._classify_message(title, size) + items.append(MessageItem( + element=child, + msg_type=msg_type, + title=title, + size=size, + )) + + return items + + def get_media_messages(self, conv_window) -> list: + """获取会话中所有可见的图片/文件/视频消息。""" + msg_list = self.get_message_list(conv_window) + if msg_list is None: + return [] + + messages = self.get_messages(msg_list) + media = [ + msg for msg in messages + if msg.msg_type in ("image", "file", "video") and msg.is_visible + ] + return media + + # ---------------------------------------------------------------- + # 窗口操作 + # ---------------------------------------------------------------- + + def get_close_button(self, window): + """找到窗口的关闭按钮(AXButton desc="关闭按钮")。""" + if window is None: + return None + + children = self.ax.get_children(window) + for child in children: + role = self.ax.get_role(child) + if role == "AXButton": + desc = self.ax.get_description(child) + if desc == "关闭按钮": + return child + return None + + def click_sidebar_chat_tab(self) -> bool: + """点击侧边栏"微信"按钮,确保显示聊天列表。""" + btn = self.get_sidebar_chat_button() + if btn is None: + logger.warning("未找到侧边栏微信按钮") + return False + return self.ax.press(btn) + + # ---------------------------------------------------------------- + # 解析工具 + # ---------------------------------------------------------------- + + @staticmethod + def _parse_chat_title(title_text: str) -> dict: + """解析聊天项的多行 title 文本。 + + 格式(有未读时): + 聊天名 + [N条] + 最近消息预览 + 时间 + + 格式(无未读时): + 聊天名 + 最近消息预览 + 时间 + """ + lines = title_text.strip().split("\n") + result = { + "name": "", + "unread_count": 0, + "preview": "", + "timestamp": "", + } + + if not lines: + return result + + result["name"] = lines[0].strip() + + if len(lines) >= 2: + # 检查第二行是否是 [N条] 格式 + unread_match = re.match(r"\[(\d+)条\]", lines[1].strip()) + if unread_match: + result["unread_count"] = int(unread_match.group(1)) + if len(lines) >= 3: + result["preview"] = lines[2].strip() + if len(lines) >= 4: + result["timestamp"] = lines[-1].strip() + else: + result["preview"] = lines[1].strip() + if len(lines) >= 3: + result["timestamp"] = lines[-1].strip() + + return result + + @staticmethod + def _classify_message(title: str, size: tuple) -> str: + """根据消息 title 和尺寸分类消息类型。 + + 已知模式(微信 v4.1.9): + - 图片: title="图片" + - 视频: title 以 "视频" 开头 + - 文件: title 以 "文件\\n" 开头 + - 时间戳: 高度约 41px,内容为时间格式 + - 其他: 文本消息或其他类型 + """ + stripped = title.strip() + + # 图片 + if stripped == "图片": + return "image" + + # 视频 + if stripped.startswith("视频"): + return "video" + + # 文件(多行格式:文件\n文件名\n大小\n微信电脑版) + if stripped.startswith("文件\n") or stripped.startswith("[文件]"): + return "file" + + # 时间戳分隔符(高度约 41px,内容为时间格式) + if size[1] > 0 and size[1] <= 45: + time_pattern = re.match( + r"^(\d{1,2}:\d{2}|\d{1,2}/\d{1,2}|星期[一二三四五六日]|昨天|前天)$", + stripped, + ) + if time_pattern: + return "timestamp" + + return "text"