wechat_msg_clicker/wechat_clicker/ax_bridge.py
2026-04-23 19:23:59 +08:00

531 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""macOS Accessibility API (AXUIElement) 底层封装
通过 pyobjc 调用 ApplicationServices 框架,提供对 UI 元素的读取和操作能力。
"""
import logging
import re
import time
from ApplicationServices import (
AXIsProcessTrusted,
AXIsProcessTrustedWithOptions,
AXUIElementCreateApplication,
AXUIElementCopyAttributeValue,
AXUIElementCopyAttributeNames,
AXUIElementCopyActionNames,
AXUIElementPerformAction,
AXUIElementSetAttributeValue,
)
from Cocoa import (
NSRunningApplication,
NSWorkspace,
NSApplicationActivateIgnoringOtherApps,
)
from Quartz import (
CGEventCreateKeyboardEvent,
CGEventCreateMouseEvent,
CGEventCreateScrollWheelEvent,
CGEventPost,
CGEventSetFlags,
kCGHIDEventTap,
kCGEventFlagMaskCommand,
kCGEventLeftMouseDown,
kCGEventLeftMouseUp,
kCGEventMouseMoved,
kCGMouseButtonLeft,
kCGScrollEventUnitLine,
)
from CoreFoundation import kCFBooleanTrue
from Quartz import CGPointMake
logger = logging.getLogger("wechat_clicker.ax_bridge")
# AXError 常量
kAXErrorSuccess = 0
kAXErrorAttributeUnsupported = -25205
kAXErrorNoValue = -25212
kAXErrorInvalidUIElement = -25202
# 键码
kVK_Escape = 0x35
kVK_W = 0x0D
class AXBridge:
"""AXUIElement 底层封装"""
def __init__(self):
self._error_count = 0
self._max_errors_before_backoff = 10
self._backoff_seconds = 60
# ----------------------------------------------------------------
# 辅助功能权限检查
# ----------------------------------------------------------------
def check_accessibility(self) -> bool:
"""检查当前进程是否拥有辅助功能权限。"""
trusted = AXIsProcessTrusted()
if not trusted:
logger.error(
"辅助功能权限未授予!请前往:系统设置 > 隐私与安全 > 辅助功能,"
"添加当前终端应用或 Python 解释器。"
)
# 弹出系统提示框
AXIsProcessTrustedWithOptions(
{
"AXTrustedCheckOptionPrompt": kCFBooleanTrue
}
)
return trusted
# ----------------------------------------------------------------
# 获取应用引用
# ----------------------------------------------------------------
def get_app_ref(self, bundle_id: str):
"""根据 bundle ID 获取应用的 AXUIElement 引用。返回 None 表示应用未运行。"""
apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id)
if not apps or len(apps) == 0:
logger.error(f"未找到运行中的应用: {bundle_id}")
return None
app = apps[0]
pid = app.processIdentifier()
logger.debug(f"找到应用 {bundle_id}, PID={pid}")
return AXUIElementCreateApplication(pid)
def bring_to_front(self, bundle_id: str) -> bool:
"""将指定应用带到最前台。"""
apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id)
if not apps or len(apps) == 0:
return False
app = apps[0]
return app.activateWithOptions_(NSApplicationActivateIgnoringOtherApps)
def is_app_running(self, bundle_id: str) -> bool:
"""检查应用是否正在运行(不依赖窗口状态)。"""
apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id)
return apps is not None and len(apps) > 0
def unminimize_app(self, bundle_id: str) -> bool:
"""取消应用窗口的最小化/隐藏状态,多策略依次尝试。"""
apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id)
if not apps or len(apps) == 0:
logger.error(f"未找到运行中的应用: {bundle_id}")
return False
app = apps[0]
app_name = app.localizedName() or "WeChat"
# 策略1: NSRunningApplication unhide处理 Cmd+H 隐藏)
if app.isHidden():
logger.info(f"{app_name} 处于隐藏状态,调用 unhide")
app.unhide()
time.sleep(0.3)
# 策略2: 通过 AX API 取消所有窗口的最小化(最可靠的方式)
app_ref = self.get_app_ref(bundle_id)
if app_ref:
windows = self.get_windows(app_ref)
for win in windows:
if self.is_window_minimized(win):
title = self.get_title(win) or "?"
logger.info(f"通过 AX API 取消窗口最小化: {title}")
self.set_window_minimized(win, False)
time.sleep(0.5)
# 策略3: activate 带到前台
app.activateWithOptions_(NSApplicationActivateIgnoringOtherApps)
time.sleep(0.3)
logger.info(f"窗口恢复流程完成: {app_name}")
return True
def is_window_minimized(self, window) -> bool:
"""检查窗口是否处于最小化状态。"""
try:
err, value = AXUIElementCopyAttributeValue(window, "AXMinimized", None)
if err == kAXErrorSuccess:
return bool(value)
except Exception:
pass
return False
def set_window_minimized(self, window, minimized: bool) -> bool:
"""通过 AX API 设置窗口最小化状态。"""
try:
err = AXUIElementSetAttributeValue(window, "AXMinimized", minimized)
return err == kAXErrorSuccess
except Exception as e:
logger.error(f"设置 AXMinimized 失败: {e}")
return False
# ----------------------------------------------------------------
# 属性读取
# ----------------------------------------------------------------
def get_attribute(self, element, attr_name: str):
"""读取 UI 元素的指定属性。失败返回 None。"""
try:
err, value = AXUIElementCopyAttributeValue(element, attr_name, None)
if err == kAXErrorSuccess:
self._error_count = 0
return value
if err not in (kAXErrorNoValue, kAXErrorAttributeUnsupported):
logger.debug(f"读取属性 {attr_name} 失败, 错误码: {err}")
return None
except Exception as e:
logger.debug(f"读取属性 {attr_name} 异常: {e}")
return None
def get_attribute_names(self, element) -> list:
"""获取元素支持的所有属性名。"""
try:
err, names = AXUIElementCopyAttributeNames(element, None)
if err == kAXErrorSuccess:
return list(names) if names else []
return []
except Exception:
return []
def get_action_names(self, element) -> list:
"""获取元素支持的所有操作名。"""
try:
err, names = AXUIElementCopyActionNames(element, None)
if err == kAXErrorSuccess:
return list(names) if names else []
return []
except Exception:
return []
def get_children(self, element) -> list:
"""获取子元素列表。"""
children = self.get_attribute(element, "AXChildren")
if children is None:
return []
return list(children)
def get_role(self, element) -> str:
"""获取元素角色AXRole"""
return self.get_attribute(element, "AXRole") or ""
def get_title(self, element) -> str:
"""获取元素标题AXTitle"""
return self.get_attribute(element, "AXTitle") or ""
def get_value(self, element) -> str:
"""获取元素值AXValue"""
return self.get_attribute(element, "AXValue") or ""
def get_name(self, element) -> str:
"""获取元素名称,优先 AXTitle其次 AXValue。"""
title = self.get_attribute(element, "AXTitle")
if title:
return title
value = self.get_attribute(element, "AXValue")
if value:
return value
return ""
def get_description(self, element) -> str:
"""获取元素描述AXDescription"""
return self.get_attribute(element, "AXDescription") or ""
def get_size(self, element) -> tuple:
"""获取元素尺寸 (width, height)。返回 (0, 0) 表示不可见或失败。"""
size = self.get_attribute(element, "AXSize")
if size is None:
return (0, 0)
return self._extract_size(size)
def get_position(self, element) -> tuple:
"""获取元素位置 (x, y)。"""
pos = self.get_attribute(element, "AXPosition")
if pos is None:
return (0, 0)
return self._extract_point(pos)
@staticmethod
def _extract_point(ax_value) -> tuple:
"""从 AXValue 中提取 CGPoint → (x, y)。"""
try:
return (int(ax_value.x), int(ax_value.y))
except Exception:
pass
try:
m = re.search(r'x:([\d.]+)\s+y:([\d.]+)', str(ax_value))
if m:
return (int(float(m.group(1))), int(float(m.group(2))))
except Exception:
pass
return (0, 0)
@staticmethod
def _extract_size(ax_value) -> tuple:
"""从 AXValue 中提取 CGSize → (width, height)。"""
try:
return (int(ax_value.width), int(ax_value.height))
except Exception:
pass
try:
m = re.search(r'w:([\d.]+)\s+h:([\d.]+)', str(ax_value))
if m:
return (int(float(m.group(1))), int(float(m.group(2))))
except Exception:
pass
return (0, 0)
def get_windows(self, app_ref) -> list:
"""获取应用的所有窗口。"""
windows = self.get_attribute(app_ref, "AXWindows")
if windows is None:
return []
return list(windows)
def get_focused_window(self, app_ref):
"""获取当前聚焦的窗口。"""
return self.get_attribute(app_ref, "AXFocusedWindow")
# ----------------------------------------------------------------
# 操作执行
# ----------------------------------------------------------------
def perform_action(self, element, action: str) -> bool:
"""对元素执行指定操作。"""
try:
err = AXUIElementPerformAction(element, action)
if err == kAXErrorSuccess:
self._error_count = 0
return True
logger.debug(f"执行操作 {action} 失败, 错误码: {err}")
self._error_count += 1
return False
except Exception as e:
logger.debug(f"执行操作 {action} 异常: {e}")
self._error_count += 1
return False
def press(self, element) -> bool:
"""对元素执行 AXPress等效点击"""
return self.perform_action(element, "AXPress")
def click_at_element(self, element) -> bool:
"""通过鼠标事件点击元素中心位置(用于不支持 AXPress 的元素)。"""
pos = self.get_position(element)
size = self.get_size(element)
if pos == (0, 0) and size == (0, 0):
logger.warning("元素位置/尺寸不可用,无法点击")
return False
cx = pos[0] + size[0] // 2
cy = pos[1] + size[1] // 2
role = self.get_role(element)
title = (self.get_title(element) or "").replace("\n", "\\n")[:40]
logger.debug(
f"click_at_element: ({cx}, {cy}) role={role} title=\"{title}\" "
f"pos={pos} size={size}"
)
return self._mouse_click(cx, cy)
def click_at_element_offset(self, element, x_offset: int, y_offset: int) -> bool:
"""通过鼠标事件点击元素指定偏移位置。"""
pos = self.get_position(element)
size = self.get_size(element)
if pos == (0, 0) and size == (0, 0):
logger.warning("元素位置/尺寸不可用,无法点击")
return False
cx = pos[0] + x_offset
cy = pos[1] + y_offset
role = self.get_role(element)
title = (self.get_title(element) or "").replace("\n", "\\n")[:40]
logger.debug(
f"click_at_element_offset: ({cx}, {cy}) offset=({x_offset},{y_offset}) "
f"role={role} title=\"{title}\" pos={pos} size={size}"
)
return self._mouse_click(cx, cy)
def _mouse_click(self, x: int, y: int) -> bool:
"""在屏幕坐标 (x, y) 处执行鼠标左键点击。"""
try:
point = CGPointMake(float(x), float(y))
evt_down = CGEventCreateMouseEvent(
None, kCGEventLeftMouseDown, point, kCGMouseButtonLeft
)
CGEventPost(kCGHIDEventTap, evt_down)
time.sleep(0.05)
evt_up = CGEventCreateMouseEvent(
None, kCGEventLeftMouseUp, point, kCGMouseButtonLeft
)
CGEventPost(kCGHIDEventTap, evt_up)
self._error_count = 0
return True
except Exception as e:
logger.error(f"鼠标点击失败 ({x}, {y}): {e}")
self._error_count += 1
return False
def double_click_at_element(self, element) -> bool:
"""在元素中心位置执行鼠标双击。"""
pos = self.get_position(element)
size = self.get_size(element)
if pos == (0, 0) and size == (0, 0):
return False
cx = pos[0] + size[0] // 2
cy = pos[1] + size[1] // 2
try:
point = CGPointMake(float(cx), float(cy))
for _ in range(2):
evt_down = CGEventCreateMouseEvent(
None, kCGEventLeftMouseDown, point, kCGMouseButtonLeft
)
CGEventPost(kCGHIDEventTap, evt_down)
time.sleep(0.02)
evt_up = CGEventCreateMouseEvent(
None, kCGEventLeftMouseUp, point, kCGMouseButtonLeft
)
CGEventPost(kCGHIDEventTap, evt_up)
time.sleep(0.05)
self._error_count = 0
return True
except Exception as e:
logger.error(f"鼠标双击失败: {e}")
self._error_count += 1
return False
# ----------------------------------------------------------------
# 滚动事件
# ----------------------------------------------------------------
def scroll_at_element(self, element, lines: int = -5):
"""在元素位置执行滚轮滚动。lines 为负值表示向上滚动。"""
pos = self.get_position(element)
size = self.get_size(element)
if pos == (0, 0) and size == (0, 0):
logger.warning("元素位置不可用,无法滚动")
return
cx = pos[0] + size[0] // 2
cy = pos[1] + size[1] // 2
self._scroll_at(cx, cy, lines)
def scroll_to_bottom(self, element, rounds: int = 10, lines_per_round: int = -20):
"""向下滚动多次,尽量到达元素(如消息列表)的底部。负值=向下滚动。"""
pos = self.get_position(element)
size = self.get_size(element)
if pos == (0, 0) and size == (0, 0):
logger.warning("元素位置不可用,无法滚到底部")
return
cx = pos[0] + size[0] // 2
cy = pos[1] + size[1] // 2
logger.debug(f"scroll_to_bottom: 目标({cx}, {cy}), {rounds}轮x{lines_per_round}")
for i in range(rounds):
self._scroll_at(cx, cy, lines_per_round)
time.sleep(0.12)
def _scroll_at(self, x: int, y: int, lines: int):
"""在屏幕坐标处执行滚轮滚动(仅移动鼠标,不点击)。"""
try:
point = CGPointMake(float(x), float(y))
# 移动鼠标到目标位置(不点击!)
move_evt = CGEventCreateMouseEvent(
None, kCGEventMouseMoved, point, kCGMouseButtonLeft
)
CGEventPost(kCGHIDEventTap, move_evt)
time.sleep(0.05)
scroll_evt = CGEventCreateScrollWheelEvent(
None, kCGScrollEventUnitLine, 1, lines
)
CGEventPost(kCGHIDEventTap, scroll_evt)
logger.debug(f"scroll_at: ({x}, {y}) lines={lines}")
except Exception as e:
logger.error(f"滚动失败: {e}")
# ----------------------------------------------------------------
# 键盘事件
# ----------------------------------------------------------------
def send_escape_key(self):
"""发送 Escape 键事件。"""
self._send_key(kVK_Escape)
def send_cmd_w(self):
"""发送 Cmd+W 键事件(关闭窗口)。"""
self._send_key(kVK_W, flags=kCGEventFlagMaskCommand)
def _send_key(self, key_code: int, flags: int = 0):
"""发送键盘事件(按下+抬起)。"""
try:
# Key down
event_down = CGEventCreateKeyboardEvent(None, key_code, True)
if flags:
CGEventSetFlags(event_down, flags)
CGEventPost(kCGHIDEventTap, event_down)
time.sleep(0.05)
# Key up
event_up = CGEventCreateKeyboardEvent(None, key_code, False)
if flags:
CGEventSetFlags(event_up, flags)
CGEventPost(kCGHIDEventTap, event_up)
except Exception as e:
logger.error(f"发送键盘事件失败: {e}")
# ----------------------------------------------------------------
# 错误管理
# ----------------------------------------------------------------
def should_backoff(self) -> bool:
"""连续错误过多时应暂停一段时间。"""
return self._error_count >= self._max_errors_before_backoff
def reset_error_count(self):
"""重置错误计数。"""
self._error_count = 0
# ----------------------------------------------------------------
# 调试工具
# ----------------------------------------------------------------
def dump_element(self, element, indent: int = 0, max_depth: int = 5) -> str:
"""递归输出元素树(调试用)。"""
if indent >= max_depth:
return ""
lines = []
role = self.get_role(element)
title = self.get_title(element)
name_attr = self.get_attribute(element, "AXValue") or ""
desc = self.get_description(element)
size = self.get_size(element)
pos = self.get_position(element)
actions = self.get_action_names(element)
identifier = self.get_attribute(element, "AXIdentifier") or ""
prefix = " " * indent
info_parts = [f"role={role}"]
if title:
short_title = title.replace("\n", "\\n")[:80]
info_parts.append(f'title="{short_title}"')
if name_attr:
short_name = str(name_attr).replace("\n", "\\n")[:80]
info_parts.append(f'value="{short_name}"')
if desc:
info_parts.append(f'desc="{desc}"')
if size != (0, 0):
info_parts.append(f"size={size[0]}x{size[1]}")
if pos != (0, 0):
info_parts.append(f"pos=({pos[0]},{pos[1]})")
if actions:
info_parts.append(f"actions={actions}")
if identifier:
info_parts.append(f'id="{identifier}"')
lines.append(f"{prefix}[{', '.join(info_parts)}]")
children = self.get_children(element)
for child in children:
lines.append(self.dump_element(child, indent + 1, max_depth))
return "\n".join(lines)