"""Low-level wrapper around the macOS Accessibility API (AXUIElement).

Calls the ApplicationServices framework through pyobjc to read attributes
of UI elements and to operate on them (AX actions, synthetic mouse,
scroll-wheel and keyboard events).
"""

import logging
import re
import time

from ApplicationServices import (
    AXIsProcessTrusted,
    AXIsProcessTrustedWithOptions,
    AXUIElementCreateApplication,
    AXUIElementCopyAttributeValue,
    AXUIElementCopyAttributeNames,
    AXUIElementCopyActionNames,
    AXUIElementPerformAction,
)
from Cocoa import (
    NSRunningApplication,
    NSWorkspace,
    NSApplicationActivateIgnoringOtherApps,
)
from Quartz import (
    CGEventCreateKeyboardEvent,
    CGEventCreateMouseEvent,
    CGEventCreateScrollWheelEvent,
    CGEventPost,
    CGEventSetFlags,
    kCGHIDEventTap,
    kCGEventFlagMaskCommand,
    kCGEventLeftMouseDown,
    kCGEventLeftMouseUp,
    kCGEventMouseMoved,
    kCGMouseButtonLeft,
    kCGScrollEventUnitLine,
)
from CoreFoundation import kCFBooleanTrue
from Quartz import CGPointMake

logger = logging.getLogger("wechat_clicker.ax_bridge")

# AXError constants
kAXErrorSuccess = 0
kAXErrorAttributeUnsupported = -25205
kAXErrorNoValue = -25212
kAXErrorInvalidUIElement = -25202

# Virtual key codes
kVK_Escape = 0x35
kVK_W = 0x0D

# Fallback patterns applied to ``str(ax_value)`` when the value does not
# expose numeric attributes directly.  Coordinates may be negative (an
# element can sit left of / above the origin of the global coordinate
# space), so the point pattern accepts an optional leading minus sign.
_POINT_PATTERN = re.compile(r'x:(-?[\d.]+)\s+y:(-?[\d.]+)')
_SIZE_PATTERN = re.compile(r'w:([\d.]+)\s+h:([\d.]+)')


def _parse_pair(ax_value, attr_names, pattern) -> tuple:
    """Extract a pair of ints from *ax_value*.

    First tries the two attributes named in *attr_names*; if that fails,
    searches ``str(ax_value)`` with *pattern* (two capture groups).

    Returns:
        ``(first, second)`` truncated to ints, or ``(0, 0)`` when neither
        strategy yields a value.
    """
    first_attr, second_attr = attr_names
    # Deliberately best-effort: any failure on the bridged object simply
    # falls through to the textual fallback.
    try:
        return (
            int(getattr(ax_value, first_attr)),
            int(getattr(ax_value, second_attr)),
        )
    except Exception:
        pass
    try:
        match = pattern.search(str(ax_value))
        if match:
            return (int(float(match.group(1))), int(float(match.group(2))))
    except Exception:
        pass
    return (0, 0)


class AXBridge:
    """Low-level wrapper around AXUIElement."""

    def __init__(self):
        # Consecutive-failure counter: reset on successful attribute reads,
        # actions and clicks; incremented on failed actions and clicks.
        self._error_count = 0
        # Threshold used by should_backoff().
        self._max_errors_before_backoff = 10
        # Not read anywhere in this class; presumably consumed by callers
        # as the pause duration -- TODO confirm.
        self._backoff_seconds = 60

    # ----------------------------------------------------------------
    # Accessibility permission check
    # ----------------------------------------------------------------

    def check_accessibility(self) -> bool:
        """Check whether the current process is trusted for accessibility.

        When it is not, logs an error and calls
        ``AXIsProcessTrustedWithOptions`` with the prompt option set.

        Returns:
            The result of ``AXIsProcessTrusted()`` taken *before* any
            prompt is requested.
        """
        trusted = AXIsProcessTrusted()
        if not trusted:
            logger.error(
                "辅助功能权限未授予!请前往:系统设置 > 隐私与安全 > 辅助功能,"
                "添加当前终端应用或 Python 解释器。"
            )
            # Ask the system to show its permission prompt.
            AXIsProcessTrustedWithOptions(
                {
                    "AXTrustedCheckOptionPrompt": kCFBooleanTrue
                }
            )
        return trusted

    # ----------------------------------------------------------------
    # Application references
    # ----------------------------------------------------------------

    def get_app_ref(self, bundle_id: str):
        """Return an AXUIElement for the application with *bundle_id*.

        Uses the first running application reported for the bundle ID.

        Returns:
            The AXUIElement reference, or ``None`` when no running
            application matches.
        """
        apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id)
        if not apps:
            logger.error(f"未找到运行中的应用: {bundle_id}")
            return None
        pid = apps[0].processIdentifier()
        logger.debug(f"找到应用 {bundle_id}, PID={pid}")
        return AXUIElementCreateApplication(pid)

    def bring_to_front(self, bundle_id: str) -> bool:
        """Activate the first running application with *bundle_id*.

        Returns:
            ``False`` when no running application matches, otherwise the
            result of ``activateWithOptions_``.
        """
        apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id)
        if not apps:
            return False
        return apps[0].activateWithOptions_(NSApplicationActivateIgnoringOtherApps)

    # ----------------------------------------------------------------
    # Attribute reads
    # ----------------------------------------------------------------

    def get_attribute(self, element, attr_name: str):
        """Read attribute *attr_name* from *element*.

        A successful read resets the consecutive-error counter; failures
        leave the counter untouched.

        Returns:
            The attribute value, or ``None`` on any error or exception.
        """
        try:
            err, value = AXUIElementCopyAttributeValue(element, attr_name, None)
            if err == kAXErrorSuccess:
                self._error_count = 0
                return value
            # "No value" / "unsupported" are not logged; other codes are.
            if err not in (kAXErrorNoValue, kAXErrorAttributeUnsupported):
                logger.debug(f"读取属性 {attr_name} 失败, 错误码: {err}")
            return None
        except Exception as e:
            logger.debug(f"读取属性 {attr_name} 异常: {e}")
            return None

    def get_attribute_names(self, element) -> list:
        """Return the attribute names of *element*, or ``[]`` on failure."""
        try:
            err, names = AXUIElementCopyAttributeNames(element, None)
            if err == kAXErrorSuccess:
                return list(names) if names else []
            return []
        except Exception:
            return []

    def get_action_names(self, element) -> list:
        """Return the action names of *element*, or ``[]`` on failure."""
        try:
            err, names = AXUIElementCopyActionNames(element, None)
            if err == kAXErrorSuccess:
                return list(names) if names else []
            return []
        except Exception:
            return []

    def get_children(self, element) -> list:
        """Return the ``AXChildren`` of *element* as a list (``[]`` if unavailable)."""
        children = self.get_attribute(element, "AXChildren")
        if children is None:
            return []
        return list(children)

    def get_role(self, element) -> str:
        """Return the element's ``AXRole``, or ``""`` when missing/falsy."""
        return self.get_attribute(element, "AXRole") or ""

    def get_title(self, element) -> str:
        """Return the element's ``AXTitle``, or ``""`` when missing/falsy."""
        return self.get_attribute(element, "AXTitle") or ""

    def get_value(self, element) -> str:
        """Return the element's ``AXValue``, or ``""`` when missing/falsy."""
        return self.get_attribute(element, "AXValue") or ""

    def get_name(self, element) -> str:
        """Return a display name: ``AXTitle`` if truthy, else ``AXValue``, else ``""``."""
        title = self.get_attribute(element, "AXTitle")
        if title:
            return title
        value = self.get_attribute(element, "AXValue")
        if value:
            return value
        return ""

    def get_description(self, element) -> str:
        """Return the element's ``AXDescription``, or ``""`` when missing/falsy."""
        return self.get_attribute(element, "AXDescription") or ""

    def get_size(self, element) -> tuple:
        """Return the element size as ``(width, height)``.

        ``(0, 0)`` is returned when ``AXSize`` cannot be read or parsed.
        """
        size = self.get_attribute(element, "AXSize")
        if size is None:
            return (0, 0)
        return self._extract_size(size)

    def get_position(self, element) -> tuple:
        """Return the element position as ``(x, y)``.

        ``(0, 0)`` is returned when ``AXPosition`` cannot be read or parsed.
        """
        pos = self.get_attribute(element, "AXPosition")
        if pos is None:
            return (0, 0)
        return self._extract_point(pos)

    @staticmethod
    def _extract_point(ax_value) -> tuple:
        """Extract ``(x, y)`` ints from a point-like AX value.

        Reads ``.x``/``.y`` when available, otherwise parses an
        ``x:<num> y:<num>`` fragment from ``str(ax_value)``.  Negative
        coordinates are supported.  Returns ``(0, 0)`` on failure.
        """
        return _parse_pair(ax_value, ("x", "y"), _POINT_PATTERN)

    @staticmethod
    def _extract_size(ax_value) -> tuple:
        """Extract ``(width, height)`` ints from a size-like AX value.

        Reads ``.width``/``.height`` when available, otherwise parses a
        ``w:<num> h:<num>`` fragment from ``str(ax_value)``.  Returns
        ``(0, 0)`` on failure.
        """
        return _parse_pair(ax_value, ("width", "height"), _SIZE_PATTERN)

    def get_windows(self, app_ref) -> list:
        """Return the application's ``AXWindows`` as a list (``[]`` if unavailable)."""
        windows = self.get_attribute(app_ref, "AXWindows")
        if windows is None:
            return []
        return list(windows)

    def get_focused_window(self, app_ref):
        """Return the application's ``AXFocusedWindow`` (``None`` if unavailable)."""
        return self.get_attribute(app_ref, "AXFocusedWindow")

    # ----------------------------------------------------------------
    # Geometry / event helpers
    # ----------------------------------------------------------------

    def _locate_center(self, element):
        """Return ``(pos, size, (cx, cy))`` for *element*.

        Returns ``None`` only when both position and size come back as
        ``(0, 0)``, i.e. no geometry is available at all.
        """
        pos = self.get_position(element)
        size = self.get_size(element)
        if pos == (0, 0) and size == (0, 0):
            return None
        center = (pos[0] + size[0] // 2, pos[1] + size[1] // 2)
        return pos, size, center

    @staticmethod
    def _post_left_click(point, hold_seconds: float) -> None:
        """Post left-button down, wait *hold_seconds*, then post left-button up."""
        evt_down = CGEventCreateMouseEvent(
            None, kCGEventLeftMouseDown, point, kCGMouseButtonLeft
        )
        CGEventPost(kCGHIDEventTap, evt_down)
        time.sleep(hold_seconds)
        evt_up = CGEventCreateMouseEvent(
            None, kCGEventLeftMouseUp, point, kCGMouseButtonLeft
        )
        CGEventPost(kCGHIDEventTap, evt_up)

    # ----------------------------------------------------------------
    # Actions
    # ----------------------------------------------------------------

    def perform_action(self, element, action: str) -> bool:
        """Perform AX action *action* on *element*.

        Success resets the consecutive-error counter; an error code or an
        exception increments it.

        Returns:
            ``True`` on success, ``False`` otherwise.
        """
        try:
            err = AXUIElementPerformAction(element, action)
            if err == kAXErrorSuccess:
                self._error_count = 0
                return True
            logger.debug(f"执行操作 {action} 失败, 错误码: {err}")
            self._error_count += 1
            return False
        except Exception as e:
            logger.debug(f"执行操作 {action} 异常: {e}")
            self._error_count += 1
            return False

    def press(self, element) -> bool:
        """Perform ``AXPress`` on *element* (the accessibility "click")."""
        return self.perform_action(element, "AXPress")

    def click_at_element(self, element) -> bool:
        """Click the centre of *element* with synthetic mouse events.

        Intended for elements that do not support ``AXPress``.

        Returns:
            ``False`` when no geometry is available or the click fails,
            ``True`` otherwise.
        """
        located = self._locate_center(element)
        if located is None:
            logger.warning("元素位置/尺寸不可用,无法点击")
            return False
        pos, size, (cx, cy) = located
        role = self.get_role(element)
        # Single-line, truncated title for the debug log.
        title = (self.get_title(element) or "").replace("\n", "\\n")[:40]
        logger.debug(
            f"click_at_element: ({cx}, {cy}) role={role} title=\"{title}\" "
            f"pos={pos} size={size}"
        )
        return self._mouse_click(cx, cy)

    def _mouse_click(self, x: int, y: int) -> bool:
        """Post a left click at screen coordinates ``(x, y)``.

        Success resets the consecutive-error counter; an exception
        increments it.

        Returns:
            ``True`` if the events were posted, ``False`` on exception.
        """
        try:
            point = CGPointMake(float(x), float(y))
            self._post_left_click(point, 0.05)
            self._error_count = 0
            return True
        except Exception as e:
            logger.error(f"鼠标点击失败 ({x}, {y}): {e}")
            self._error_count += 1
            return False

    def double_click_at_element(self, element) -> bool:
        """Post two left clicks in quick succession at the centre of *element*.

        Returns:
            ``False`` when no geometry is available or an exception
            occurs, ``True`` otherwise.
        """
        located = self._locate_center(element)
        if located is None:
            return False
        _, _, (cx, cy) = located
        try:
            point = CGPointMake(float(cx), float(cy))
            for _ in range(2):
                self._post_left_click(point, 0.02)
                time.sleep(0.05)
            self._error_count = 0
            return True
        except Exception as e:
            logger.error(f"鼠标双击失败: {e}")
            self._error_count += 1
            return False

    # ----------------------------------------------------------------
    # Scroll events
    # ----------------------------------------------------------------

    def scroll_at_element(self, element, lines: int = -5):
        """Post a scroll-wheel event at the centre of *element*.

        Args:
            element: Target element; nothing happens (beyond a warning)
                when its geometry is unavailable.
            lines: Wheel delta in line units.  The original author's note
                says negative values scroll up.
                NOTE(review): confirm against the Quartz sign convention.
        """
        located = self._locate_center(element)
        if located is None:
            logger.warning("元素位置不可用,无法滚动")
            return
        _, _, (cx, cy) = located
        self._scroll_at(cx, cy, lines)

    def scroll_to_bottom(self, element, rounds: int = 10, lines_per_round: int = 20):
        """Scroll repeatedly at the centre of *element*.

        Intended to reach the bottom of a scrollable element such as a
        message list.  Posts *rounds* scroll events of *lines_per_round*
        lines each, pausing briefly between them.
        """
        located = self._locate_center(element)
        if located is None:
            logger.warning("元素位置不可用,无法滚到底部")
            return
        _, _, (cx, cy) = located
        logger.debug(f"scroll_to_bottom: 目标({cx}, {cy}), {rounds}轮x{lines_per_round}行")
        for _ in range(rounds):
            self._scroll_at(cx, cy, lines_per_round)
            time.sleep(0.12)

    def _scroll_at(self, x: int, y: int, lines: int):
        """Move the pointer to ``(x, y)`` and post one scroll-wheel event.

        The pointer is only moved, never clicked.  Exceptions are logged
        and swallowed.
        """
        try:
            point = CGPointMake(float(x), float(y))
            # Move the mouse to the target position (no click!).
            move_evt = CGEventCreateMouseEvent(
                None, kCGEventMouseMoved, point, kCGMouseButtonLeft
            )
            CGEventPost(kCGHIDEventTap, move_evt)
            time.sleep(0.05)
            scroll_evt = CGEventCreateScrollWheelEvent(
                None, kCGScrollEventUnitLine, 1, lines
            )
            CGEventPost(kCGHIDEventTap, scroll_evt)
            logger.debug(f"scroll_at: ({x}, {y}) lines={lines}")
        except Exception as e:
            logger.error(f"滚动失败: {e}")

    # ----------------------------------------------------------------
    # Keyboard events
    # ----------------------------------------------------------------

    def send_escape_key(self):
        """Send an Escape key press."""
        self._send_key(kVK_Escape)

    def send_cmd_w(self):
        """Send Cmd+W (close window)."""
        self._send_key(kVK_W, flags=kCGEventFlagMaskCommand)

    def _send_key(self, key_code: int, flags: int = 0):
        """Post a key-down followed by a key-up for *key_code*.

        Args:
            key_code: Virtual key code to send.
            flags: Modifier flag mask applied to both events when non-zero.

        Exceptions are logged and swallowed.
        """
        try:
            # Key down
            event_down = CGEventCreateKeyboardEvent(None, key_code, True)
            if flags:
                CGEventSetFlags(event_down, flags)
            CGEventPost(kCGHIDEventTap, event_down)
            time.sleep(0.05)
            # Key up
            event_up = CGEventCreateKeyboardEvent(None, key_code, False)
            if flags:
                CGEventSetFlags(event_up, flags)
            CGEventPost(kCGHIDEventTap, event_up)
        except Exception as e:
            logger.error(f"发送键盘事件失败: {e}")

    # ----------------------------------------------------------------
    # Error management
    # ----------------------------------------------------------------

    def should_backoff(self) -> bool:
        """Return ``True`` once the consecutive-error count reaches the threshold."""
        return self._error_count >= self._max_errors_before_backoff

    def reset_error_count(self):
        """Reset the consecutive-error counter to zero."""
        self._error_count = 0

    # ----------------------------------------------------------------
    # Debugging tools
    # ----------------------------------------------------------------

    def dump_element(self, element, indent: int = 0, max_depth: int = 5) -> str:
        """Recursively render the element tree as text (debugging aid).

        Args:
            element: Root element to dump.
            indent: Current depth; also the number of prefix repetitions.
            max_depth: Depth at which recursion stops.

        Returns:
            One line per element, joined by newlines; ``""`` when
            *indent* has reached *max_depth*.
        """
        if indent >= max_depth:
            return ""
        role = self.get_role(element)
        title = self.get_title(element)
        name_attr = self.get_attribute(element, "AXValue") or ""
        desc = self.get_description(element)
        size = self.get_size(element)
        prefix = " " * indent
        info_parts = [f"role={role}"]
        if title:
            # Truncate long titles and keep them on a single line.
            short_title = title.replace("\n", "\\n")[:80]
            info_parts.append(f'title="{short_title}"')
        if name_attr:
            short_name = str(name_attr).replace("\n", "\\n")[:80]
            info_parts.append(f'value="{short_name}"')
        if desc:
            info_parts.append(f'desc="{desc}"')
        if size != (0, 0):
            info_parts.append(f"size={size[0]}x{size[1]}")
        lines = [f"{prefix}[{', '.join(info_parts)}]"]
        for child in self.get_children(element):
            child_dump = self.dump_element(child, indent + 1, max_depth)
            # Depth-truncated children return ""; skip them so the dump
            # does not contain blank lines.
            if child_dump:
                lines.append(child_dump)
        return "\n".join(lines)