wechat_msg_clicker/wechat_clicker/automator.py
2026-04-23 18:39:49 +08:00

658 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""主自动化逻辑
编排整个工作流程:扫描未读聊天 → 点击进入 → 滚动加载历史 → 点击图片/文件 → 关闭 → 循环
图片处理流程:点击缩略图 → 打开大图 → 点击"..." → 点击"使用预览打开" → 关闭 Preview.app
文件处理流程:点击文件气泡触发下载
"""
import logging
import time
from .ax_bridge import AXBridge
from .config import Config
from .human_like import HumanBehavior
from .state_machine import StateMachine, UIState
from .wechat_ui import WeChatUI, MessageItem
logger = logging.getLogger("wechat_clicker.automator")
SCROLL_ROUNDS = 5
SCROLL_LINES_PER_ROUND = 10
class WeChatAutomator:
"""微信消息自动点击器"""
def __init__(self, config: Config, dry_run: bool = False):
self.config = config
self.dry_run = dry_run
self._single_mode = False
self.ax = AXBridge()
self.ui = WeChatUI(self.ax, config.bundle_id)
self.state = StateMachine(self.ax, self.ui)
self.human = HumanBehavior(config)
self._scan_count = 0
self._total_chats_processed = 0
self._total_media_clicked = 0
# ----------------------------------------------------------------
# 启动检查
# ----------------------------------------------------------------
def verify_setup(self) -> bool:
if not self.ax.check_accessibility():
logger.error("缺少辅助功能权限,无法继续")
return False
app_ref = self.ui.get_app_ref()
if app_ref is None:
logger.error("微信未运行,请先启动微信桌面端")
return False
main_win = self.ui.get_main_window()
if main_win is None:
logger.info("未找到微信主窗口,尝试恢复最小化窗口...")
if self.ui.ensure_wechat_visible():
main_win = self.ui.get_main_window()
if main_win is None:
logger.error("未找到微信主窗口,请确保微信已登录并可见")
return False
logger.info("环境检查通过")
return True
# ----------------------------------------------------------------
# 主循环
# ----------------------------------------------------------------
def run(self):
logger.info("微信自动点击器启动")
if not self.verify_setup():
return
while True:
try:
self._run_one_cycle()
except KeyboardInterrupt:
logger.info("用户中断,正在退出...")
break
except Exception as e:
logger.error(f"主循环异常: {e}", exc_info=True)
try:
self.state.recover_to_chat_list()
except Exception:
pass
time.sleep(10)
self._print_stats()
def run_once(self):
logger.info("执行单次扫描")
if not self.verify_setup():
return
self._single_mode = True
self._run_one_cycle()
self._print_stats()
def _run_one_cycle(self):
if not self._single_mode:
if self.human.is_off_hours():
logger.info("当前为非工作时间,等待中...")
time.sleep(300)
return
if self.human.should_take_break():
self.human.long_break()
return
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
if not self.state.recover_to_chat_list():
logger.warning("无法恢复到聊天列表,跳过本次循环")
time.sleep(10)
return
self._scan_count += 1
global_unread = self.ui.get_global_unread_count()
if global_unread == 0:
logger.debug(f"[扫描#{self._scan_count}] 没有未读消息")
sleep_time = self.human.scan_interval_with_jitter()
time.sleep(sleep_time)
return
logger.info(f"[扫描#{self._scan_count}] 全局未读: {global_unread}")
unread_chats = self.ui.get_unread_chats()
if not unread_chats:
logger.debug("聊天列表中未发现未读项(可能需要滚动)")
sleep_time = self.human.scan_interval_with_jitter()
time.sleep(sleep_time)
return
chats_to_process = [
c for c in unread_chats if self.config.should_process_chat(c.name)
]
count = self.human.random_subset_count(
len(chats_to_process), self.config.max_chats_per_scan
)
chats_to_process = chats_to_process[:count]
logger.info(
f"将处理 {len(chats_to_process)} 个聊天 "
f"(共 {len(unread_chats)} 个未读)"
)
for chat in chats_to_process:
self.human.delay("before_click_chat")
self._process_chat(chat)
self.human.delay("before_close_chat")
if not self._single_mode:
sleep_time = self.human.scan_interval_with_jitter()
logger.debug(f"等待 {sleep_time:.0f}s 后进行下次扫描")
time.sleep(sleep_time)
# ----------------------------------------------------------------
# 处理单个聊天
# ----------------------------------------------------------------
def _process_chat(self, chat):
logger.info(f"处理聊天: {chat.name} (未读: {chat.unread_count})")
if self.dry_run:
logger.info(f" [DRY-RUN] 跳过点击: {chat.name}")
return
chat_pos = self.ax.get_position(chat.element)
chat_size = self.ax.get_size(chat.element)
logger.debug(f" 聊天项位置: pos={chat_pos} size={chat_size}")
if not self.ax.click_at_element(chat.element):
logger.warning(f" 点击聊天项失败: {chat.name}")
return
self.human.delay("after_open_chat")
# 单窗口模式:会话在主窗口内打开,直接在主窗口中查找消息列表
main_win = self.ui.get_main_window()
if main_win is None:
logger.warning(" 主窗口丢失")
return
msg_list = self.ui.get_message_list(main_win)
if msg_list is None:
logger.warning(f" 未找到消息列表,可能聊天未成功打开: {chat.name}")
return
msg_list_pos = self.ax.get_position(msg_list)
msg_list_size = self.ax.get_size(msg_list)
logger.debug(f" 消息列表: pos={msg_list_pos} size={msg_list_size}")
media_count = self._process_media_with_scroll(main_win, msg_list, chat.name)
self._total_media_clicked += media_count
self._total_chats_processed += 1
logger.info(f" {chat.name}: 本次处理了 {media_count} 个媒体")
# ----------------------------------------------------------------
# 带滚动的媒体处理
# ----------------------------------------------------------------
def _process_media_with_scroll(self, main_win, msg_list, chat_name: str) -> int:
"""先滚到底部看最新消息,再向上滚动加载历史并处理媒体。
用消息列表子元素索引做去重,避免同一张图片被重复点击。"""
total_clicked = 0
processed_indices = set()
# 先滚到消息列表底部(看到最新消息)
logger.info(f" {chat_name}: 滚动到最新消息...")
self.ax.scroll_to_bottom(msg_list, rounds=10, lines_per_round=-20)
self.human.random_delay(1.0, 2.0)
# 处理当前可见的媒体(最新消息)
clicked = self._process_visible_media(
main_win, msg_list, chat_name, processed_indices
)
total_clicked += clicked
# 向上滚动加载更多历史消息
for round_idx in range(SCROLL_ROUNDS):
# 安全网:滚动前确保没有残留的预览窗口
state = self.state.detect_state()
if state != UIState.MAIN_CHAT_LIST:
logger.debug(
f" 滚动前状态不干净: {state.value}, 清理额外窗口"
)
self._close_extra_windows()
self.human.delay("between_messages")
logger.debug(f" {chat_name}: 向上滚动第 {round_idx + 1}/{SCROLL_ROUNDS}")
self.ax.scroll_at_element(msg_list, lines=SCROLL_LINES_PER_ROUND)
self.human.random_delay(1.0, 2.5)
clicked = self._process_visible_media(
main_win, msg_list, chat_name, processed_indices
)
total_clicked += clicked
logger.debug(
f" {chat_name}: 滚动结束, 已处理索引={sorted(processed_indices)}"
)
return total_clicked
def _process_visible_media(
self, main_win, msg_list, chat_name: str, processed_indices: set
) -> int:
"""处理当前可见且未处理过的媒体消息。
通过子元素索引去重,通过可见区域过滤跳过裁剪元素。"""
# 获取消息列表可见区域
list_pos = self.ax.get_position(msg_list)
list_size = self.ax.get_size(msg_list)
visible_top = list_pos[1]
visible_bottom = list_pos[1] + list_size[1]
margin = 30
# 遍历消息列表的所有子元素,带索引
children = self.ax.get_children(msg_list)
targets = []
for idx, child in enumerate(children):
if idx in processed_indices:
continue
role = self.ax.get_role(child)
if role != "AXStaticText":
continue
title = self.ax.get_title(child)
if not title:
continue
size = self.ax.get_size(child)
if size[0] == 0 or size[1] == 0:
continue
msg_type = WeChatUI._classify_message(title, size)
if msg_type not in ("image", "file", "video"):
continue
# 检查配置是否处理该类型
if msg_type == "image" and not self.config.click_images:
continue
if msg_type == "file" and not self.config.click_files:
continue
if msg_type == "video" and not self.config.click_videos:
continue
# 可见性检查:元素中心必须在消息列表可见区域内(带 margin
pos = self.ax.get_position(child)
elem_center_y = pos[1] + size[1] // 2
if elem_center_y < visible_top + margin:
logger.debug(
f" 跳过(中心在顶部外): idx={idx} type={msg_type} "
f"center_y={elem_center_y} < list_top+margin={visible_top + margin}"
)
continue
if elem_center_y > visible_bottom - margin:
logger.debug(
f" 跳过(中心在底部外): idx={idx} type={msg_type} "
f"center_y={elem_center_y} > list_bottom-margin={visible_bottom - margin}"
)
continue
targets.append((idx, child, msg_type, title, size, pos))
if not targets:
logger.debug(f" {chat_name}: 当前视图无可处理的新媒体")
return 0
logger.info(
f" {chat_name}: 当前可见 {len(targets)} 个新媒体 "
f"(已处理 {len(processed_indices)} 个)"
)
clicked = 0
for idx, child, msg_type, title, size, pos in targets:
self.human.delay("before_click_media")
short_title = title.replace("\n", " ")[:50]
logger.info(
f" [{msg_type}] {short_title} "
f"(idx={idx}, pos={pos}, size={size})"
)
msg_item = MessageItem(
element=child, msg_type=msg_type, title=title, size=size
)
if msg_type == "image":
success = self._click_image(msg_item)
elif msg_type == "file":
success = self._click_file(msg_item)
else:
success = self._click_generic(msg_item)
# 无论成功失败都标记为已处理,避免重复尝试
processed_indices.add(idx)
if success:
clicked += 1
logger.debug(f" 处理成功: idx={idx} {msg_type}")
else:
logger.warning(f" 处理失败: idx={idx} {msg_type}")
self.human.delay("between_messages")
if self.ax.should_backoff():
logger.warning("连续错误过多,暂停处理")
self.ax.reset_error_count()
break
return clicked
# ----------------------------------------------------------------
# 图片处理:点击 → "..." → "使用预览打开" → 关闭 Preview
# ----------------------------------------------------------------
def _click_image(self, msg) -> bool:
"""处理一张图片:点开大图 -> 点... -> 点使用预览打开 -> 关闭 Preview.app。"""
# Step 1: 点击图片缩略图打开大图预览
# 图片气泡在消息行 AXStaticText 的左侧区域(非中心),
# 用偏移 (120, height//2) 命中实际缩略图
logger.debug(" Step1: 点击图片缩略图")
x_off = 120
y_off = msg.size[1] // 2
if not self.ax.click_at_element_offset(msg.element, x_off, y_off):
logger.warning(" 图片缩略图点击失败")
return False
self.human.delay("after_click_media")
# Step 2: 检查预览窗口状态
state = self.state.detect_state()
logger.debug(f" Step2: 点击后状态={state.value}")
if state != UIState.MEDIA_PREVIEW:
logger.debug(" 图片预览窗口未出现,尝试在内嵌预览中继续")
# Step 3: 查找 "..." 按钮
self.human.random_delay(0.5, 1.5)
logger.debug(" Step3: 搜索'...'按钮")
more_btn = self._find_more_button_in_preview()
if more_btn is None:
logger.warning(" 未找到'...'按钮Escape 退出")
self.ax.send_escape_key()
time.sleep(0.5)
return False
# Step 4: 点击 "..." 按钮
logger.debug(" Step4: 点击'...'按钮")
if not self.ax.click_at_element(more_btn):
if not self.ax.press(more_btn):
logger.warning(" '...'按钮点击失败")
self.ax.send_escape_key()
time.sleep(0.5)
return False
self.human.random_delay(0.5, 1.5)
# Step 5: 查找"使用预览打开"菜单项
logger.debug(" Step5: 搜索'使用预览打开'菜单项")
preview_item = self.ui.find_menu_item('使用"预览"打开')
if preview_item is None:
preview_item = self.ui.find_menu_item("预览")
if preview_item is None:
preview_item = self.ui.find_menu_item("Preview")
if preview_item is None:
logger.warning(" 未找到'使用预览打开'菜单项Escape 退出")
self.ax.send_escape_key()
time.sleep(0.3)
self.ax.send_escape_key()
time.sleep(0.5)
return False
# Step 6: 点击"使用预览打开"
logger.debug(" Step6: 点击'使用预览打开'")
if not self.ax.click_at_element(preview_item):
self.ax.press(preview_item)
self.human.random_delay(2.0, 4.0)
# Step 7: 关闭所有额外窗口Preview.app + 微信预览窗口)
logger.debug(" Step7: 清理预览窗口")
self._close_extra_windows()
# Step 8: Escape 关闭可能残留的内嵌预览
logger.debug(" Step8: Escape 关闭内嵌预览")
self.ax.send_escape_key()
time.sleep(0.5)
state = self.state.detect_state()
if state == UIState.MEDIA_PREVIEW:
logger.debug(" 仍在预览状态,再次 Escape")
self.ax.send_escape_key()
time.sleep(0.3)
return True
def _find_more_button_in_preview(self):
"""在图片预览区域查找"..."按钮(排除侧边栏区域)。"""
# 先在非主窗口(独立预览窗口)中找
windows = self.ui.get_all_windows()
for win in windows:
title = self.ax.get_title(win)
if title == "微信":
continue
btn = self.ui.find_preview_more_button(win)
if btn is not None:
pos = self.ax.get_position(btn)
logger.debug(f" 在独立窗口 '{title}' 中找到'...'按钮, pos={pos}")
return btn
# 内嵌预览只在主窗口的右侧区域找排除左侧侧边栏x > 200
main_win = self.ui.get_main_window()
if main_win:
btn = self.ui.find_preview_more_button(main_win, min_x=200)
if btn is not None:
pos = self.ax.get_position(btn)
logger.debug(f" 在主窗口右侧区域找到'...'按钮, pos={pos}")
return btn
logger.debug(" 未在任何窗口中找到'...'按钮")
return None
# ----------------------------------------------------------------
# 文件处理:直接点击触发下载
# ----------------------------------------------------------------
def _click_file(self, msg) -> bool:
"""点击文件触发下载,然后关闭可能弹出的预览窗口。"""
x_off = 120
y_off = msg.size[1] // 2
if not self.ax.click_at_element_offset(msg.element, x_off, y_off):
logger.warning(" 文件点击失败")
return False
self.human.delay("after_click_media")
self.human.random_delay(1.0, 2.0)
state = self.state.detect_state()
logger.debug(f" 文件点击后状态={state.value}")
if state != UIState.MAIN_CHAT_LIST:
self._close_extra_windows()
return True
# ----------------------------------------------------------------
# 通用处理(视频等)
# ----------------------------------------------------------------
def _click_generic(self, msg) -> bool:
x_off = 120
y_off = msg.size[1] // 2
if not self.ax.click_at_element_offset(msg.element, x_off, y_off):
return False
self.human.delay("after_click_media")
self.human.micro_jitter()
self.ax.send_escape_key()
time.sleep(0.5)
return True
# ----------------------------------------------------------------
# 窗口清理:关闭预览等额外窗口
# ----------------------------------------------------------------
def _close_extra_windows(self):
"""关闭所有非主窗口(文件预览、图片预览等)和 Preview.app恢复干净状态。"""
if self.state.is_preview_app_running():
logger.debug(" 关闭 Preview.app")
self.state.close_preview_app()
conv_wins = self.ui.get_conversation_windows()
for win in conv_wins:
title = self.ax.get_title(win)
logger.debug(f" 关闭额外窗口: {title}")
close_btn = self.ui.get_close_button(win)
if close_btn:
self.ax.press(close_btn)
else:
self.ax.send_cmd_w()
time.sleep(0.5)
self.ui.ensure_wechat_frontmost()
time.sleep(0.3)
# ----------------------------------------------------------------
# 调试工具
# ----------------------------------------------------------------
def dump_ui_tree(self, chat_name: str = None):
if not self.verify_setup():
return
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
if chat_name:
self._dump_chat_messages(chat_name)
return
main_win = self.ui.get_main_window()
if main_win:
print("=== 主窗口 (微信) ===")
print(self.ax.dump_element(main_win, max_depth=5))
conv_windows = self.ui.get_conversation_windows()
for win in conv_windows:
title = self.ax.get_title(win)
print(f"\n=== 会话窗口 ({title}) ===")
print(self.ax.dump_element(win, max_depth=5))
print("\n=== 聊天列表解析 ===")
items = self.ui.get_chat_items()
for item in items:
pos = self.ax.get_position(item.element)
size = self.ax.get_size(item.element)
status = f"[未读:{item.unread_count}]" if item.unread_count > 0 else ""
print(
f" {item.name} {status} | {item.preview} | {item.timestamp} "
f"| pos={pos} size={size}"
)
def _dump_chat_messages(self, chat_name: str):
"""进入指定聊天dump 消息列表中每个子元素的完整子树。"""
# 确保在聊天列表
if not self.state.recover_to_chat_list():
print(f"ERROR: 无法恢复到聊天列表")
return
# 在聊天列表中查找匹配的聊天项(子串匹配)
items = self.ui.get_chat_items()
target = None
for item in items:
if chat_name in item.name:
target = item
break
if target is None:
print(f"ERROR: 聊天列表中未找到包含 \"{chat_name}\" 的聊天")
print("当前可见聊天:")
for item in items:
print(f" - {item.name}")
return
print(f"找到聊天: {target.name} (未读: {target.unread_count})")
print(f"点击进入...")
# 点击聊天项(先点其他聊天再点回来,避免重复选中不加载)
other = next((i for i in items if i.name != target.name), None)
if other:
self.ax.click_at_element(other.element)
time.sleep(0.5)
if not self.ax.click_at_element(target.element):
print(f"ERROR: 点击聊天项失败")
return
time.sleep(2.0)
# 在主窗口中找消息列表(重试几次,等待 UI 加载)
main_win = self.ui.get_main_window()
if main_win is None:
print("ERROR: 主窗口丢失")
return
msg_list = None
for attempt in range(3):
msg_list = self.ui.get_message_list(main_win)
if msg_list is not None:
break
print(f" 等待消息列表加载... (尝试 {attempt + 1}/3)")
time.sleep(1.5)
if msg_list is None:
print("ERROR: 未找到消息列表dump 主窗口结构以诊断:")
print(self.ax.dump_element(main_win, max_depth=8))
return
# 滚到底部看最新消息
print("滚动到最新消息...")
self.ax.scroll_to_bottom(msg_list, rounds=10, lines_per_round=-20)
time.sleep(1.5)
# 遍历消息列表所有子元素
children = self.ax.get_children(msg_list)
print(f"\n=== 消息列表 ({target.name}) ===")
print(f"子元素总数: {len(children)}")
print()
for idx, child in enumerate(children):
role = self.ax.get_role(child)
title = self.ax.get_title(child)
size = self.ax.get_size(child)
pos = self.ax.get_position(child)
short_title = (title or "").replace("\n", "\\n")[:60]
print(f"--- [{idx}] role={role} title=\"{short_title}\" size={size} pos={pos} ---")
print(self.ax.dump_element(child, max_depth=10))
print()
# ----------------------------------------------------------------
# 统计
# ----------------------------------------------------------------
def _print_stats(self):
logger.info(
f"运行统计: 扫描={self._scan_count}, "
f"处理聊天={self._total_chats_processed}, "
f"点击媒体={self._total_media_clicked}"
)