588 lines
22 KiB
Python
588 lines
22 KiB
Python
"""微信桌面端 UI 导航与元素查找
|
||
|
||
基于 macOS Accessibility API 探测到的微信 v4.1.9 UI 结构:
|
||
- 主窗口 "微信" 包含侧边栏和聊天列表
|
||
- 点击聊天项会打开独立的会话窗口(以聊天名命名)
|
||
- 聊天列表:AXList name="会话"
|
||
- 消息列表:AXList name="消息"
|
||
- 每个聊天项的 title 是多行文本,包含聊天名、未读数、最近消息、时间
|
||
"""
|
||
|
||
import logging
|
||
import re
|
||
import time
|
||
|
||
from .ax_bridge import AXBridge
|
||
|
||
logger = logging.getLogger("wechat_clicker.wechat_ui")
|
||
|
||
|
||
class ChatItem:
|
||
"""解析后的聊天列表项"""
|
||
|
||
def __init__(self, element, name: str, unread_count: int, preview: str, timestamp: str):
|
||
self.element = element
|
||
self.name = name
|
||
self.unread_count = unread_count
|
||
self.preview = preview
|
||
self.timestamp = timestamp
|
||
|
||
def __repr__(self):
|
||
return f"ChatItem(name={self.name!r}, unread={self.unread_count})"
|
||
|
||
|
||
class MessageItem:
|
||
"""解析后的消息"""
|
||
|
||
def __init__(self, element, msg_type: str, title: str, size: tuple):
|
||
self.element = element
|
||
self.msg_type = msg_type # "image", "file", "video", "text", "timestamp", "other"
|
||
self.title = title
|
||
self.size = size
|
||
|
||
@property
|
||
def is_visible(self) -> bool:
|
||
"""判断消息是否在可见区域(不可见的消息 size 为 0x0)"""
|
||
return self.size[0] > 0 and self.size[1] > 0
|
||
|
||
def __repr__(self):
|
||
short = self.title.replace("\n", "\\n")[:40]
|
||
return f"MessageItem(type={self.msg_type}, title={short!r})"
|
||
|
||
|
||
class WeChatUI:
|
||
"""微信 UI 导航器"""
|
||
|
||
def __init__(self, ax: AXBridge, bundle_id: str = "com.tencent.xinWeChat"):
|
||
self.ax = ax
|
||
self.bundle_id = bundle_id
|
||
self._app_ref = None
|
||
|
||
# ----------------------------------------------------------------
|
||
# 应用引用管理
|
||
# ----------------------------------------------------------------
|
||
|
||
def get_app_ref(self):
|
||
"""获取或缓存微信应用的 AXUIElement 引用。"""
|
||
if self._app_ref is None:
|
||
self._app_ref = self.ax.get_app_ref(self.bundle_id)
|
||
return self._app_ref
|
||
|
||
def invalidate_app_ref(self):
|
||
"""清除缓存的应用引用(微信重启后需要)。"""
|
||
self._app_ref = None
|
||
|
||
def ensure_wechat_frontmost(self) -> bool:
|
||
"""确保微信在最前台。"""
|
||
return self.ax.bring_to_front(self.bundle_id)
|
||
|
||
def ensure_wechat_visible(self, max_retries: int = 3) -> bool:
|
||
"""确保微信窗口可见(取消最小化/隐藏并带到前台)。
|
||
|
||
多次重试,每次重试后验证主窗口是否真正可见(非最小化)。
|
||
"""
|
||
if not self.ax.is_app_running(self.bundle_id):
|
||
logger.error("微信未运行")
|
||
return False
|
||
|
||
# 先检查是否已经可见且非最小化
|
||
main_win = self.get_main_window()
|
||
if main_win is not None and not self.ax.is_window_minimized(main_win):
|
||
self.ensure_wechat_frontmost()
|
||
return True
|
||
|
||
for attempt in range(1, max_retries + 1):
|
||
logger.info(f"微信窗口不可见或已最小化,恢复尝试 {attempt}/{max_retries}...")
|
||
|
||
success = self.ax.unminimize_app(self.bundle_id)
|
||
if not success:
|
||
logger.warning(f"恢复尝试 {attempt}: unminimize_app 返回失败")
|
||
time.sleep(1.0)
|
||
continue
|
||
|
||
time.sleep(1.0)
|
||
|
||
# 清除缓存的 app_ref,强制重新获取
|
||
self.invalidate_app_ref()
|
||
|
||
main_win = self.get_main_window()
|
||
if main_win is not None and not self.ax.is_window_minimized(main_win):
|
||
# 二次确认:等 0.5s 后再检查,防止瞬时窗口
|
||
time.sleep(0.5)
|
||
self.invalidate_app_ref()
|
||
main_win_verify = self.get_main_window()
|
||
if main_win_verify is not None and not self.ax.is_window_minimized(main_win_verify):
|
||
logger.info("微信主窗口已恢复可见")
|
||
self.ensure_wechat_frontmost()
|
||
return True
|
||
else:
|
||
logger.warning("微信主窗口短暂出现后消失(可能窗口尚未稳定)")
|
||
|
||
logger.warning(f"恢复尝试 {attempt} 后窗口仍不可见,等待后重试...")
|
||
time.sleep(1.0)
|
||
|
||
logger.error(f"经过 {max_retries} 次尝试,微信窗口恢复失败")
|
||
return False
|
||
|
||
# ----------------------------------------------------------------
|
||
# 窗口查找
|
||
# ----------------------------------------------------------------
|
||
|
||
def get_all_windows(self) -> list:
|
||
"""获取微信所有窗口。"""
|
||
app_ref = self.get_app_ref()
|
||
if app_ref is None:
|
||
return []
|
||
return self.ax.get_windows(app_ref)
|
||
|
||
def get_main_window(self):
|
||
"""找到主窗口(名为"微信"且最大的那个)。"""
|
||
all_windows = self.get_all_windows()
|
||
candidates = []
|
||
for win in all_windows:
|
||
title = self.ax.get_title(win)
|
||
if title == "微信":
|
||
size = self.ax.get_size(win)
|
||
candidates.append((win, size[0] * size[1]))
|
||
if candidates:
|
||
candidates.sort(key=lambda x: x[1], reverse=True)
|
||
return candidates[0][0]
|
||
if all_windows:
|
||
found_titles = [self.ax.get_title(w) or "(空标题)" for w in all_windows]
|
||
logger.warning(f"未找到微信主窗口, 当前窗口标题: {found_titles}")
|
||
else:
|
||
logger.warning("未找到微信主窗口")
|
||
return None
|
||
|
||
def get_conversation_windows(self) -> list:
|
||
"""获取所有非主窗口(即会话窗口)。"""
|
||
result = []
|
||
for win in self.get_all_windows():
|
||
title = self.ax.get_title(win)
|
||
# 排除主窗口和一些系统窗口
|
||
if title and title != "微信":
|
||
result.append(win)
|
||
return result
|
||
|
||
def get_conversation_window(self):
|
||
"""获取第一个会话窗口(如果存在)。"""
|
||
conv_windows = self.get_conversation_windows()
|
||
return conv_windows[0] if conv_windows else None
|
||
|
||
# ----------------------------------------------------------------
|
||
# 元素查找工具
|
||
# ----------------------------------------------------------------
|
||
|
||
def _find_child_by_role_and_name(self, parent, role: str, name: str = None, desc: str = None):
|
||
"""在子元素中按 role + name/desc 查找。"""
|
||
children = self.ax.get_children(parent)
|
||
for child in children:
|
||
child_role = self.ax.get_role(child)
|
||
if child_role != role:
|
||
continue
|
||
if name is not None:
|
||
child_title = self.ax.get_title(child)
|
||
if child_title == name:
|
||
return child
|
||
# 也检查 AXDescription
|
||
child_desc = self.ax.get_description(child)
|
||
if child_desc == name:
|
||
return child
|
||
elif desc is not None:
|
||
child_desc = self.ax.get_description(child)
|
||
if child_desc == desc:
|
||
return child
|
||
else:
|
||
# 只匹配 role
|
||
return child
|
||
return None
|
||
|
||
def _find_child_recursive(self, parent, role: str, name: str = None, max_depth: int = 6):
|
||
"""递归查找子元素(广度优先)。"""
|
||
if max_depth <= 0:
|
||
return None
|
||
|
||
children = self.ax.get_children(parent)
|
||
# 先在当前层查找
|
||
for child in children:
|
||
child_role = self.ax.get_role(child)
|
||
if child_role == role:
|
||
if name is None:
|
||
return child
|
||
child_title = self.ax.get_title(child)
|
||
if child_title == name:
|
||
return child
|
||
# 也用 AXDescription 匹配
|
||
child_desc = self.ax.get_description(child)
|
||
if child_desc == name:
|
||
return child
|
||
|
||
# 递归到下一层
|
||
for child in children:
|
||
result = self._find_child_recursive(child, role, name, max_depth - 1)
|
||
if result is not None:
|
||
return result
|
||
return None
|
||
|
||
# ----------------------------------------------------------------
|
||
# 聊天列表
|
||
# ----------------------------------------------------------------
|
||
|
||
def get_chat_list(self):
|
||
"""找到聊天列表元素(AXList name="会话")。"""
|
||
main_win = self.get_main_window()
|
||
if main_win is None:
|
||
return None
|
||
return self._find_child_recursive(main_win, "AXList", "会话")
|
||
|
||
def get_sidebar_chat_button(self):
|
||
"""找到侧边栏的"微信"按钮(显示全局未读数)。"""
|
||
main_win = self.get_main_window()
|
||
if main_win is None:
|
||
return None
|
||
return self._find_child_recursive(main_win, "AXButton", "微信")
|
||
|
||
def get_global_unread_count(self) -> int:
|
||
"""从侧边栏"微信"按钮获取全局未读消息数。"""
|
||
btn = self.get_sidebar_chat_button()
|
||
if btn is None:
|
||
return 0
|
||
desc = self.ax.get_description(btn)
|
||
if not desc:
|
||
return 0
|
||
match = re.search(r"(\d+)条新消息", desc)
|
||
if match:
|
||
return int(match.group(1))
|
||
return 0
|
||
|
||
def get_chat_items(self) -> list:
|
||
"""获取聊天列表中所有可见聊天项,返回 ChatItem 列表。"""
|
||
chat_list = self.get_chat_list()
|
||
if chat_list is None:
|
||
logger.warning("未找到聊天列表")
|
||
return []
|
||
|
||
items = []
|
||
children = self.ax.get_children(chat_list)
|
||
for child in children:
|
||
role = self.ax.get_role(child)
|
||
if role != "AXStaticText":
|
||
continue
|
||
|
||
title = self.ax.get_title(child)
|
||
if not title:
|
||
continue
|
||
|
||
parsed = self._parse_chat_title(title)
|
||
if parsed["name"]:
|
||
items.append(ChatItem(
|
||
element=child,
|
||
name=parsed["name"],
|
||
unread_count=parsed["unread_count"],
|
||
preview=parsed["preview"],
|
||
timestamp=parsed["timestamp"],
|
||
))
|
||
|
||
return items
|
||
|
||
def get_unread_chats(self) -> list:
|
||
"""获取有未读消息的聊天项,按未读数降序排列。"""
|
||
items = self.get_chat_items()
|
||
unread = [item for item in items if item.unread_count > 0]
|
||
unread.sort(key=lambda x: x.unread_count, reverse=True)
|
||
return unread
|
||
|
||
# ----------------------------------------------------------------
|
||
# 会话消息
|
||
# ----------------------------------------------------------------
|
||
|
||
def get_message_list(self, conv_window):
|
||
"""在会话窗口中找到消息列表(AXList name="消息")。"""
|
||
if conv_window is None:
|
||
return None
|
||
return self._find_child_recursive(conv_window, "AXList", "消息", max_depth=12)
|
||
|
||
def get_messages(self, msg_list) -> list:
|
||
"""获取消息列表中的所有消息,返回 MessageItem 列表。"""
|
||
if msg_list is None:
|
||
return []
|
||
|
||
items = []
|
||
children = self.ax.get_children(msg_list)
|
||
for child in children:
|
||
role = self.ax.get_role(child)
|
||
if role != "AXStaticText":
|
||
continue
|
||
|
||
title = self.ax.get_title(child)
|
||
if not title:
|
||
continue
|
||
|
||
size = self.ax.get_size(child)
|
||
msg_type = self._classify_message(title, size)
|
||
items.append(MessageItem(
|
||
element=child,
|
||
msg_type=msg_type,
|
||
title=title,
|
||
size=size,
|
||
))
|
||
|
||
return items
|
||
|
||
def get_media_messages(self, conv_window) -> list:
|
||
"""获取会话中所有可见的图片/文件/视频消息。"""
|
||
msg_list = self.get_message_list(conv_window)
|
||
if msg_list is None:
|
||
logger.debug("get_media_messages: 未找到消息列表")
|
||
return []
|
||
|
||
messages = self.get_messages(msg_list)
|
||
logger.debug(f"get_media_messages: 消息列表中共 {len(messages)} 个元素")
|
||
|
||
media = []
|
||
for msg in messages:
|
||
if msg.msg_type in ("image", "file", "video") and msg.is_visible:
|
||
pos = self.ax.get_position(msg.element)
|
||
short_title = msg.title.replace("\n", "\\n")[:50]
|
||
logger.debug(
|
||
f" 媒体: type={msg.msg_type} title=\"{short_title}\" "
|
||
f"pos={pos} size={msg.size}"
|
||
)
|
||
media.append(msg)
|
||
|
||
return media
|
||
|
||
# ----------------------------------------------------------------
|
||
# 窗口操作
|
||
# ----------------------------------------------------------------
|
||
|
||
def get_close_button(self, window):
|
||
"""找到窗口的关闭按钮(AXButton desc="关闭按钮")。"""
|
||
if window is None:
|
||
return None
|
||
|
||
children = self.ax.get_children(window)
|
||
for child in children:
|
||
role = self.ax.get_role(child)
|
||
if role == "AXButton":
|
||
desc = self.ax.get_description(child)
|
||
if desc == "关闭按钮":
|
||
return child
|
||
return None
|
||
|
||
def click_sidebar_chat_tab(self) -> bool:
|
||
"""点击侧边栏"微信"按钮,确保显示聊天列表。"""
|
||
btn = self.get_sidebar_chat_button()
|
||
if btn is None:
|
||
logger.warning("未找到侧边栏微信按钮")
|
||
return False
|
||
return self.ax.press(btn)
|
||
|
||
# ----------------------------------------------------------------
|
||
# 图片预览界面操作
|
||
# ----------------------------------------------------------------
|
||
|
||
def find_preview_more_button(self, window, min_x: int = 0) -> object:
|
||
"""在图片预览窗口中找到 '...'(更多)按钮。
|
||
|
||
min_x: 按钮的最小 x 坐标,用于排除侧边栏区域的按钮。
|
||
"""
|
||
if window is None:
|
||
return None
|
||
return self._find_more_button_recursive(window, max_depth=6, min_x=min_x)
|
||
|
||
def _find_more_button_recursive(self, parent, max_depth=6, min_x: int = 0):
|
||
"""递归查找"更多"按钮 — 通常 title 或 desc 包含 '更多' 或 '...'。"""
|
||
if max_depth <= 0:
|
||
return None
|
||
children = self.ax.get_children(parent)
|
||
for child in children:
|
||
role = self.ax.get_role(child)
|
||
if role == "AXButton":
|
||
title = self.ax.get_title(child)
|
||
desc = self.ax.get_description(child)
|
||
ident = self.ax.get_attribute(child, "AXIdentifier") or ""
|
||
combined = f"{title}|{desc}|{ident}".lower()
|
||
if any(kw in combined for kw in ["更多", "more", "..."]):
|
||
# 位置过滤:排除侧边栏区域的按钮
|
||
if min_x > 0:
|
||
pos = self.ax.get_position(child)
|
||
if pos[0] < min_x:
|
||
logger.debug(
|
||
f" 跳过侧边栏按钮: title=\"{title}\" desc=\"{desc}\" "
|
||
f"pos={pos} (x < {min_x})"
|
||
)
|
||
continue
|
||
pos = self.ax.get_position(child)
|
||
logger.debug(
|
||
f" 匹配到'...'按钮: title=\"{title}\" desc=\"{desc}\" "
|
||
f"ident=\"{ident}\" pos={pos}"
|
||
)
|
||
return child
|
||
for child in children:
|
||
result = self._find_more_button_recursive(child, max_depth - 1, min_x=min_x)
|
||
if result is not None:
|
||
return result
|
||
return None
|
||
|
||
def find_menu_item(self, text: str) -> object:
|
||
"""在当前可见的菜单/弹出层中查找包含指定文本的菜单项。"""
|
||
logger.debug(f"find_menu_item: 搜索 \"{text}\"")
|
||
app_ref = self.get_app_ref()
|
||
if app_ref is None:
|
||
return None
|
||
windows = self.ax.get_windows(app_ref)
|
||
for win in windows:
|
||
win_title = self.ax.get_title(win)
|
||
result = self._find_element_with_text(win, text, max_depth=8)
|
||
if result is not None:
|
||
role = self.ax.get_role(result)
|
||
pos = self.ax.get_position(result)
|
||
logger.debug(
|
||
f" 找到菜单项: \"{text}\" in window=\"{win_title}\" "
|
||
f"role={role} pos={pos}"
|
||
)
|
||
return result
|
||
menu_bar = self.ax.get_attribute(app_ref, "AXMenuBar")
|
||
if menu_bar:
|
||
result = self._find_element_with_text(menu_bar, text, max_depth=6)
|
||
if result is not None:
|
||
logger.debug(f" 找到菜单项: \"{text}\" in menubar")
|
||
return result
|
||
logger.debug(f" 未找到菜单项: \"{text}\"")
|
||
return None
|
||
|
||
def _find_element_with_text(self, parent, text: str, max_depth: int = 8):
|
||
"""递归查找 title 或 value 包含指定文本的可点击元素。"""
|
||
if max_depth <= 0:
|
||
return None
|
||
children = self.ax.get_children(parent)
|
||
for child in children:
|
||
title = self.ax.get_title(child)
|
||
value = self.ax.get_value(child)
|
||
role = self.ax.get_role(child)
|
||
if text in (title or "") or text in (value or ""):
|
||
actions = self.ax.get_action_names(child)
|
||
if "AXPress" in actions or "AXPick" in actions:
|
||
return child
|
||
if role in ("AXMenuItem", "AXButton", "AXStaticText"):
|
||
return child
|
||
for child in children:
|
||
result = self._find_element_with_text(child, text, max_depth - 1)
|
||
if result is not None:
|
||
return result
|
||
return None
|
||
|
||
def find_all_buttons_in_window(self, window) -> list:
|
||
"""调试用:列出窗口内所有按钮信息。"""
|
||
buttons = []
|
||
self._collect_buttons(window, buttons, max_depth=8)
|
||
return buttons
|
||
|
||
def _collect_buttons(self, parent, results: list, max_depth: int):
|
||
if max_depth <= 0:
|
||
return
|
||
children = self.ax.get_children(parent)
|
||
for child in children:
|
||
role = self.ax.get_role(child)
|
||
if role in ("AXButton", "AXMenuItem", "AXMenuBarItem"):
|
||
title = self.ax.get_title(child)
|
||
desc = self.ax.get_description(child)
|
||
ident = self.ax.get_attribute(child, "AXIdentifier") or ""
|
||
pos = self.ax.get_position(child)
|
||
size = self.ax.get_size(child)
|
||
results.append({
|
||
"element": child,
|
||
"role": role,
|
||
"title": title,
|
||
"desc": desc,
|
||
"identifier": ident,
|
||
"pos": pos,
|
||
"size": size,
|
||
})
|
||
self._collect_buttons(child, results, max_depth - 1)
|
||
|
||
# ----------------------------------------------------------------
|
||
# 解析工具
|
||
# ----------------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def _parse_chat_title(title_text: str) -> dict:
|
||
"""解析聊天项的多行 title 文本。
|
||
|
||
格式(有未读时):
|
||
聊天名
|
||
[N条]
|
||
最近消息预览
|
||
时间
|
||
|
||
格式(无未读时):
|
||
聊天名
|
||
最近消息预览
|
||
时间
|
||
"""
|
||
lines = title_text.strip().split("\n")
|
||
result = {
|
||
"name": "",
|
||
"unread_count": 0,
|
||
"preview": "",
|
||
"timestamp": "",
|
||
}
|
||
|
||
if not lines:
|
||
return result
|
||
|
||
result["name"] = lines[0].strip()
|
||
|
||
if len(lines) >= 2:
|
||
# 检查第二行是否是 [N条] 格式
|
||
unread_match = re.match(r"\[(\d+)条\]", lines[1].strip())
|
||
if unread_match:
|
||
result["unread_count"] = int(unread_match.group(1))
|
||
if len(lines) >= 3:
|
||
result["preview"] = lines[2].strip()
|
||
if len(lines) >= 4:
|
||
result["timestamp"] = lines[-1].strip()
|
||
else:
|
||
result["preview"] = lines[1].strip()
|
||
if len(lines) >= 3:
|
||
result["timestamp"] = lines[-1].strip()
|
||
|
||
return result
|
||
|
||
@staticmethod
|
||
def _classify_message(title: str, size: tuple) -> str:
|
||
"""根据消息 title 和尺寸分类消息类型。
|
||
|
||
已知模式(微信 v4.1.9):
|
||
- 图片: title="图片"
|
||
- 视频: title 以 "视频" 开头
|
||
- 文件: title 以 "文件\\n" 开头
|
||
- 时间戳: 高度约 41px,内容为时间格式
|
||
- 其他: 文本消息或其他类型
|
||
"""
|
||
stripped = title.strip()
|
||
|
||
# 图片
|
||
if stripped == "图片":
|
||
return "image"
|
||
|
||
# 视频
|
||
if stripped.startswith("视频"):
|
||
return "video"
|
||
|
||
# 文件(多行格式:文件\n文件名\n大小\n微信电脑版)
|
||
if stripped.startswith("文件\n") or stripped.startswith("[文件]"):
|
||
return "file"
|
||
|
||
# 时间戳分隔符(高度约 41px,内容为时间格式)
|
||
if size[1] > 0 and size[1] <= 45:
|
||
time_pattern = re.match(
|
||
r"^(\d{1,2}:\d{2}|\d{1,2}/\d{1,2}|星期[一二三四五六日]|昨天|前天)$",
|
||
stripped,
|
||
)
|
||
if time_pattern:
|
||
return "timestamp"
|
||
|
||
return "text"
|