wechat_msg_clicker/wechat_clicker/automator.py
2026-05-06 14:31:56 +08:00

875 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""主自动化逻辑
编排整个工作流程:扫描未读聊天 → 点击进入 → 点击图片/视频/文件 → 关闭 → 循环
图片+视频处理:锚点+箭头导航 — 点击最底部图片/视频进入预览,左箭头键依次切换
文件处理:直接点击文件气泡触发下载
"""
import logging
import subprocess
import time
from .ax_bridge import AXBridge
from .config import Config
from .human_like import HumanBehavior
from .state_machine import StateMachine, UIState
from .wechat_ui import WeChatUI, MessageItem
logger = logging.getLogger("wechat_clicker.automator")
IMAGE_OVERLAP = 5
MAX_IMAGES_PER_CHAT = 30
MAX_CONSECUTIVE_FAILURES = 3
MAX_REANCHOR_ATTEMPTS = 2
MAX_RECOVER_FAILURES = 15
class WeChatAutomator:
"""微信消息自动点击器"""
def __init__(self, config: Config, dry_run: bool = False):
self.config = config
self.dry_run = dry_run
self._single_mode = False
self.ax = AXBridge()
self.ui = WeChatUI(self.ax, config.bundle_id)
self.state = StateMachine(self.ax, self.ui)
self.human = HumanBehavior(config)
self._scan_count = 0
self._total_chats_processed = 0
self._total_media_clicked = 0
self._consecutive_recover_failures = 0
# ----------------------------------------------------------------
# 启动检查
# ----------------------------------------------------------------
def _send_alert(self, message: str):
"""发送 macOS 系统通知提醒用户介入。"""
try:
subprocess.run(
[
"osascript", "-e",
f'display notification "{message}" '
f'with title "微信自动点击器" sound name "Sosumi"'
],
capture_output=True, timeout=5,
)
logger.info(f"已发送系统通知: {message}")
except Exception as e:
logger.warning(f"发送通知失败: {e}")
def verify_setup(self) -> bool:
if not self.ax.check_accessibility():
logger.error("缺少辅助功能权限,无法继续")
return False
app_ref = self.ui.get_app_ref()
if app_ref is None:
logger.error("微信未运行,请先启动微信桌面端")
return False
main_win = self.ui.get_main_window()
if main_win is None:
logger.info("未找到微信主窗口,尝试恢复最小化窗口...")
if self.ui.ensure_wechat_visible():
main_win = self.ui.get_main_window()
if main_win is None:
logger.error("未找到微信主窗口,请确保微信已登录并可见")
return False
logger.info("环境检查通过")
return True
# ----------------------------------------------------------------
# 主循环
# ----------------------------------------------------------------
def run(self):
logger.info("微信自动点击器启动")
if not self.verify_setup():
return
while True:
try:
self._run_one_cycle()
except KeyboardInterrupt:
logger.info("用户中断,正在退出...")
break
except Exception as e:
logger.error(f"主循环异常: {e}", exc_info=True)
try:
self.state.recover_to_chat_list()
except Exception:
pass
time.sleep(10)
self._print_stats()
def run_once(self):
logger.info("执行单次扫描")
if not self.verify_setup():
return
self._single_mode = True
self._run_one_cycle()
self._print_stats()
def _run_one_cycle(self):
cycle_start = time.time()
if not self._single_mode:
if self.human.is_off_hours():
logger.info("当前为非工作时间,等待中...")
time.sleep(300)
return
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
if not self.state.recover_to_chat_list():
self._consecutive_recover_failures += 1
current_state = self.state.current_state
if self._consecutive_recover_failures >= MAX_RECOVER_FAILURES:
logger.critical(
f"微信恢复连续失败 {self._consecutive_recover_failures}"
f"(状态={current_state.value}), 需要人工介入! 暂停 600s"
)
self._send_alert(
f"连续恢复失败 {self._consecutive_recover_failures} 次, "
f"状态={current_state.value}"
)
time.sleep(600)
return
backoff = min(10 * (2 ** (self._consecutive_recover_failures - 1)), 120)
logger.warning(
f"无法恢复到聊天列表 (连续失败 {self._consecutive_recover_failures}, "
f"状态={current_state.value}), 等待 {backoff}s"
)
time.sleep(backoff)
return
self._consecutive_recover_failures = 0
self._scan_count += 1
global_unread = self.ui.get_global_unread_count()
if global_unread == 0:
logger.debug(f"[扫描#{self._scan_count}] 没有未读消息")
sleep_time = self.human.scan_interval_with_jitter()
time.sleep(sleep_time)
return
logger.info(f"[扫描#{self._scan_count}] 全局未读: {global_unread}")
unread_chats = self.ui.get_unread_chats()
if not unread_chats:
logger.debug("聊天列表中未发现未读项(可能需要滚动)")
sleep_time = self.human.scan_interval_with_jitter()
time.sleep(sleep_time)
return
all_candidates = [
c for c in unread_chats if self.config.should_process_chat(c.name)
]
filtered_out = [
c.name for c in unread_chats
if not self.config.should_process_chat(c.name)
]
if filtered_out:
logger.info(f" 过滤掉的聊天: {filtered_out}")
# 只在有实际要处理的聊天时才考虑长休息
if not self._single_mode and all_candidates:
if self.human.should_take_break(global_unread):
self.human.long_break()
return
count = self.human.random_subset_count(
len(all_candidates), self.config.max_chats_per_scan
)
chats_to_process = all_candidates[:count]
logger.info(
f"将处理 {len(chats_to_process)} 个聊天 "
f"(共 {len(unread_chats)} 个未读, 过滤 {len(filtered_out)} 个)"
)
cycle_media_count = 0
media_before = self._total_media_clicked
for chat in chats_to_process:
self.human.delay("before_click_chat")
self._process_chat(chat)
self.human.delay("before_close_chat")
cycle_media_count = self._total_media_clicked - media_before
cycle_duration = time.time() - cycle_start
logger.info(
f"[扫描#{self._scan_count}] 完成: "
f"全局未读={global_unread}, "
f"处理聊天={len(chats_to_process)}/{len(unread_chats)}, "
f"过滤={len(filtered_out)}, "
f"点击媒体={cycle_media_count}, "
f"耗时={cycle_duration:.0f}s"
)
if not self._single_mode:
if chats_to_process:
# 刚处理完聊天,可能还有更多未读,立即重扫
logger.debug("还有可能的未读消息,立即重新扫描")
return
sleep_time = self.human.scan_interval_with_jitter()
logger.debug(f"等待 {sleep_time:.0f}s 后进行下次扫描")
time.sleep(sleep_time)
# ----------------------------------------------------------------
# 处理单个聊天
# ----------------------------------------------------------------
def _process_chat(self, chat):
logger.info(f"处理聊天: {chat.name} (未读: {chat.unread_count})")
if self.dry_run:
logger.info(f" [DRY-RUN] 跳过点击: {chat.name}")
return
chat_pos = self.ax.get_position(chat.element)
chat_size = self.ax.get_size(chat.element)
logger.debug(f" 聊天项位置: pos={chat_pos} size={chat_size}")
if not self.ax.click_at_element(chat.element):
logger.warning(f" 点击聊天项失败: {chat.name}")
return
self.human.delay("after_open_chat")
# 单窗口模式:会话在主窗口内打开,直接在主窗口中查找消息列表
main_win = self.ui.get_main_window()
if main_win is None:
logger.warning(" 主窗口丢失")
return
msg_list = self.ui.get_message_list(main_win)
if msg_list is None:
logger.warning(f" 未找到消息列表,可能聊天未成功打开: {chat.name}")
return
msg_list_pos = self.ax.get_position(msg_list)
msg_list_size = self.ax.get_size(msg_list)
logger.debug(f" 消息列表: pos={msg_list_pos} size={msg_list_size}")
media_count = self._process_media_with_scroll(main_win, msg_list, chat.name, chat.unread_count)
self._total_media_clicked += media_count
self._total_chats_processed += 1
logger.info(f" {chat.name}: 本次处理了 {media_count} 个媒体")
# ----------------------------------------------------------------
# 带滚动的媒体处理
# ----------------------------------------------------------------
def _process_media_with_scroll(self, main_win, msg_list, chat_name: str, unread_count: int = 0) -> int:
"""处理聊天中的图片、视频和文件。
图片+视频:锚点+箭头导航(内部管理滚动和锚点查找)。
文件:直接点击可见的文件元素。"""
total_clicked = 0
# 图片+视频处理:锚点+箭头导航(滚动和锚点查找在内部完成)
if self.config.click_images or self.config.click_videos:
media_count = min(max(5, unread_count) + IMAGE_OVERLAP, MAX_IMAGES_PER_CHAT)
media_clicked = self._process_media_with_arrow(msg_list, chat_name, media_count)
total_clicked += media_clicked
# 文件处理:直接点击可见元素
if self.config.click_files:
self.ui.ensure_wechat_frontmost()
time.sleep(0.3)
state = self.state.detect_state()
if state != UIState.MAIN_CHAT_LIST:
self._close_extra_windows()
file_clicked = self._process_visible_files(main_win, msg_list, chat_name)
total_clicked += file_clicked
return total_clicked
# ----------------------------------------------------------------
# 图片处理:锚点+箭头导航
# ----------------------------------------------------------------
def _find_anchor_media(self, msg_list):
"""在消息列表底部找到最后一个可见的图片或视频元素(作为导航锚点)。
返回 (element, position, size) 或 None。"""
list_pos = self.ax.get_position(msg_list)
list_size = self.ax.get_size(msg_list)
visible_top = list_pos[1]
visible_bottom = list_pos[1] + list_size[1]
margin = 30
children = self.ax.get_children(msg_list)
anchor = None
for child in children:
if self.ax.get_role(child) != "AXStaticText":
continue
title = self.ax.get_title(child)
if title != "图片" and not (title and title.startswith("视频")):
continue
size = self.ax.get_size(child)
if size[0] == 0 or size[1] == 0:
continue
pos = self.ax.get_position(child)
center_y = pos[1] + size[1] // 2
if visible_top + margin <= center_y <= visible_bottom - margin:
anchor = (child, pos, size)
return anchor
def _process_media_with_arrow(self, msg_list, chat_name: str, media_count: int) -> int:
"""用锚点+箭头导航处理多张图片和视频。
内部管理锚点查找、滚动和失败重锚。"""
anchor_element, anchor_size = self._scroll_and_find_anchor(msg_list)
if not anchor_element:
logger.debug(f" {chat_name}: 未找到可见图片/视频,跳过媒体处理")
return 0
logger.info(f" {chat_name}: 将处理 {media_count} 个图片/视频")
clicked = 0
consecutive_failures = 0
reanchor_count = 0
i = 0
while i < media_count:
logger.debug(f" [{i+1}/{media_count}] 点击锚点媒体")
if not self.ax.click_at_element_offset(anchor_element, 120, anchor_size[1] // 2):
logger.warning(f" [{i+1}] 锚点点击失败")
if reanchor_count >= MAX_REANCHOR_ATTEMPTS:
logger.warning(f" 重锚次数已达上限({MAX_REANCHOR_ATTEMPTS}),中断")
break
logger.info(f" 尝试重锚 ({reanchor_count+1}/{MAX_REANCHOR_ATTEMPTS})...")
anchor_element, anchor_size = self._scroll_and_find_anchor(msg_list)
if not anchor_element:
logger.warning(f" 重锚失败,未找到媒体")
break
reanchor_count += 1
i = 0
consecutive_failures = 0
continue
time.sleep(2.0)
# 检测预览窗口是否打开(视频首次点击可能需要更长时间加载)
windows = self.ui.get_all_windows()
if len(windows) < 2:
# 重试一次再等3秒视频首次点击会触发下载预览窗口延迟弹出
logger.debug(f" [{i+1}] 预览窗口未立即打开再等3秒...")
time.sleep(3.0)
windows = self.ui.get_all_windows()
if len(windows) < 2:
# 兜底:即使没弹出独立预览窗口,也检查是否有"查看原视频"按钮(内嵌预览)
video_btn = self._find_view_original_video_button()
if video_btn:
logger.info(f" [{i+1}/{media_count}] 内嵌预览检测到视频,处理 (箭头×{i})")
success = self._do_video_download_and_close(video_btn)
if success:
clicked += 1
consecutive_failures = 0
else:
consecutive_failures += 1
self.ui.ensure_wechat_frontmost()
time.sleep(0.3)
self.human.delay("between_messages")
i += 1
continue
consecutive_failures += 1
logger.warning(f" [{i+1}] 预览窗口未打开 (窗口数={len(windows)}, 连续失败 {consecutive_failures})")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
if reanchor_count >= MAX_REANCHOR_ATTEMPTS:
logger.warning(f" 重锚次数已达上限,中断")
break
logger.info(f" 连续 {consecutive_failures} 次失败,重锚 ({reanchor_count+1}/{MAX_REANCHOR_ATTEMPTS})...")
anchor_element, anchor_size = self._scroll_and_find_anchor(msg_list)
if not anchor_element:
break
reanchor_count += 1
i = 0
consecutive_failures = 0
continue
logger.debug(f" [{i+1}] 预览已打开 (窗口数={len(windows)})")
if i > 0:
logger.debug(f" [{i+1}] 按左箭头 {i}")
for _ in range(i):
self.ax.send_left_arrow()
time.sleep(0.5)
time.sleep(0.5)
# 检测当前预览类型并处理:
# 1. 有"查看原视频" → 未下载视频 → 点击触发下载
# 2. 无"查看原视频"但有"原视频" → 已下载视频 → 直接退出
# 3. 都没有 → 图片 → "..."→"使用预览打开"
video_btn = self._find_view_original_video_button()
is_downloaded_video = False if video_btn else self._is_downloaded_video_preview()
logger.debug(f" [{i+1}] 类型检测: 查看原视频={video_btn is not None}, 已下载视频={is_downloaded_video}")
if video_btn:
logger.info(f" [{i+1}/{media_count}] 处理视频 (箭头×{i})")
success = self._do_video_download_and_close(video_btn)
elif is_downloaded_video:
logger.debug(f" [{i+1}/{media_count}] 已下载视频,跳过 (箭头×{i})")
time.sleep(1.0)
self.ax.send_escape_key()
time.sleep(0.5)
success = True
else:
logger.info(f" [{i+1}/{media_count}] 处理图片 (箭头×{i})")
success = self._do_preview_and_close()
if success:
clicked += 1
consecutive_failures = 0
else:
consecutive_failures += 1
logger.warning(f" [{i+1}] 处理失败 (连续失败 {consecutive_failures})")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
if reanchor_count >= MAX_REANCHOR_ATTEMPTS:
logger.warning(f" 重锚次数已达上限,中断")
break
logger.info(f" 连续 {consecutive_failures} 次失败,重锚 ({reanchor_count+1}/{MAX_REANCHOR_ATTEMPTS})...")
anchor_element, anchor_size = self._scroll_and_find_anchor(msg_list)
if not anchor_element:
break
reanchor_count += 1
i = 0
consecutive_failures = 0
continue
self.ui.ensure_wechat_frontmost()
time.sleep(0.3)
self.human.delay("between_messages")
i += 1
logger.info(f" {chat_name}: 箭头导航完成, 成功 {clicked}/{media_count}")
return clicked
def _scroll_and_find_anchor(self, msg_list):
"""滚到底部并找锚点图片,返回 (element, size) 或 (None, None)。"""
self.ax.scroll_to_bottom(msg_list, rounds=5, lines_per_round=-20)
time.sleep(1.0)
anchor = self._find_anchor_media(msg_list)
if anchor:
pos = self.ax.get_position(anchor[0])
logger.debug(f" 锚点图片: pos={pos} size={anchor[2]}")
return anchor[0], anchor[2]
return None, None
def _do_preview_and_close(self) -> bool:
"""在已打开的图片预览中执行: 找"..."→ 点击 → 找"使用预览打开"→ 点击 → 关闭 Preview.app。"""
self.human.random_delay(0.5, 1.0)
# 找 "..." 按钮
more_btn = self._find_more_button_in_preview()
if more_btn is None:
logger.warning(" 未找到'...'按钮Escape 退出")
self.ax.send_escape_key()
time.sleep(0.5)
return False
if not self.ax.click_at_element(more_btn):
if not self.ax.press(more_btn):
logger.warning(" '...'按钮点击失败")
self.ax.send_escape_key()
time.sleep(0.5)
return False
self.human.random_delay(0.5, 1.0)
# 找"使用预览打开"菜单项
preview_item = self.ui.find_menu_item('使用"预览"打开')
if preview_item is None:
preview_item = self.ui.find_menu_item("预览")
if preview_item is None:
preview_item = self.ui.find_menu_item("Preview")
if preview_item is None:
logger.warning(" 未找到'使用预览打开'菜单项")
self.ax.send_escape_key()
time.sleep(0.3)
self.ax.send_escape_key()
time.sleep(0.5)
return False
if not self.ax.click_at_element(preview_item):
self.ax.press(preview_item)
self.human.random_delay(2.0, 3.5)
# 关闭 Preview.app + 额外窗口
self._close_extra_windows()
# Escape 关闭残留的内嵌预览
self.ax.send_escape_key()
time.sleep(0.3)
state = self.state.detect_state()
if state == UIState.MEDIA_PREVIEW:
self.ax.send_escape_key()
time.sleep(0.3)
return True
# ----------------------------------------------------------------
# 视频处理:查找"查看原视频"按钮并触发下载
# ----------------------------------------------------------------
def _find_view_original_video_button(self):
"""在视频预览区域查找"查看原视频"按钮(仅未下载过的视频有此按钮)。"""
windows = self.ui.get_all_windows()
for win in windows:
title = self.ax.get_title(win)
if title == "微信":
continue
btn = self.ui._find_element_with_text(win, "查看原视频", max_depth=6)
if btn is not None:
pos = self.ax.get_position(btn)
logger.debug(f" 在独立窗口 '{title}' 中找到'查看原视频'按钮, pos={pos}")
return btn
main_win = self.ui.get_main_window()
if main_win:
btn = self.ui._find_element_with_text(main_win, "查看原视频", max_depth=6)
if btn is not None:
pos = self.ax.get_position(btn)
if pos[0] > 200:
logger.debug(f" 在主窗口右侧找到'查看原视频'按钮, pos={pos}")
return btn
return None
def _is_downloaded_video_preview(self) -> bool:
"""检查当前预览是否为已下载的视频(有"原视频"但无"查看原视频")。"""
windows = self.ui.get_all_windows()
for win in windows:
title = self.ax.get_title(win)
if title == "微信":
continue
btn = self.ui._find_element_with_text(win, "原视频", max_depth=6)
if btn is not None:
logger.debug(f" 在独立窗口 '{title}' 中找到'原视频'按钮(已下载)")
return True
main_win = self.ui.get_main_window()
if main_win:
btn = self.ui._find_element_with_text(main_win, "原视频", max_depth=6)
if btn is not None:
pos = self.ax.get_position(btn)
if pos[0] > 200:
logger.debug(f" 在主窗口右侧找到'原视频'按钮(已下载)")
return True
return False
def _do_video_download_and_close(self, video_btn) -> bool:
"""在已打开的视频预览中执行: 点击"查看原视频"→ 等待下载 → 关闭预览。"""
self.human.random_delay(0.5, 1.0)
if not self.ax.click_at_element(video_btn):
if not self.ax.press(video_btn):
logger.warning(" '查看原视频'按钮点击失败")
self.ax.send_escape_key()
time.sleep(0.5)
return False
logger.debug(" 等待视频下载 (10s)...")
time.sleep(10)
self._close_extra_windows()
self.ax.send_escape_key()
time.sleep(0.3)
state = self.state.detect_state()
if state == UIState.MEDIA_PREVIEW:
self.ax.send_escape_key()
time.sleep(0.3)
return True
# ----------------------------------------------------------------
# 文件/视频处理:直接点击可见元素
# ----------------------------------------------------------------
def _process_visible_files(self, main_win, msg_list, chat_name: str) -> int:
"""处理当前可见的文件(图片和视频走箭头导航,不在此处理)。"""
list_pos = self.ax.get_position(msg_list)
list_size = self.ax.get_size(msg_list)
visible_top = list_pos[1]
visible_bottom = list_pos[1] + list_size[1]
margin = 30
children = self.ax.get_children(msg_list)
targets = []
for child in children:
if self.ax.get_role(child) != "AXStaticText":
continue
title = self.ax.get_title(child)
if not title:
continue
size = self.ax.get_size(child)
if size[0] == 0 or size[1] == 0:
continue
msg_type = WeChatUI._classify_message(title, size)
if msg_type != "file":
continue
pos = self.ax.get_position(child)
center_y = pos[1] + size[1] // 2
if center_y < visible_top + margin or center_y > visible_bottom - margin:
continue
targets.append((child, msg_type, title, size))
if not targets:
return 0
logger.info(f" {chat_name}: 发现 {len(targets)} 个文件")
clicked = 0
for child, msg_type, title, size in targets:
self.human.delay("before_click_media")
short_title = title.replace("\n", " ")[:50]
logger.info(f" [file] {short_title}")
msg_item = MessageItem(element=child, msg_type=msg_type, title=title, size=size)
success = self._click_file(msg_item)
if success:
clicked += 1
self.human.delay("between_messages")
return clicked
def _find_more_button_in_preview(self):
"""在图片预览区域查找"..."按钮(排除侧边栏区域)。"""
# 先在非主窗口(独立预览窗口)中找
windows = self.ui.get_all_windows()
for win in windows:
title = self.ax.get_title(win)
if title == "微信":
continue
btn = self.ui.find_preview_more_button(win)
if btn is not None:
pos = self.ax.get_position(btn)
logger.debug(f" 在独立窗口 '{title}' 中找到'...'按钮, pos={pos}")
return btn
# 内嵌预览只在主窗口的右侧区域找排除左侧侧边栏x > 200
main_win = self.ui.get_main_window()
if main_win:
btn = self.ui.find_preview_more_button(main_win, min_x=200)
if btn is not None:
pos = self.ax.get_position(btn)
logger.debug(f" 在主窗口右侧区域找到'...'按钮, pos={pos}")
return btn
logger.debug(" 未在任何窗口中找到'...'按钮")
return None
# ----------------------------------------------------------------
# 文件处理:直接点击触发下载
# ----------------------------------------------------------------
def _click_file(self, msg) -> bool:
"""点击文件触发下载,然后关闭可能弹出的预览窗口。"""
x_off = 120
y_off = msg.size[1] // 2
if not self.ax.click_at_element_offset(msg.element, x_off, y_off):
logger.warning(" 文件点击失败")
return False
self.human.delay("after_click_media")
self.human.random_delay(1.0, 2.0)
state = self.state.detect_state()
logger.debug(f" 文件点击后状态={state.value}")
if state != UIState.MAIN_CHAT_LIST:
self._close_extra_windows()
return True
# ----------------------------------------------------------------
# 通用处理(视频等)
# ----------------------------------------------------------------
def _click_generic(self, msg) -> bool:
x_off = 120
y_off = msg.size[1] // 2
if not self.ax.click_at_element_offset(msg.element, x_off, y_off):
return False
self.human.delay("after_click_media")
self.human.micro_jitter()
self.ax.send_escape_key()
time.sleep(0.5)
return True
# ----------------------------------------------------------------
# 窗口清理:关闭预览等额外窗口
# ----------------------------------------------------------------
def _close_extra_windows(self):
"""关闭所有非主窗口(文件预览、图片预览等)和 Preview.app恢复干净状态。"""
if self.state.is_preview_app_running():
logger.debug(" 关闭 Preview.app")
self.state.close_preview_app()
conv_wins = self.ui.get_conversation_windows()
for win in conv_wins:
title = self.ax.get_title(win)
logger.debug(f" 关闭额外窗口: {title}")
close_btn = self.ui.get_close_button(win)
if close_btn:
self.ax.press(close_btn)
else:
self.ax.send_cmd_w()
time.sleep(0.5)
self.ui.ensure_wechat_frontmost()
time.sleep(0.3)
# ----------------------------------------------------------------
# 调试工具
# ----------------------------------------------------------------
def dump_ui_tree(self, chat_name: str = None):
if not self.verify_setup():
return
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
if chat_name:
self._dump_chat_messages(chat_name)
return
main_win = self.ui.get_main_window()
if main_win:
print("=== 主窗口 (微信) ===")
print(self.ax.dump_element(main_win, max_depth=5))
conv_windows = self.ui.get_conversation_windows()
for win in conv_windows:
title = self.ax.get_title(win)
print(f"\n=== 会话窗口 ({title}) ===")
print(self.ax.dump_element(win, max_depth=5))
print("\n=== 聊天列表解析 ===")
items = self.ui.get_chat_items()
for item in items:
pos = self.ax.get_position(item.element)
size = self.ax.get_size(item.element)
status = f"[未读:{item.unread_count}]" if item.unread_count > 0 else ""
print(
f" {item.name} {status} | {item.preview} | {item.timestamp} "
f"| pos={pos} size={size}"
)
def _dump_chat_messages(self, chat_name: str):
"""进入指定聊天dump 消息列表中每个子元素的完整子树。"""
# 确保在聊天列表
if not self.state.recover_to_chat_list():
print(f"ERROR: 无法恢复到聊天列表")
return
# 在聊天列表中查找匹配的聊天项(子串匹配)
items = self.ui.get_chat_items()
target = None
for item in items:
if chat_name in item.name:
target = item
break
if target is None:
print(f"ERROR: 聊天列表中未找到包含 \"{chat_name}\" 的聊天")
print("当前可见聊天:")
for item in items:
print(f" - {item.name}")
return
print(f"找到聊天: {target.name} (未读: {target.unread_count})")
print(f"点击进入...")
# 点击聊天项(先点其他聊天再点回来,避免重复选中不加载)
other = next((i for i in items if i.name != target.name), None)
if other:
self.ax.click_at_element(other.element)
time.sleep(0.5)
if not self.ax.click_at_element(target.element):
print(f"ERROR: 点击聊天项失败")
return
time.sleep(2.0)
# 在主窗口中找消息列表(重试几次,等待 UI 加载)
main_win = self.ui.get_main_window()
if main_win is None:
print("ERROR: 主窗口丢失")
return
msg_list = None
for attempt in range(3):
msg_list = self.ui.get_message_list(main_win)
if msg_list is not None:
break
print(f" 等待消息列表加载... (尝试 {attempt + 1}/3)")
time.sleep(1.5)
if msg_list is None:
print("ERROR: 未找到消息列表dump 主窗口结构以诊断:")
print(self.ax.dump_element(main_win, max_depth=8))
return
# 滚到底部看最新消息
print("滚动到最新消息...")
self.ax.scroll_to_bottom(msg_list, rounds=10, lines_per_round=-20)
time.sleep(1.5)
# 遍历消息列表所有子元素
children = self.ax.get_children(msg_list)
print(f"\n=== 消息列表 ({target.name}) ===")
print(f"子元素总数: {len(children)}")
print()
for idx, child in enumerate(children):
role = self.ax.get_role(child)
title = self.ax.get_title(child)
size = self.ax.get_size(child)
pos = self.ax.get_position(child)
short_title = (title or "").replace("\n", "\\n")[:60]
print(f"--- [{idx}] role={role} title=\"{short_title}\" size={size} pos={pos} ---")
print(self.ax.dump_element(child, max_depth=10))
print()
# ----------------------------------------------------------------
# 统计
# ----------------------------------------------------------------
def _print_stats(self):
logger.info(
f"运行统计: 扫描={self._scan_count}, "
f"处理聊天={self._total_chats_processed}, "
f"点击媒体={self._total_media_clicked}"
)