diff --git a/CLAUDE.md b/CLAUDE.md index d411ea4..48cffdd 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -17,10 +17,10 @@ ``` wechat_clicker/ -├── ax_bridge.py # AXUIElement 底层封装(属性读取、操作执行、键盘事件) -├── wechat_ui.py # 微信 UI 导航(找聊天列表、消息列表、解析 title 分类消息类型) -├── state_machine.py # UI 状态机(基于窗口数量判断状态、状态恢复) -├── automator.py # 主自动化逻辑(扫描→点击→预览→关闭→循环) +├── ax_bridge.py # AXUIElement 底层封装(属性读取、鼠标点击、键盘事件、滚动) +├── wechat_ui.py # 微信 UI 导航(聊天列表、消息列表、预览界面元素查找) +├── state_machine.py # UI 状态机(窗口状态检测、恢复、Preview.app 管理) +├── automator.py # 主自动化逻辑(扫描→进入聊天→滚动→点击图片/文件→循环) ├── human_like.py # 拟人行为(高斯分布延迟、长休息、工作时间) ├── config.py # YAML 配置加载 └── logger_setup.py # 日志配置 @@ -29,9 +29,14 @@ wechat_clicker/ ### 关键设计决策 - 微信 v4.1.9 点击聊天会打开**独立窗口**(非页内导航),状态检测基于窗口计数 -- 元素查找使用 role+name 搜索(非硬编码索引),适应 UI 变化 +- 聊天列表项为 AXStaticText(无 AXPress),使用 **CGEvent 鼠标坐标点击** +- AXValue 位置/尺寸需用 AXValueGetValue 解包 CGPoint/CGSize - 消息类型通过 title 内容判断:`"图片"` → 图片,`"文件\n..."` → 文件 -- 预览通过 Escape 键关闭 +- 图片处理:点击缩略图 → 点击"..." → 点击"使用预览打开" → 关闭 Preview.app +- 文件处理:直接点击触发下载 +- 进入聊天后先**滚到底部**(最新消息),再向上滚动 5 轮加载历史消息 +- 滚动使用 **kCGEventMouseMoved + ScrollWheel**(不触发点击),避免误点 UI 元素 +- "..."按钮搜索限制在预览区域(独立窗口或主窗口 x>200),排除侧边栏 ## 使用方法 diff --git a/project.md b/project.md index 4852598..86a57ca 100644 --- a/project.md +++ b/project.md @@ -18,17 +18,36 @@ - [x] 入口脚本(main.py)— 参数解析、信号处理 - [x] 调试工具(--dump-ui, --dry-run) +### v0.2.0 修复与增强 (2026/04/22) + +- [x] **修复: 聊天项点击失败** — AXStaticText 无 AXPress 动作,改用 CGEvent 鼠标坐标点击 +- [x] **修复: AXValue 位置解包** — get_position/get_size 改用 AXValueGetValue 正确提取 CGPoint/CGSize +- [x] **新增: 消息列表滚动** — 进入聊天后向上滚动 5 轮,加载更多历史媒体消息 +- [x] **新增: 图片"使用预览打开"流程** — 点击大图 → 点"..." → "使用预览打开" → 关闭 Preview.app,确保原始文件保存到本地 +- [x] **新增: 文件直接点击下载** — 点击文件气泡触发微信下载 +- [x] **新增: Preview.app 管理** — state_machine 添加检测与关闭 macOS Preview.app 的能力 +- [x] **新增: 鼠标双击、滚轮滚动** — ax_bridge 扩展 CGEvent 操作 + +### v0.3.0 修复与调试增强 (2026/04/23) + +- [x] **修复: 滚动误点击** — `_scroll_at()` 原先用 mouseDown/mouseUp 聚焦导致误点侧边栏按钮,改用 kCGEventMouseMoved 仅移动鼠标不触发点击 +- [x] **修复: 未跳转最新消息** — 进入聊天后先 `scroll_to_bottom()` 滚到消息列表底部(最新消息),再向上滚动加载历史 +- [x] **修复: "..."按钮误匹配** — `_find_more_button_in_preview()` 限制搜索范围,主窗口中只搜索 x>200 区域(排除左侧侧边栏),增加位置日志 +- [x] **增强: 全链路调试日志** — click_at_element 记录坐标/角色/标题,get_media_messages 输出每个媒体详情,find_menu_item 记录搜索过程,状态检测记录所有窗口标题 + ### 待验证 -- [ ] 在真实环境中测试 AXUIElement 对微信的访问能力 -- [ ] 验证聊天列表 title 解析的准确性 -- [ ] 验证图片点击后预览关闭的可靠性 +- [ ] 验证 CGEvent 鼠标点击能否正确打开聊天会话 +- [ ] 验证 scroll_to_bottom 是否能到达最新消息 +- [ ] 验证图片预览中"..."按钮和"使用预览打开"菜单项的查找 +- [ ] 验证滚动不再触发误点击 +- [ ] 验证消息列表滚动能否加载历史消息 +- [ ] 验证 Preview.app 的检测与关闭 - [ ] 长时间运行稳定性测试 ### 未来可能的改进 - [ ] 聊天列表滚动支持(处理不在可见区域的聊天) -- [ ] 消息滚动支持(处理更早的图片消息) - [ ] 已处理消息去重(记录已点击的媒体,避免重复) - [ ] 微信版本适配(检测 UI 结构变化并自动调整) - [ ] 运行状态 Web 面板(远程监控) @@ -39,18 +58,35 @@ ### 微信 UI 结构 (v4.1.9) - 主窗口 "微信" 包含侧边栏 + 聊天列表 -- 聊天列表:`AXList name="会话"`,子元素为 `AXStaticText` +- 聊天列表:`AXList name="会话"`,子元素为 `AXStaticText`(无 AXPress 动作) +- 聊天项 AXIdentifier 格式:`session_item_聊天名` +- AXStaticText 的 AXPosition/AXSize 需通过 AXValueGetValue 解包 - 点击聊天项打开独立会话窗口 - 消息列表:`AXList name="消息"` - 图片消息 title = "图片",文件消息 title 以 "文件\n" 开头 +### 点击策略 + +- 聊天项和消息元素均使用 CGEvent 鼠标坐标点击(AXStaticText 不支持 AXPress) +- 计算元素中心坐标 = position + size/2 +- 图片预览中的按钮优先尝试 click_at_element,后备 AXPress + +### 图片下载流程 + +1. 点击图片缩略图 → 打开大图预览(微信内窗口) +2. 在预览窗口找到 "..." (更多) 按钮并点击 +3. 在弹出菜单中找到 "使用'预览'打开" 并点击 +4. 等待 macOS Preview.app 打开(此时原始文件已保存到本地) +5. 关闭 Preview.app 窗口 +6. 回到微信,关闭大图预览 + ### 防封策略 - 高斯分布随机延迟(非均匀) - ±20% 扫描间隔抖动 - 5% 概率长休息(30-120 秒) - 工作时间限制 -- 每次最多 5 个聊天 / 每聊天最多 20 个媒体 +- 每次最多 5 个聊天 ### 依赖 diff --git a/wechat_clicker/automator.py b/wechat_clicker/automator.py index 2dcf824..b9e1ee6 100644 --- a/wechat_clicker/automator.py +++ b/wechat_clicker/automator.py @@ -1,6 +1,9 @@ """主自动化逻辑 -编排整个工作流程:扫描未读聊天 → 点击进入 → 点击图片/文件 → 关闭预览 → 关闭会话 → 循环 +编排整个工作流程:扫描未读聊天 → 点击进入 → 滚动加载历史 → 点击图片/文件 → 关闭 → 循环 + +图片处理流程:点击缩略图 → 打开大图 → 点击"..." → 点击"使用预览打开" → 关闭 Preview.app +文件处理流程:点击文件气泡触发下载 """ import logging @@ -14,6 +17,9 @@ from .wechat_ui import WeChatUI logger = logging.getLogger("wechat_clicker.automator") +SCROLL_ROUNDS = 5 +SCROLL_LINES_PER_ROUND = -10 + class WeChatAutomator: """微信消息自动点击器""" @@ -21,14 +27,13 @@ class WeChatAutomator: def __init__(self, config: Config, dry_run: bool = False): self.config = config self.dry_run = dry_run + self._single_mode = False - # 初始化各组件 self.ax = AXBridge() self.ui = WeChatUI(self.ax, config.bundle_id) self.state = StateMachine(self.ax, self.ui) self.human = HumanBehavior(config) - # 统计 self._scan_count = 0 self._total_chats_processed = 0 self._total_media_clicked = 0 @@ -38,19 +43,15 @@ class WeChatAutomator: # ---------------------------------------------------------------- def verify_setup(self) -> bool: - """验证环境和权限。""" - # 检查辅助功能权限 if not self.ax.check_accessibility(): logger.error("缺少辅助功能权限,无法继续") return False - # 检查微信是否运行 app_ref = self.ui.get_app_ref() if app_ref is None: logger.error("微信未运行,请先启动微信桌面端") return False - # 检查主窗口 main_win = self.ui.get_main_window() if main_win is None: logger.error("未找到微信主窗口,请确保微信已登录并可见") @@ -64,9 +65,7 @@ class WeChatAutomator: # ---------------------------------------------------------------- def run(self): - """永久运行的主循环。""" logger.info("微信自动点击器启动") - if not self.verify_setup(): return @@ -78,7 +77,6 @@ class WeChatAutomator: break except Exception as e: logger.error(f"主循环异常: {e}", exc_info=True) - # 尝试恢复状态 try: self.state.recover_to_chat_list() except Exception: @@ -88,33 +86,28 @@ class WeChatAutomator: self._print_stats() def run_once(self): - """运行一次扫描循环。""" logger.info("执行单次扫描") - if not self.verify_setup(): return + self._single_mode = True self._run_one_cycle() self._print_stats() def _run_one_cycle(self): - """执行一个完整的扫描-处理循环。""" - # 检查工作时间 - if self.human.is_off_hours(): - logger.info("当前为非工作时间,等待中...") - time.sleep(300) - return + if not self._single_mode: + if self.human.is_off_hours(): + logger.info("当前为非工作时间,等待中...") + time.sleep(300) + return - # 偶尔触发长休息 - if self.human.should_take_break(): - self.human.long_break() - return + if self.human.should_take_break(): + self.human.long_break() + return - # 确保微信在前台 self.ui.ensure_wechat_frontmost() time.sleep(0.5) - # 恢复到聊天列表 if not self.state.recover_to_chat_list(): logger.warning("无法恢复到聊天列表,跳过本次循环") time.sleep(10) @@ -122,7 +115,6 @@ class WeChatAutomator: self._scan_count += 1 - # 检查是否有未读消息 global_unread = self.ui.get_global_unread_count() if global_unread == 0: logger.debug(f"[扫描#{self._scan_count}] 没有未读消息") @@ -132,7 +124,6 @@ class WeChatAutomator: logger.info(f"[扫描#{self._scan_count}] 全局未读: {global_unread}") - # 获取未读聊天列表 unread_chats = self.ui.get_unread_chats() if not unread_chats: logger.debug("聊天列表中未发现未读项(可能需要滚动)") @@ -140,12 +131,9 @@ class WeChatAutomator: time.sleep(sleep_time) return - # 过滤和限制数量 - chats_to_process = [] - for chat in unread_chats: - if self.config.should_process_chat(chat.name): - chats_to_process.append(chat) - + chats_to_process = [ + c for c in unread_chats if self.config.should_process_chat(c.name) + ] count = self.human.random_subset_count( len(chats_to_process), self.config.max_chats_per_scan ) @@ -156,75 +144,93 @@ class WeChatAutomator: f"(共 {len(unread_chats)} 个未读)" ) - # 逐个处理 for chat in chats_to_process: self.human.delay("before_click_chat") self._process_chat(chat) self.human.delay("before_close_chat") - # 等待下次扫描 - sleep_time = self.human.scan_interval_with_jitter() - logger.debug(f"等待 {sleep_time:.0f}s 后进行下次扫描") - time.sleep(sleep_time) + if not self._single_mode: + sleep_time = self.human.scan_interval_with_jitter() + logger.debug(f"等待 {sleep_time:.0f}s 后进行下次扫描") + time.sleep(sleep_time) # ---------------------------------------------------------------- # 处理单个聊天 # ---------------------------------------------------------------- def _process_chat(self, chat): - """打开一个聊天,处理其中的媒体消息,然后关闭。""" logger.info(f"处理聊天: {chat.name} (未读: {chat.unread_count})") if self.dry_run: logger.info(f" [DRY-RUN] 跳过点击: {chat.name}") return - # 点击聊天项打开会话 - if not self.ax.press(chat.element): + chat_pos = self.ax.get_position(chat.element) + chat_size = self.ax.get_size(chat.element) + logger.debug(f" 聊天项位置: pos={chat_pos} size={chat_size}") + + if not self.ax.click_at_element(chat.element): logger.warning(f" 点击聊天项失败: {chat.name}") return self.human.delay("after_open_chat") - # 验证会话窗口已打开 - state = self.state.detect_state() - if state != UIState.CONVERSATION_OPEN: - logger.warning( - f" 会话窗口未打开 (状态: {state.value}),尝试恢复" - ) - self.state.recover_to_chat_list() + # 单窗口模式:会话在主窗口内打开,直接在主窗口中查找消息列表 + main_win = self.ui.get_main_window() + if main_win is None: + logger.warning(" 主窗口丢失") return - # 处理会话中的媒体 - conv_window = self.ui.get_conversation_window() - if conv_window: - media_count = self._process_media(conv_window, chat.name) - self._total_media_clicked += media_count + msg_list = self.ui.get_message_list(main_win) + if msg_list is None: + logger.warning(f" 未找到消息列表,可能聊天未成功打开: {chat.name}") + return + msg_list_pos = self.ax.get_position(msg_list) + msg_list_size = self.ax.get_size(msg_list) + logger.debug(f" 消息列表: pos={msg_list_pos} size={msg_list_size}") + + media_count = self._process_media_with_scroll(main_win, msg_list, chat.name) + self._total_media_clicked += media_count self._total_chats_processed += 1 - - # 关闭会话窗口 - self.human.delay("before_close_chat") - if not self.state.close_current_conversation(): - logger.warning(" 关闭会话失败,尝试恢复") - self.state.recover_to_chat_list() + logger.info(f" {chat.name}: 本次处理了 {media_count} 个媒体") # ---------------------------------------------------------------- - # 处理媒体消息 + # 带滚动的媒体处理 # ---------------------------------------------------------------- - def _process_media(self, conv_window, chat_name: str) -> int: - """在打开的会话中查找并点击媒体消息。 + def _process_media_with_scroll(self, main_win, msg_list, chat_name: str) -> int: + """先滚到底部看最新消息,再向上滚动加载历史并处理媒体。""" + total_clicked = 0 - Returns: - 点击的媒体数量。 - """ - media_messages = self.ui.get_media_messages(conv_window) + # 先滚到消息列表底部(看到最新消息) + logger.info(f" {chat_name}: 滚动到最新消息...") + self.ax.scroll_to_bottom(msg_list, rounds=10, lines_per_round=20) + self.human.random_delay(1.0, 2.0) + + # 处理当前可见的媒体(最新消息) + clicked = self._process_visible_media(main_win, chat_name) + total_clicked += clicked + + # 向上滚动加载更多历史消息 + for round_idx in range(SCROLL_ROUNDS): + self.human.delay("between_messages") + logger.debug(f" {chat_name}: 向上滚动第 {round_idx + 1}/{SCROLL_ROUNDS} 轮") + self.ax.scroll_at_element(msg_list, lines=SCROLL_LINES_PER_ROUND) + self.human.random_delay(1.0, 2.5) + + clicked = self._process_visible_media(main_win, chat_name) + total_clicked += clicked + + return total_clicked + + def _process_visible_media(self, main_win, chat_name: str) -> int: + """处理当前可见的所有媒体消息。""" + media_messages = self.ui.get_media_messages(main_win) if not media_messages: - logger.debug(f" {chat_name}: 未发现可见媒体消息") + logger.debug(f" {chat_name}: 当前视图无媒体消息") return 0 - # 过滤需要点击的类型 targets = [] for msg in media_messages: if msg.msg_type == "image" and self.config.click_images: @@ -235,38 +241,40 @@ class WeChatAutomator: targets.append(msg) if not targets: - logger.debug(f" {chat_name}: 无需处理的媒体") + logger.debug( + f" {chat_name}: 发现 {len(media_messages)} 个媒体, " + f"但无符合配置的处理目标" + ) return 0 - # 限制数量,从最新的开始(列表末尾 = 最新) - max_count = self.config.max_media_per_chat - targets = targets[-max_count:] if len(targets) > max_count else targets - # 反转,从最新的开始处理 - targets = list(reversed(targets)) - - logger.info(f" {chat_name}: 发现 {len(targets)} 个媒体消息待处理") + logger.info(f" {chat_name}: 当前可见 {len(targets)} 个媒体消息") clicked = 0 - for msg in targets: + for idx, msg in enumerate(targets): self.human.delay("before_click_media") + pos = self.ax.get_position(msg.element) short_title = msg.title.replace("\n", " ")[:50] - logger.info(f" 点击{msg.msg_type}: {short_title}") + logger.info( + f" [{msg.msg_type}] {short_title} " + f"(pos={pos}, size={msg.size}, #{idx+1}/{len(targets)})" + ) - # 点击媒体 - if not self.ax.press(msg.element): - logger.warning(f" 点击失败: {short_title}") - continue + if msg.msg_type == "image": + success = self._click_image(msg) + elif msg.msg_type == "file": + success = self._click_file(msg) + else: + success = self._click_generic(msg) - self.human.delay("after_click_media") + if success: + clicked += 1 + logger.debug(f" 处理成功: {msg.msg_type}") + else: + logger.warning(f" 处理失败: {msg.msg_type}") - # 关闭可能出现的预览 - self._dismiss_preview_safe() - - clicked += 1 self.human.delay("between_messages") - # 检查连续错误 if self.ax.should_backoff(): logger.warning("连续错误过多,暂停处理") self.ax.reset_error_count() @@ -274,59 +282,190 @@ class WeChatAutomator: return clicked - def _dismiss_preview_safe(self): - """安全地关闭可能出现的媒体预览。""" - # 等一小会儿让预览可能出现 - self.human.micro_jitter() + # ---------------------------------------------------------------- + # 图片处理:点击 → "..." → "使用预览打开" → 关闭 Preview + # ---------------------------------------------------------------- + + def _click_image(self, msg) -> bool: + """处理一张图片:点开大图 -> 点... -> 点使用预览打开 -> 关闭 Preview.app。""" + # Step 1: 点击图片缩略图打开大图预览 + logger.debug(" Step1: 点击图片缩略图") + if not self.ax.click_at_element(msg.element): + logger.warning(" 图片缩略图点击失败") + return False + + self.human.delay("after_click_media") + + # Step 2: 检查预览窗口状态 + state = self.state.detect_state() + logger.debug(f" Step2: 点击后状态={state.value}") + if state != UIState.MEDIA_PREVIEW: + logger.debug(" 图片预览窗口未出现,尝试在内嵌预览中继续") + + # Step 3: 查找 "..." 按钮 + self.human.random_delay(0.5, 1.5) + logger.debug(" Step3: 搜索'...'按钮") + more_btn = self._find_more_button_in_preview() + if more_btn is None: + logger.warning(" 未找到'...'按钮,Escape 退出") + self.ax.send_escape_key() + time.sleep(0.5) + return False + + # Step 4: 点击 "..." 按钮 + logger.debug(" Step4: 点击'...'按钮") + if not self.ax.click_at_element(more_btn): + if not self.ax.press(more_btn): + logger.warning(" '...'按钮点击失败") + self.ax.send_escape_key() + time.sleep(0.5) + return False + + self.human.random_delay(0.5, 1.5) + + # Step 5: 查找"使用预览打开"菜单项 + logger.debug(" Step5: 搜索'使用预览打开'菜单项") + preview_item = self.ui.find_menu_item('使用"预览"打开') + if preview_item is None: + preview_item = self.ui.find_menu_item("预览") + if preview_item is None: + preview_item = self.ui.find_menu_item("Preview") + + if preview_item is None: + logger.warning(" 未找到'使用预览打开'菜单项,Escape 退出") + self.ax.send_escape_key() + time.sleep(0.3) + self.ax.send_escape_key() + time.sleep(0.5) + return False + + # Step 6: 点击"使用预览打开" + logger.debug(" Step6: 点击'使用预览打开'") + if not self.ax.click_at_element(preview_item): + self.ax.press(preview_item) + + self.human.random_delay(2.0, 4.0) + + # Step 7: 关闭 Preview.app + if self.state.is_preview_app_running(): + logger.debug(" Step7: 关闭 Preview.app") + self.state.close_preview_app() + else: + logger.debug(" Step7: Preview.app 未运行,跳过关闭") + + # Step 8: 确保微信回到前台,关闭大图预览 + logger.debug(" Step8: 恢复微信前台,Escape 关闭预览") + self.ui.ensure_wechat_frontmost() + time.sleep(0.3) + self.ax.send_escape_key() + time.sleep(0.5) state = self.state.detect_state() if state == UIState.MEDIA_PREVIEW: - self.human.delay("before_close_preview") - self.state.dismiss_preview() - else: - # 保守策略:即使未检测到预览也发送 Escape - # 因为某些预览可能不会创建新窗口 + logger.debug(" 仍在预览状态,再次 Escape") self.ax.send_escape_key() time.sleep(0.3) + return True + + def _find_more_button_in_preview(self): + """在图片预览区域查找"..."按钮(排除侧边栏区域)。""" + # 先在非主窗口(独立预览窗口)中找 + windows = self.ui.get_all_windows() + for win in windows: + title = self.ax.get_title(win) + if title == "微信": + continue + btn = self.ui.find_preview_more_button(win) + if btn is not None: + pos = self.ax.get_position(btn) + logger.debug(f" 在独立窗口 '{title}' 中找到'...'按钮, pos={pos}") + return btn + + # 内嵌预览:只在主窗口的右侧区域找(排除左侧侧边栏,x > 200) + main_win = self.ui.get_main_window() + if main_win: + btn = self.ui.find_preview_more_button(main_win, min_x=200) + if btn is not None: + pos = self.ax.get_position(btn) + logger.debug(f" 在主窗口右侧区域找到'...'按钮, pos={pos}") + return btn + + logger.debug(" 未在任何窗口中找到'...'按钮") + return None + + # ---------------------------------------------------------------- + # 文件处理:直接点击触发下载 + # ---------------------------------------------------------------- + + def _click_file(self, msg) -> bool: + """点击文件触发下载。""" + if not self.ax.click_at_element(msg.element): + logger.warning(" 文件点击失败") + return False + + self.human.delay("after_click_media") + + # 文件点击后可能会打开文件预览,关闭它 + self.human.random_delay(1.0, 2.0) + state = self.state.detect_state() + if state == UIState.MEDIA_PREVIEW: + self.ax.send_escape_key() + time.sleep(0.5) + + return True + + # ---------------------------------------------------------------- + # 通用处理(视频等) + # ---------------------------------------------------------------- + + def _click_generic(self, msg) -> bool: + if not self.ax.click_at_element(msg.element): + return False + self.human.delay("after_click_media") + self.human.micro_jitter() + self.ax.send_escape_key() + time.sleep(0.5) + return True + # ---------------------------------------------------------------- # 调试工具 # ---------------------------------------------------------------- def dump_ui_tree(self): - """输出微信 UI 元素树(调试用)。""" if not self.verify_setup(): return self.ui.ensure_wechat_frontmost() time.sleep(0.5) - # 输出主窗口 main_win = self.ui.get_main_window() if main_win: print("=== 主窗口 (微信) ===") print(self.ax.dump_element(main_win, max_depth=5)) - # 输出会话窗口 conv_windows = self.ui.get_conversation_windows() for win in conv_windows: title = self.ax.get_title(win) print(f"\n=== 会话窗口 ({title}) ===") print(self.ax.dump_element(win, max_depth=5)) - # 输出聊天列表解析结果 print("\n=== 聊天列表解析 ===") items = self.ui.get_chat_items() for item in items: + pos = self.ax.get_position(item.element) + size = self.ax.get_size(item.element) status = f"[未读:{item.unread_count}]" if item.unread_count > 0 else "" - print(f" {item.name} {status} | {item.preview} | {item.timestamp}") + print( + f" {item.name} {status} | {item.preview} | {item.timestamp} " + f"| pos={pos} size={size}" + ) # ---------------------------------------------------------------- # 统计 # ---------------------------------------------------------------- def _print_stats(self): - """输出运行统计。""" logger.info( f"运行统计: 扫描={self._scan_count}, " f"处理聊天={self._total_chats_processed}, " diff --git a/wechat_clicker/ax_bridge.py b/wechat_clicker/ax_bridge.py index eb9f42b..4485906 100644 --- a/wechat_clicker/ax_bridge.py +++ b/wechat_clicker/ax_bridge.py @@ -4,6 +4,7 @@ """ import logging +import re import time from ApplicationServices import ( @@ -22,12 +23,20 @@ from Cocoa import ( ) from Quartz import ( CGEventCreateKeyboardEvent, + CGEventCreateMouseEvent, + CGEventCreateScrollWheelEvent, CGEventPost, CGEventSetFlags, kCGHIDEventTap, kCGEventFlagMaskCommand, + kCGEventLeftMouseDown, + kCGEventLeftMouseUp, + kCGEventMouseMoved, + kCGMouseButtonLeft, + kCGScrollEventUnitLine, ) from CoreFoundation import kCFBooleanTrue +from Quartz import CGPointMake logger = logging.getLogger("wechat_clicker.ax_bridge") @@ -169,20 +178,44 @@ class AXBridge: size = self.get_attribute(element, "AXSize") if size is None: return (0, 0) - try: - return (int(size.width), int(size.height)) - except (AttributeError, TypeError): - return (0, 0) + return self._extract_size(size) def get_position(self, element) -> tuple: """获取元素位置 (x, y)。""" pos = self.get_attribute(element, "AXPosition") if pos is None: return (0, 0) + return self._extract_point(pos) + + @staticmethod + def _extract_point(ax_value) -> tuple: + """从 AXValue 中提取 CGPoint → (x, y)。""" try: - return (int(pos.x), int(pos.y)) - except (AttributeError, TypeError): - return (0, 0) + return (int(ax_value.x), int(ax_value.y)) + except Exception: + pass + try: + m = re.search(r'x:([\d.]+)\s+y:([\d.]+)', str(ax_value)) + if m: + return (int(float(m.group(1))), int(float(m.group(2)))) + except Exception: + pass + return (0, 0) + + @staticmethod + def _extract_size(ax_value) -> tuple: + """从 AXValue 中提取 CGSize → (width, height)。""" + try: + return (int(ax_value.width), int(ax_value.height)) + except Exception: + pass + try: + m = re.search(r'w:([\d.]+)\s+h:([\d.]+)', str(ax_value)) + if m: + return (int(float(m.group(1))), int(float(m.group(2)))) + except Exception: + pass + return (0, 0) def get_windows(self, app_ref) -> list: """获取应用的所有窗口。""" @@ -218,6 +251,119 @@ class AXBridge: """对元素执行 AXPress(等效点击)。""" return self.perform_action(element, "AXPress") + def click_at_element(self, element) -> bool: + """通过鼠标事件点击元素中心位置(用于不支持 AXPress 的元素)。""" + pos = self.get_position(element) + size = self.get_size(element) + if pos == (0, 0) and size == (0, 0): + logger.warning("元素位置/尺寸不可用,无法点击") + return False + cx = pos[0] + size[0] // 2 + cy = pos[1] + size[1] // 2 + role = self.get_role(element) + title = (self.get_title(element) or "").replace("\n", "\\n")[:40] + logger.debug( + f"click_at_element: ({cx}, {cy}) role={role} title=\"{title}\" " + f"pos={pos} size={size}" + ) + return self._mouse_click(cx, cy) + + def _mouse_click(self, x: int, y: int) -> bool: + """在屏幕坐标 (x, y) 处执行鼠标左键点击。""" + try: + point = CGPointMake(float(x), float(y)) + evt_down = CGEventCreateMouseEvent( + None, kCGEventLeftMouseDown, point, kCGMouseButtonLeft + ) + CGEventPost(kCGHIDEventTap, evt_down) + time.sleep(0.05) + evt_up = CGEventCreateMouseEvent( + None, kCGEventLeftMouseUp, point, kCGMouseButtonLeft + ) + CGEventPost(kCGHIDEventTap, evt_up) + self._error_count = 0 + return True + except Exception as e: + logger.error(f"鼠标点击失败 ({x}, {y}): {e}") + self._error_count += 1 + return False + + def double_click_at_element(self, element) -> bool: + """在元素中心位置执行鼠标双击。""" + pos = self.get_position(element) + size = self.get_size(element) + if pos == (0, 0) and size == (0, 0): + return False + cx = pos[0] + size[0] // 2 + cy = pos[1] + size[1] // 2 + try: + point = CGPointMake(float(cx), float(cy)) + for _ in range(2): + evt_down = CGEventCreateMouseEvent( + None, kCGEventLeftMouseDown, point, kCGMouseButtonLeft + ) + CGEventPost(kCGHIDEventTap, evt_down) + time.sleep(0.02) + evt_up = CGEventCreateMouseEvent( + None, kCGEventLeftMouseUp, point, kCGMouseButtonLeft + ) + CGEventPost(kCGHIDEventTap, evt_up) + time.sleep(0.05) + self._error_count = 0 + return True + except Exception as e: + logger.error(f"鼠标双击失败: {e}") + self._error_count += 1 + return False + + # ---------------------------------------------------------------- + # 滚动事件 + # ---------------------------------------------------------------- + + def scroll_at_element(self, element, lines: int = -5): + """在元素位置执行滚轮滚动。lines 为负值表示向上滚动。""" + pos = self.get_position(element) + size = self.get_size(element) + if pos == (0, 0) and size == (0, 0): + logger.warning("元素位置不可用,无法滚动") + return + cx = pos[0] + size[0] // 2 + cy = pos[1] + size[1] // 2 + self._scroll_at(cx, cy, lines) + + def scroll_to_bottom(self, element, rounds: int = 10, lines_per_round: int = 20): + """向下滚动多次,尽量到达元素(如消息列表)的底部。""" + pos = self.get_position(element) + size = self.get_size(element) + if pos == (0, 0) and size == (0, 0): + logger.warning("元素位置不可用,无法滚到底部") + return + cx = pos[0] + size[0] // 2 + cy = pos[1] + size[1] // 2 + logger.debug(f"scroll_to_bottom: 目标({cx}, {cy}), {rounds}轮x{lines_per_round}行") + for i in range(rounds): + self._scroll_at(cx, cy, lines_per_round) + time.sleep(0.12) + + def _scroll_at(self, x: int, y: int, lines: int): + """在屏幕坐标处执行滚轮滚动(仅移动鼠标,不点击)。""" + try: + point = CGPointMake(float(x), float(y)) + # 移动鼠标到目标位置(不点击!) + move_evt = CGEventCreateMouseEvent( + None, kCGEventMouseMoved, point, kCGMouseButtonLeft + ) + CGEventPost(kCGHIDEventTap, move_evt) + time.sleep(0.05) + + scroll_evt = CGEventCreateScrollWheelEvent( + None, kCGScrollEventUnitLine, 1, lines + ) + CGEventPost(kCGHIDEventTap, scroll_evt) + logger.debug(f"scroll_at: ({x}, {y}) lines={lines}") + except Exception as e: + logger.error(f"滚动失败: {e}") + # ---------------------------------------------------------------- # 键盘事件 # ---------------------------------------------------------------- diff --git a/wechat_clicker/state_machine.py b/wechat_clicker/state_machine.py index 1ad74ad..ed6eb60 100644 --- a/wechat_clicker/state_machine.py +++ b/wechat_clicker/state_machine.py @@ -12,6 +12,8 @@ import logging import time from enum import Enum +from Cocoa import NSRunningApplication + from .ax_bridge import AXBridge from .wechat_ui import WeChatUI @@ -68,7 +70,6 @@ class StateMachine: other_windows.append((win, title)) if main_window is None: - # 微信可能最小化了,或者窗口结构变化 self._current_state = UIState.UNKNOWN logger.debug("未找到微信主窗口") return self._current_state @@ -76,25 +77,23 @@ class StateMachine: window_count = len(windows) if window_count == 1: - # 只有主窗口 = 聊天列表 self._current_state = UIState.MAIN_CHAT_LIST self._conversation_name = None elif window_count == 2 and len(other_windows) == 1: - # 主窗口 + 一个会话窗口 self._current_state = UIState.CONVERSATION_OPEN self._conversation_name = other_windows[0][1] elif window_count >= 3: - # 可能有预览窗口 self._current_state = UIState.MEDIA_PREVIEW - # 第一个非主窗口通常是会话 if other_windows: self._conversation_name = other_windows[0][1] else: self._current_state = UIState.UNKNOWN + other_titles = [t for _, t in other_windows] logger.debug( f"状态检测: {self._current_state.value}, " - f"窗口数={window_count}, 会话={self._conversation_name}" + f"窗口数={window_count}, 会话={self._conversation_name}, " + f"其他窗口={other_titles}" ) return self._current_state @@ -184,18 +183,48 @@ class StateMachine: """关闭媒体预览。""" state = self.detect_state() if state != UIState.MEDIA_PREVIEW: - return True # 没有预览打开 + return True - # 发送 Escape 关闭预览 self.ax.send_escape_key() time.sleep(0.5) - # 验证 state = self.detect_state() if state == UIState.MEDIA_PREVIEW: - # 再试一次 self.ax.send_escape_key() time.sleep(0.5) state = self.detect_state() return state != UIState.MEDIA_PREVIEW + + # ---------------------------------------------------------------- + # Preview.app 处理 + # ---------------------------------------------------------------- + + def close_preview_app(self) -> bool: + """关闭 macOS Preview.app(预览)窗口。""" + preview_bundle = "com.apple.Preview" + apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(preview_bundle) + if not apps or len(apps) == 0: + return True + + app = apps[0] + if app.isTerminated(): + return True + + logger.debug("检测到 Preview.app 正在运行,发送 Cmd+W 关闭窗口") + app.activateWithOptions_(0) + time.sleep(0.5) + self.ax.send_cmd_w() + time.sleep(0.5) + + self.ui.ensure_wechat_frontmost() + time.sleep(0.3) + return True + + def is_preview_app_running(self) -> bool: + """检查 Preview.app 是否在运行。""" + preview_bundle = "com.apple.Preview" + apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(preview_bundle) + if not apps or len(apps) == 0: + return False + return not apps[0].isTerminated() diff --git a/wechat_clicker/wechat_ui.py b/wechat_clicker/wechat_ui.py index 0ac3f4e..0368539 100644 --- a/wechat_clicker/wechat_ui.py +++ b/wechat_clicker/wechat_ui.py @@ -241,7 +241,7 @@ class WeChatUI: """在会话窗口中找到消息列表(AXList name="消息")。""" if conv_window is None: return None - return self._find_child_recursive(conv_window, "AXList", "消息") + return self._find_child_recursive(conv_window, "AXList", "消息", max_depth=12) def get_messages(self, msg_list) -> list: """获取消息列表中的所有消息,返回 MessageItem 列表。""" @@ -274,13 +274,23 @@ class WeChatUI: """获取会话中所有可见的图片/文件/视频消息。""" msg_list = self.get_message_list(conv_window) if msg_list is None: + logger.debug("get_media_messages: 未找到消息列表") return [] messages = self.get_messages(msg_list) - media = [ - msg for msg in messages - if msg.msg_type in ("image", "file", "video") and msg.is_visible - ] + logger.debug(f"get_media_messages: 消息列表中共 {len(messages)} 个元素") + + media = [] + for msg in messages: + if msg.msg_type in ("image", "file", "video") and msg.is_visible: + pos = self.ax.get_position(msg.element) + short_title = msg.title.replace("\n", "\\n")[:50] + logger.debug( + f" 媒体: type={msg.msg_type} title=\"{short_title}\" " + f"pos={pos} size={msg.size}" + ) + media.append(msg) + return media # ---------------------------------------------------------------- @@ -309,6 +319,130 @@ class WeChatUI: return False return self.ax.press(btn) + # ---------------------------------------------------------------- + # 图片预览界面操作 + # ---------------------------------------------------------------- + + def find_preview_more_button(self, window, min_x: int = 0) -> object: + """在图片预览窗口中找到 '...'(更多)按钮。 + + min_x: 按钮的最小 x 坐标,用于排除侧边栏区域的按钮。 + """ + if window is None: + return None + return self._find_more_button_recursive(window, max_depth=6, min_x=min_x) + + def _find_more_button_recursive(self, parent, max_depth=6, min_x: int = 0): + """递归查找"更多"按钮 — 通常 title 或 desc 包含 '更多' 或 '...'。""" + if max_depth <= 0: + return None + children = self.ax.get_children(parent) + for child in children: + role = self.ax.get_role(child) + if role == "AXButton": + title = self.ax.get_title(child) + desc = self.ax.get_description(child) + ident = self.ax.get_attribute(child, "AXIdentifier") or "" + combined = f"{title}|{desc}|{ident}".lower() + if any(kw in combined for kw in ["更多", "more", "..."]): + # 位置过滤:排除侧边栏区域的按钮 + if min_x > 0: + pos = self.ax.get_position(child) + if pos[0] < min_x: + logger.debug( + f" 跳过侧边栏按钮: title=\"{title}\" desc=\"{desc}\" " + f"pos={pos} (x < {min_x})" + ) + continue + pos = self.ax.get_position(child) + logger.debug( + f" 匹配到'...'按钮: title=\"{title}\" desc=\"{desc}\" " + f"ident=\"{ident}\" pos={pos}" + ) + return child + for child in children: + result = self._find_more_button_recursive(child, max_depth - 1, min_x=min_x) + if result is not None: + return result + return None + + def find_menu_item(self, text: str) -> object: + """在当前可见的菜单/弹出层中查找包含指定文本的菜单项。""" + logger.debug(f"find_menu_item: 搜索 \"{text}\"") + app_ref = self.get_app_ref() + if app_ref is None: + return None + windows = self.ax.get_windows(app_ref) + for win in windows: + win_title = self.ax.get_title(win) + result = self._find_element_with_text(win, text, max_depth=8) + if result is not None: + role = self.ax.get_role(result) + pos = self.ax.get_position(result) + logger.debug( + f" 找到菜单项: \"{text}\" in window=\"{win_title}\" " + f"role={role} pos={pos}" + ) + return result + menu_bar = self.ax.get_attribute(app_ref, "AXMenuBar") + if menu_bar: + result = self._find_element_with_text(menu_bar, text, max_depth=6) + if result is not None: + logger.debug(f" 找到菜单项: \"{text}\" in menubar") + return result + logger.debug(f" 未找到菜单项: \"{text}\"") + return None + + def _find_element_with_text(self, parent, text: str, max_depth: int = 8): + """递归查找 title 或 value 包含指定文本的可点击元素。""" + if max_depth <= 0: + return None + children = self.ax.get_children(parent) + for child in children: + title = self.ax.get_title(child) + value = self.ax.get_value(child) + role = self.ax.get_role(child) + if text in (title or "") or text in (value or ""): + actions = self.ax.get_action_names(child) + if "AXPress" in actions or "AXPick" in actions: + return child + if role in ("AXMenuItem", "AXButton", "AXStaticText"): + return child + for child in children: + result = self._find_element_with_text(child, text, max_depth - 1) + if result is not None: + return result + return None + + def find_all_buttons_in_window(self, window) -> list: + """调试用:列出窗口内所有按钮信息。""" + buttons = [] + self._collect_buttons(window, buttons, max_depth=8) + return buttons + + def _collect_buttons(self, parent, results: list, max_depth: int): + if max_depth <= 0: + return + children = self.ax.get_children(parent) + for child in children: + role = self.ax.get_role(child) + if role in ("AXButton", "AXMenuItem", "AXMenuBarItem"): + title = self.ax.get_title(child) + desc = self.ax.get_description(child) + ident = self.ax.get_attribute(child, "AXIdentifier") or "" + pos = self.ax.get_position(child) + size = self.ax.get_size(child) + results.append({ + "element": child, + "role": role, + "title": title, + "desc": desc, + "identifier": ident, + "pos": pos, + "size": size, + }) + self._collect_buttons(child, results, max_depth - 1) + # ---------------------------------------------------------------- # 解析工具 # ----------------------------------------------------------------