Compare commits

..

2 Commits

Author SHA1 Message Date
1f3cbaa0f9 优化主窗口寻找 2026-04-24 17:27:16 +08:00
1468b26436 优化为 图片+箭头 移动方案 2026-04-24 17:08:00 +08:00
8 changed files with 342 additions and 206 deletions

View File

@ -20,7 +20,7 @@ wechat_clicker/
├── ax_bridge.py # AXUIElement 底层封装(属性读取、鼠标点击、键盘事件、滚动)
├── wechat_ui.py # 微信 UI 导航(聊天列表、消息列表、预览界面元素查找)
├── state_machine.py # UI 状态机窗口状态检测、恢复、Preview.app 管理)
├── automator.py # 主自动化逻辑(扫描→进入聊天→滚动→点击图片/文件→循环)
├── automator.py # 主自动化逻辑(扫描→进入聊天→锚点+箭头导航点击图片→点击文件→循环)
├── human_like.py # 拟人行为(高斯分布延迟、长休息、工作时间)
├── config.py # YAML 配置加载
└── logger_setup.py # 日志配置
@ -32,16 +32,17 @@ wechat_clicker/
- AXValue 位置/尺寸需用 AXValueGetValue 解包 CGPoint/CGSize
- 消息类型通过 title 内容判断:`"图片"` → 图片,`"文件\n..."` → 文件
- **图片/文件气泡偏移点击**AXStaticText 覆盖整行575px宽实际气泡仅在左侧 ~80-170px 区域,使用 `click_at_element_offset(x=120, y=height/2)` 命中
- 图片处理:点击缩略图 → 点击"..." → 点击"使用预览打开" → 关闭 Preview.app
- 文件处理:直接点击触发下载
- **图片处理(锚点+箭头导航)**:滚到底部 → 找最底部可见图片(锚点)→ 点击锚点进入预览 → 按 i 次左箭头到达第 i 张 → 点"..."→"使用预览打开"→ 关闭 Preview.app → 回到微信 → 再次点击锚点 → 左箭头 i+1 次 → ... 循环直到覆盖 `unread_count + overlap` 张。彻底绕过滚动覆盖、元素可见性、去重等问题
- **锚点稳定性**:连续 3 次失败自动重锚(重新滚到底部 + 重找锚点 + i 归零),最多重锚 2 次;每聊天最多处理 30 张图片(`MAX_IMAGES_PER_CHAT`),防止单聊天垄断处理时间
- 文件处理:直接点击触发下载(不走箭头导航)
- **滚动方向**macOS CGEvent ScrollWheel **负值=向下滚**(看新消息),**正值=向上滚**(看旧消息)
- 进入聊天后先滚到底部(负值),再向上滚动 5 轮(正值)加载历史消息
- 进入聊天后滚到底部(负值),图片通过箭头导航覆盖历史
- 滚动使用 **kCGEventMouseMoved + ScrollWheel**(不触发点击),避免误点 UI 元素
- "..."按钮搜索限制在预览区域(独立窗口或主窗口 x>200排除侧边栏
- 媒体去重基于消息列表**子元素索引**child index同一元素跨滚动轮次只处理一次
- 可见性检查基于元素**中心点**是否在消息列表可见区域内30px margin
- 状态检测基于窗口计数 + **AXMinimized 属性**:区分窗口存在但最小化 vs 真正可见
- **窗口恢复策略**NSRunningApplication.unhide隐藏→ AX API 设置 AXMinimized=False最小化→ activate前台AppleScript 不可靠(微信不支持 miniaturized 属性)
- **UNKNOWN 状态恢复**:先 Escape + Cmd+W 关闭可能的弹窗/多余窗口,再 activate + 点击侧边栏外层循环连续恢复失败时指数退避10s → 20s → 40s → ... → 120s 封顶),避免无效高频重试
## 使用方法
@ -65,8 +66,11 @@ python main.py --debug # 详细日志
## 配置重点
- `config.yaml` 中可设置扫描间隔、延迟范围、白/黑名单、工作时间、媒体类型开关
- `max_chats_per_scan: 0` 表示不限制,处理全部未读聊天(适合大量群聊场景)
- 默认只点击图片(`media.click_files: false`, `media.click_videos: false`
- 默认黑名单包含微信系统账号
- 处理完有未读的聊天后立即重扫,不等待 scan interval只有无未读时才 sleep
- 忙时(未读 > 5自动跳过随机休息
## 注意事项

View File

@ -7,19 +7,19 @@ wechat:
# 扫描行为
scan:
interval_seconds: 30 # 每次扫描间隔(秒)
max_chats_per_scan: 5 # 每次扫描最多处理几个聊天
interval_seconds: 15 # 每次扫描间隔(秒),无未读消息时生效
max_chats_per_scan: 0 # 每次扫描最多处理几个聊天0=不限制,处理全部)
scroll_chat_list: false # 是否滚动聊天列表查找更多聊天
# 延迟范围(秒),模拟真人操作节奏
delays:
before_click_chat: [2, 5] # 点击聊天前
after_open_chat: [1, 3] # 打开会话后等待
before_click_media: [1, 4] # 点击图片/文件前
after_click_media: [3, 8] # 点击图片/文件后(等待下载)
before_close_preview: [1, 3] # 关闭预览前
before_close_chat: [1, 2] # 关闭会话窗口前
between_messages: [0.5, 2] # 处理每条消息之间
before_click_chat: [0.5, 2] # 点击聊天前
after_open_chat: [0.5, 1.5] # 打开会话后等待
before_click_media: [0.3, 1.5] # 点击图片/文件前
after_click_media: [1.5, 4] # 点击图片/文件后(等待下载)
before_close_preview: [0.3, 1] # 关闭预览前
before_close_chat: [0.3, 1] # 关闭会话窗口前
between_messages: [0.2, 0.8] # 处理每条消息之间
# 工作时间24小时制
schedule:

View File

@ -66,6 +66,42 @@
- [x] **配置变更: click_files 默认关闭**`media.click_files` 默认值从 `true` 改为 `false`,默认只点击图片不点击文件。需要点击文件时在 config.yaml 中设置 `click_files: true`
### v0.8.0 提高图片可靠性 — 减少遗漏 (2026/04/24)
- [x] **修复: 去重 bug** — 原用绝对子元素索引去重,滚动后索引偏移导致图片被错误跳过。改用反向索引(距数组末尾距离),滚动时新元素从头部插入不影响已有元素
- [x] **新增: 失败重试** — 图片点击失败不再永久跳过,最多重试 2 次后才标记为已处理
- [x] **改进: 移除聊天数量上限**`max_chats_per_scan` 默认改为 0不限制几十个群全部处理
- [x] **改进: 忙时不 sleep** — 处理完有未读聊天后立即重扫,不等待 scan interval只有无未读时才 sleep 15s
- [x] **改进: 动态滚动轮数** — 根据聊天未读数动态计算滚动轮数(`max(5, unread//3)`,上限 30大群覆盖更深
- [x] **新增: 尾部复检** — 滚动处理完成后回到底部复检,捕获处理期间新到达的消息
- [x] **改进: 忙时不休息** — 当全局未读 > 5 时跳过随机休息检查
- [x] **改进: 缩短延迟范围** — 所有延迟缩短约 50-60%,提高处理效率
- [x] **增强: 日志** — 记录被过滤掉的聊天名、每周期输出摘要(全局未读/处理聊天数/过滤数/点击媒体数/耗时)
### v0.9.0 锚点+箭头导航重构图片处理 (2026/04/24)
- [x] **重构: 图片处理改为锚点+箭头导航** — 不再逐个查找图片元素、滚动、去重。改为:找最底部可见图片(锚点)→ 点击进入预览 → 左箭头 i 次到达第 i 张 → "..."→"使用预览打开"→ 关闭 Preview → 重复。彻底绕过滚动覆盖不足、可见性判断、去重 bug 等问题
- [x] **新增: ax_bridge 左箭头键支持**`kVK_LeftArrow` 常量和 `send_left_arrow()` 方法
- [x] **新增: `_find_anchor_image`** — 在消息列表底部查找最后一个可见图片元素作为导航锚点
- [x] **新增: `_process_images_with_arrow`** — 锚点+箭头循环核心逻辑,每张图片:点击锚点 → 左箭头 N 次 → 预览流程
- [x] **新增: `_do_preview_and_close`** — 从旧 `_click_image` 提取的预览流程(找"..."→ "使用预览打开"→ 关闭 Preview
- [x] **新增: `_process_visible_files`** — 文件/视频独立处理(不走箭头导航,直接点击可见元素)
- [x] **删除: 旧图片逐个处理逻辑**`_click_image`、`_process_visible_media` 中的图片分支、`processed_indices`/`retry_counts` 去重、滚动上翻 N 轮、尾部复检
- [x] **已验证: 锚点+箭头导航** — 测试脚本 `test_arrow_nav.py` 验证 5/5 张图片全部成功,平均 10.4s/张
### v0.9.1 锚点+箭头导航稳定性增强 (2026/04/24)
- [x] **新增: 重锚机制Re-anchor** — 连续 3 次失败(预览未打开或预览流程失败)自动触发重锚:滚到底部 → 重新找锚点 → i 归零重新开始。最多重锚 2 次,超过则放弃当前聊天
- [x] **新增: `_scroll_and_find_anchor`** — 合并滚动+锚点查找为单一方法,供初始化和重锚复用
- [x] **改进: `_process_images_with_arrow` 重写** — 改为接收 `msg_list` 参数,内部管理锚点生命周期(查找、使用、失效检测、重锚)
- [x] **新增: 每聊天图片处理上限**`MAX_IMAGES_PER_CHAT = 30`防止单个高活跃群聊垄断处理时间30张×10s = 5分钟/聊天)
- [x] **简化: `_process_media_with_scroll`** — 不再负责滚动和锚点查找(移入 `_process_images_with_arrow` 内部),代码更清晰
### v0.9.2 UNKNOWN 状态恢复加固 (2026/04/24)
- [x] **修复: UNKNOWN 状态长时间无法恢复** — 原恢复逻辑只做 activate + 点侧边栏,遇到弹窗/多余窗口无法清理。改为先 Escape + Cmd+W 关闭可能的弹窗,再 activate + 点侧边栏
- [x] **改进: 恢复失败指数退避** — 外层循环连续恢复失败时退避等待10s → 20s → 40s → 80s → 120s封顶避免每 13 秒重复无效尝试。恢复成功后计数器归零
### 待验证
- [x] ~~验证最小化窗口自动恢复功能(需手动最小化微信窗口测试)~~
@ -98,22 +134,30 @@
- 计算元素中心坐标 = position + size/2
- 图片预览中的按钮优先尝试 click_at_element后备 AXPress
### 图片下载流程
### 图片下载流程(锚点+箭头导航)
1. 点击图片缩略图 → 打开大图预览(微信内窗口)
2. 在预览窗口找到 "..." (更多) 按钮并点击
3. 在弹出菜单中找到 "使用'预览'打开" 并点击
4. 等待 macOS Preview.app 打开(此时原始文件已保存到本地)
5. 关闭 Preview.app 窗口
6. 回到微信,关闭大图预览
1. 滚到消息列表底部(最新消息)
2. 找到最底部可见的图片元素(锚点图片 N
3. 对 i = 0, 1, 2, ... image_count-1:
a. 点击锚点图片 N → 打开预览
b. 按左箭头 i 次 → 到达图片 N-i
c. 点击 "..." (更多) 按钮
d. 点击 "使用'预览'打开"
e. 等待 macOS Preview.app 打开(原始文件已保存到本地)
f. 关闭 Preview.app + 额外窗口
g. 回到微信,准备下一轮
4. image_count = min(max(5, unread_count) + 5, 30)(固定 overlap上限 30 张/聊天)
5. 连续 3 次失败自动重锚(滚到底部 + 重找锚点 + i 归零),最多重锚 2 次
### 防封策略
### 调度策略
- 高斯分布随机延迟(非均匀)
- ±20% 扫描间隔抖动
- 5% 概率长休息30-120 秒)
- 工作时间限制
- 每次最多 5 个聊天
- ±20% 扫描间隔抖动(无未读时生效)
- 忙时(未读 > 5跳过随机休息空闲时 5% 概率长休息
- 工作时间限制8:00-23:00
- 聊天数量不限制(`max_chats_per_scan: 0`
- 处理完有未读聊天后立即重扫
- 图片覆盖量 = min(max(5, unread_count) + 5, 30) (固定 overlap上限 30 张/聊天)
### 依赖

View File

@ -1,9 +1,9 @@
"""主自动化逻辑
编排整个工作流程扫描未读聊天 点击进入 滚动加载历史 点击图片/文件 关闭 循环
编排整个工作流程扫描未读聊天 点击进入 点击图片/文件 关闭 循环
图片处理流程点击缩略图 打开大图 点击"..." 点击"使用预览打开" 关闭 Preview.app
文件处理流程点击文件气泡触发下载
图片处理锚点+箭头导航 点击最底部图片进入预览左箭头键依次切换到更早的图片
文件处理直接点击文件气泡触发下载
"""
import logging
@ -17,8 +17,10 @@ from .wechat_ui import WeChatUI, MessageItem
logger = logging.getLogger("wechat_clicker.automator")
SCROLL_ROUNDS = 5
SCROLL_LINES_PER_ROUND = 10
IMAGE_OVERLAP = 5
MAX_IMAGES_PER_CHAT = 30
MAX_CONSECUTIVE_FAILURES = 3
MAX_REANCHOR_ATTEMPTS = 2
class WeChatAutomator:
@ -37,6 +39,7 @@ class WeChatAutomator:
self._scan_count = 0
self._total_chats_processed = 0
self._total_media_clicked = 0
self._consecutive_recover_failures = 0
# ----------------------------------------------------------------
# 启动检查
@ -100,27 +103,39 @@ class WeChatAutomator:
self._print_stats()
def _run_one_cycle(self):
cycle_start = time.time()
if not self._single_mode:
if self.human.is_off_hours():
logger.info("当前为非工作时间,等待中...")
time.sleep(300)
return
if self.human.should_take_break():
self.human.long_break()
return
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
if not self.state.recover_to_chat_list():
logger.warning("无法恢复到聊天列表,跳过本次循环")
time.sleep(10)
self._consecutive_recover_failures += 1
backoff = min(10 * (2 ** (self._consecutive_recover_failures - 1)), 120)
logger.warning(
f"无法恢复到聊天列表 (连续失败 {self._consecutive_recover_failures}), "
f"等待 {backoff}s"
)
time.sleep(backoff)
return
self._consecutive_recover_failures = 0
self._scan_count += 1
global_unread = self.ui.get_global_unread_count()
# 忙时(未读 > 5跳过随机休息
if not self._single_mode:
if self.human.should_take_break(global_unread):
self.human.long_break()
return
if global_unread == 0:
logger.debug(f"[扫描#{self._scan_count}] 没有未读消息")
sleep_time = self.human.scan_interval_with_jitter()
@ -136,25 +151,49 @@ class WeChatAutomator:
time.sleep(sleep_time)
return
chats_to_process = [
all_candidates = [
c for c in unread_chats if self.config.should_process_chat(c.name)
]
filtered_out = [
c.name for c in unread_chats
if not self.config.should_process_chat(c.name)
]
if filtered_out:
logger.info(f" 过滤掉的聊天: {filtered_out}")
count = self.human.random_subset_count(
len(chats_to_process), self.config.max_chats_per_scan
len(all_candidates), self.config.max_chats_per_scan
)
chats_to_process = chats_to_process[:count]
chats_to_process = all_candidates[:count]
logger.info(
f"将处理 {len(chats_to_process)} 个聊天 "
f"(共 {len(unread_chats)} 个未读)"
f"(共 {len(unread_chats)} 个未读, 过滤 {len(filtered_out)})"
)
cycle_media_count = 0
media_before = self._total_media_clicked
for chat in chats_to_process:
self.human.delay("before_click_chat")
self._process_chat(chat)
self.human.delay("before_close_chat")
cycle_media_count = self._total_media_clicked - media_before
cycle_duration = time.time() - cycle_start
logger.info(
f"[扫描#{self._scan_count}] 完成: "
f"全局未读={global_unread}, "
f"处理聊天={len(chats_to_process)}/{len(unread_chats)}, "
f"过滤={len(filtered_out)}, "
f"点击媒体={cycle_media_count}, "
f"耗时={cycle_duration:.0f}s"
)
if not self._single_mode:
if chats_to_process:
# 刚处理完聊天,可能还有更多未读,立即重扫
logger.debug("还有可能的未读消息,立即重新扫描")
return
sleep_time = self.human.scan_interval_with_jitter()
logger.debug(f"等待 {sleep_time:.0f}s 后进行下次扫描")
time.sleep(sleep_time)
@ -195,7 +234,7 @@ class WeChatAutomator:
msg_list_size = self.ax.get_size(msg_list)
logger.debug(f" 消息列表: pos={msg_list_pos} size={msg_list_size}")
media_count = self._process_media_with_scroll(main_win, msg_list, chat.name)
media_count = self._process_media_with_scroll(main_win, msg_list, chat.name, chat.unread_count)
self._total_media_clicked += media_count
self._total_chats_processed += 1
logger.info(f" {chat.name}: 本次处理了 {media_count} 个媒体")
@ -204,186 +243,167 @@ class WeChatAutomator:
# 带滚动的媒体处理
# ----------------------------------------------------------------
def _process_media_with_scroll(self, main_win, msg_list, chat_name: str) -> int:
"""先滚到底部看最新消息,再向上滚动加载历史并处理媒体。
用消息列表子元素索引做去重避免同一张图片被重复点击"""
def _process_media_with_scroll(self, main_win, msg_list, chat_name: str, unread_count: int = 0) -> int:
"""处理聊天中的图片和文件。
图片锚点+箭头导航内部管理滚动和锚点查找
文件直接点击可见的文件元素"""
total_clicked = 0
processed_indices = set()
# 先滚到消息列表底部(看到最新消息)
logger.info(f" {chat_name}: 滚动到最新消息...")
self.ax.scroll_to_bottom(msg_list, rounds=10, lines_per_round=-20)
self.human.random_delay(1.0, 2.0)
# 图片处理:锚点+箭头导航(滚动和锚点查找在内部完成)
if self.config.click_images:
image_count = min(max(5, unread_count) + IMAGE_OVERLAP, MAX_IMAGES_PER_CHAT)
image_clicked = self._process_images_with_arrow(msg_list, chat_name, image_count)
total_clicked += image_clicked
# 处理当前可见的媒体(最新消息)
clicked = self._process_visible_media(
main_win, msg_list, chat_name, processed_indices
)
total_clicked += clicked
# 向上滚动加载更多历史消息
for round_idx in range(SCROLL_ROUNDS):
# 安全网:滚动前确保没有残留的预览窗口
# 文件/视频处理:直接点击可见元素
if self.config.click_files or self.config.click_videos:
self.ui.ensure_wechat_frontmost()
time.sleep(0.3)
state = self.state.detect_state()
if state != UIState.MAIN_CHAT_LIST:
logger.debug(
f" 滚动前状态不干净: {state.value}, 清理额外窗口"
)
self._close_extra_windows()
file_clicked = self._process_visible_files(main_win, msg_list, chat_name)
total_clicked += file_clicked
self.human.delay("between_messages")
logger.debug(f" {chat_name}: 向上滚动第 {round_idx + 1}/{SCROLL_ROUNDS}")
self.ax.scroll_at_element(msg_list, lines=SCROLL_LINES_PER_ROUND)
self.human.random_delay(1.0, 2.5)
clicked = self._process_visible_media(
main_win, msg_list, chat_name, processed_indices
)
total_clicked += clicked
logger.debug(
f" {chat_name}: 滚动结束, 已处理索引={sorted(processed_indices)}"
)
return total_clicked
def _process_visible_media(
self, main_win, msg_list, chat_name: str, processed_indices: set
) -> int:
"""处理当前可见且未处理过的媒体消息。
通过子元素索引去重通过可见区域过滤跳过裁剪元素"""
# 获取消息列表可见区域
# ----------------------------------------------------------------
# 图片处理:锚点+箭头导航
# ----------------------------------------------------------------
def _find_anchor_image(self, msg_list):
"""在消息列表底部找到最后一个可见的图片元素(作为导航锚点)。
返回 (element, position, size) None"""
list_pos = self.ax.get_position(msg_list)
list_size = self.ax.get_size(msg_list)
visible_top = list_pos[1]
visible_bottom = list_pos[1] + list_size[1]
margin = 30
# 遍历消息列表的所有子元素,带索引
children = self.ax.get_children(msg_list)
targets = []
anchor = None
for idx, child in enumerate(children):
if idx in processed_indices:
for child in children:
if self.ax.get_role(child) != "AXStaticText":
continue
role = self.ax.get_role(child)
if role != "AXStaticText":
continue
title = self.ax.get_title(child)
if not title:
if title != "图片":
continue
size = self.ax.get_size(child)
if size[0] == 0 or size[1] == 0:
continue
msg_type = WeChatUI._classify_message(title, size)
if msg_type not in ("image", "file", "video"):
continue
# 检查配置是否处理该类型
if msg_type == "image" and not self.config.click_images:
continue
if msg_type == "file" and not self.config.click_files:
continue
if msg_type == "video" and not self.config.click_videos:
continue
# 可见性检查:元素中心必须在消息列表可见区域内(带 margin
pos = self.ax.get_position(child)
elem_center_y = pos[1] + size[1] // 2
center_y = pos[1] + size[1] // 2
if visible_top + margin <= center_y <= visible_bottom - margin:
anchor = (child, pos, size)
if elem_center_y < visible_top + margin:
logger.debug(
f" 跳过(中心在顶部外): idx={idx} type={msg_type} "
f"center_y={elem_center_y} < list_top+margin={visible_top + margin}"
)
continue
if elem_center_y > visible_bottom - margin:
logger.debug(
f" 跳过(中心在底部外): idx={idx} type={msg_type} "
f"center_y={elem_center_y} > list_bottom-margin={visible_bottom - margin}"
)
continue
return anchor
targets.append((idx, child, msg_type, title, size, pos))
if not targets:
logger.debug(f" {chat_name}: 当前视图无可处理的新媒体")
def _process_images_with_arrow(self, msg_list, chat_name: str, image_count: int) -> int:
"""用锚点+箭头导航处理多张图片。
内部管理锚点查找滚动和失败重锚"""
anchor_element, anchor_size = self._scroll_and_find_anchor(msg_list)
if not anchor_element:
logger.debug(f" {chat_name}: 未找到可见图片,跳过图片处理")
return 0
logger.info(
f" {chat_name}: 当前可见 {len(targets)} 个新媒体 "
f"(已处理 {len(processed_indices)} 个)"
)
logger.info(f" {chat_name}: 将处理 {image_count} 张图片")
clicked = 0
for idx, child, msg_type, title, size, pos in targets:
self.human.delay("before_click_media")
consecutive_failures = 0
reanchor_count = 0
i = 0
short_title = title.replace("\n", " ")[:50]
logger.info(
f" [{msg_type}] {short_title} "
f"(idx={idx}, pos={pos}, size={size})"
)
while i < image_count:
logger.debug(f" [{i+1}/{image_count}] 点击锚点图片")
if not self.ax.click_at_element_offset(anchor_element, 120, anchor_size[1] // 2):
logger.warning(f" [{i+1}] 锚点图片点击失败")
if reanchor_count >= MAX_REANCHOR_ATTEMPTS:
logger.warning(f" 重锚次数已达上限({MAX_REANCHOR_ATTEMPTS}),中断")
break
logger.info(f" 尝试重锚 ({reanchor_count+1}/{MAX_REANCHOR_ATTEMPTS})...")
anchor_element, anchor_size = self._scroll_and_find_anchor(msg_list)
if not anchor_element:
logger.warning(f" 重锚失败,未找到图片")
break
reanchor_count += 1
i = 0
consecutive_failures = 0
continue
msg_item = MessageItem(
element=child, msg_type=msg_type, title=title, size=size
)
time.sleep(2.0)
if msg_type == "image":
success = self._click_image(msg_item)
elif msg_type == "file":
success = self._click_file(msg_item)
else:
success = self._click_generic(msg_item)
windows = self.ui.get_all_windows()
if len(windows) < 2:
consecutive_failures += 1
logger.warning(f" [{i+1}] 预览窗口未打开 (连续失败 {consecutive_failures})")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
if reanchor_count >= MAX_REANCHOR_ATTEMPTS:
logger.warning(f" 重锚次数已达上限,中断")
break
logger.info(f" 连续 {consecutive_failures} 次失败,重锚 ({reanchor_count+1}/{MAX_REANCHOR_ATTEMPTS})...")
anchor_element, anchor_size = self._scroll_and_find_anchor(msg_list)
if not anchor_element:
break
reanchor_count += 1
i = 0
consecutive_failures = 0
else:
i += 1
continue
# 无论成功失败都标记为已处理,避免重复尝试
processed_indices.add(idx)
if i > 0:
logger.debug(f" [{i+1}] 按左箭头 {i}")
for _ in range(i):
self.ax.send_left_arrow()
time.sleep(0.5)
time.sleep(0.5)
logger.info(f" [{i+1}/{image_count}] 处理图片 (箭头×{i})")
success = self._do_preview_and_close()
if success:
clicked += 1
logger.debug(f" 处理成功: idx={idx} {msg_type}")
consecutive_failures = 0
else:
logger.warning(f" 处理失败: idx={idx} {msg_type}")
consecutive_failures += 1
logger.warning(f" [{i+1}] 预览流程失败 (连续失败 {consecutive_failures})")
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
if reanchor_count >= MAX_REANCHOR_ATTEMPTS:
logger.warning(f" 重锚次数已达上限,中断")
break
logger.info(f" 连续 {consecutive_failures} 次失败,重锚 ({reanchor_count+1}/{MAX_REANCHOR_ATTEMPTS})...")
anchor_element, anchor_size = self._scroll_and_find_anchor(msg_list)
if not anchor_element:
break
reanchor_count += 1
i = 0
consecutive_failures = 0
continue
self.ui.ensure_wechat_frontmost()
time.sleep(0.3)
self.human.delay("between_messages")
i += 1
if self.ax.should_backoff():
logger.warning("连续错误过多,暂停处理")
self.ax.reset_error_count()
break
logger.info(f" {chat_name}: 箭头导航完成, 成功 {clicked}/{image_count}")
return clicked
# ----------------------------------------------------------------
# 图片处理:点击 → "..." → "使用预览打开" → 关闭 Preview
# ----------------------------------------------------------------
def _scroll_and_find_anchor(self, msg_list):
"""滚到底部并找锚点图片,返回 (element, size) 或 (None, None)。"""
self.ax.scroll_to_bottom(msg_list, rounds=5, lines_per_round=-20)
time.sleep(1.0)
anchor = self._find_anchor_image(msg_list)
if anchor:
pos = self.ax.get_position(anchor[0])
logger.debug(f" 锚点图片: pos={pos} size={anchor[2]}")
return anchor[0], anchor[2]
return None, None
def _click_image(self, msg) -> bool:
"""处理一张图片:点开大图 -> 点... -> 点使用预览打开 -> 关闭 Preview.app。"""
# Step 1: 点击图片缩略图打开大图预览
# 图片气泡在消息行 AXStaticText 的左侧区域(非中心),
# 用偏移 (120, height//2) 命中实际缩略图
logger.debug(" Step1: 点击图片缩略图")
x_off = 120
y_off = msg.size[1] // 2
if not self.ax.click_at_element_offset(msg.element, x_off, y_off):
logger.warning(" 图片缩略图点击失败")
return False
def _do_preview_and_close(self) -> bool:
"""在已打开的图片预览中执行: 找"..."→ 点击 → 找"使用预览打开"→ 点击 → 关闭 Preview.app。"""
self.human.random_delay(0.5, 1.0)
self.human.delay("after_click_media")
# Step 2: 检查预览窗口状态
state = self.state.detect_state()
logger.debug(f" Step2: 点击后状态={state.value}")
if state != UIState.MEDIA_PREVIEW:
logger.debug(" 图片预览窗口未出现,尝试在内嵌预览中继续")
# Step 3: 查找 "..." 按钮
self.human.random_delay(0.5, 1.5)
logger.debug(" Step3: 搜索'...'按钮")
# 找 "..." 按钮
more_btn = self._find_more_button_in_preview()
if more_btn is None:
logger.warning(" 未找到'...'按钮Escape 退出")
@ -391,8 +411,6 @@ class WeChatAutomator:
time.sleep(0.5)
return False
# Step 4: 点击 "..." 按钮
logger.debug(" Step4: 点击'...'按钮")
if not self.ax.click_at_element(more_btn):
if not self.ax.press(more_btn):
logger.warning(" '...'按钮点击失败")
@ -400,10 +418,9 @@ class WeChatAutomator:
time.sleep(0.5)
return False
self.human.random_delay(0.5, 1.5)
self.human.random_delay(0.5, 1.0)
# Step 5: 查找"使用预览打开"菜单项
logger.debug(" Step5: 搜索'使用预览打开'菜单项")
# 找"使用预览打开"菜单项
preview_item = self.ui.find_menu_item('使用"预览"打开')
if preview_item is None:
preview_item = self.ui.find_menu_item("预览")
@ -411,37 +428,94 @@ class WeChatAutomator:
preview_item = self.ui.find_menu_item("Preview")
if preview_item is None:
logger.warning(" 未找到'使用预览打开'菜单项Escape 退出")
logger.warning(" 未找到'使用预览打开'菜单项")
self.ax.send_escape_key()
time.sleep(0.3)
self.ax.send_escape_key()
time.sleep(0.5)
return False
# Step 6: 点击"使用预览打开"
logger.debug(" Step6: 点击'使用预览打开'")
if not self.ax.click_at_element(preview_item):
self.ax.press(preview_item)
self.human.random_delay(2.0, 4.0)
self.human.random_delay(2.0, 3.5)
# Step 7: 关闭所有额外窗口Preview.app + 微信预览窗口)
logger.debug(" Step7: 清理预览窗口")
# 关闭 Preview.app + 额外窗口
self._close_extra_windows()
# Step 8: Escape 关闭可能残留的内嵌预览
logger.debug(" Step8: Escape 关闭内嵌预览")
# Escape 关闭残留的内嵌预览
self.ax.send_escape_key()
time.sleep(0.5)
time.sleep(0.3)
state = self.state.detect_state()
if state == UIState.MEDIA_PREVIEW:
logger.debug(" 仍在预览状态,再次 Escape")
self.ax.send_escape_key()
time.sleep(0.3)
return True
# ----------------------------------------------------------------
# 文件/视频处理:直接点击可见元素
# ----------------------------------------------------------------
def _process_visible_files(self, main_win, msg_list, chat_name: str) -> int:
"""处理当前可见的文件和视频(不处理图片,图片走箭头导航)。"""
list_pos = self.ax.get_position(msg_list)
list_size = self.ax.get_size(msg_list)
visible_top = list_pos[1]
visible_bottom = list_pos[1] + list_size[1]
margin = 30
children = self.ax.get_children(msg_list)
targets = []
for child in children:
if self.ax.get_role(child) != "AXStaticText":
continue
title = self.ax.get_title(child)
if not title:
continue
size = self.ax.get_size(child)
if size[0] == 0 or size[1] == 0:
continue
msg_type = WeChatUI._classify_message(title, size)
if msg_type == "file" and not self.config.click_files:
continue
if msg_type == "video" and not self.config.click_videos:
continue
if msg_type not in ("file", "video"):
continue
pos = self.ax.get_position(child)
center_y = pos[1] + size[1] // 2
if center_y < visible_top + margin or center_y > visible_bottom - margin:
continue
targets.append((child, msg_type, title, size))
if not targets:
return 0
logger.info(f" {chat_name}: 发现 {len(targets)} 个文件/视频")
clicked = 0
for child, msg_type, title, size in targets:
self.human.delay("before_click_media")
short_title = title.replace("\n", " ")[:50]
logger.info(f" [{msg_type}] {short_title}")
msg_item = MessageItem(element=child, msg_type=msg_type, title=title, size=size)
if msg_type == "file":
success = self._click_file(msg_item)
else:
success = self._click_generic(msg_item)
if success:
clicked += 1
self.human.delay("between_messages")
return clicked
def _find_more_button_in_preview(self):
"""在图片预览区域查找"..."按钮(排除侧边栏区域)。"""
# 先在非主窗口(独立预览窗口)中找

View File

@ -50,6 +50,7 @@ kAXErrorInvalidUIElement = -25202
# 键码
kVK_Escape = 0x35
kVK_W = 0x0D
kVK_LeftArrow = 0x7B
class AXBridge:
@ -452,6 +453,10 @@ class AXBridge:
"""发送 Cmd+W 键事件(关闭窗口)。"""
self._send_key(kVK_W, flags=kCGEventFlagMaskCommand)
def send_left_arrow(self):
"""发送左箭头键事件(图片预览中切换上一张)。"""
self._send_key(kVK_LeftArrow)
def _send_key(self, key_code: int, flags: int = 0):
"""发送键盘事件(按下+抬起)。"""
try:

View File

@ -13,18 +13,18 @@ DEFAULTS = {
"process_name": "WeChat",
},
"scan": {
"interval_seconds": 30,
"max_chats_per_scan": 5,
"interval_seconds": 15,
"max_chats_per_scan": 0,
"scroll_chat_list": False,
},
"delays": {
"before_click_chat": [2, 5],
"after_open_chat": [1, 3],
"before_click_media": [1, 4],
"after_click_media": [3, 8],
"before_close_preview": [1, 3],
"before_close_chat": [1, 2],
"between_messages": [0.5, 2],
"before_click_chat": [0.5, 2],
"after_open_chat": [0.5, 1.5],
"before_click_media": [0.3, 1.5],
"after_click_media": [1.5, 4],
"before_close_preview": [0.3, 1],
"before_close_chat": [0.3, 1],
"between_messages": [0.2, 0.8],
},
"schedule": {
"enabled": True,

View File

@ -51,12 +51,14 @@ class HumanBehavior:
# 长休息
# ----------------------------------------------------------------
def should_take_break(self) -> bool:
def should_take_break(self, global_unread: int = 0) -> bool:
"""判断是否应该触发一次长休息。
每个循环有固定概率触发模拟真人偶尔离开电脑
忙时未读消息多跳过休息优先处理消息
"""
self._cycle_count += 1
if global_unread > 5:
return False
return random.random() < self._break_probability
def long_break(self):
@ -80,10 +82,12 @@ class HumanBehavior:
def random_subset_count(self, total: int, max_count: int) -> int:
"""决定本次处理多少个项目。
不总是处理 max_count 偶尔少处理一些模拟真人不会每次都处理所有内容
max_count=0 表示不限制处理全部
"""
if total <= 0:
return 0
if max_count <= 0:
return total
upper = min(total, max_count)
# 80% 概率处理全部上限内20% 概率减少 1-2 个
if random.random() < 0.8:

View File

@ -154,7 +154,12 @@ class StateMachine:
time.sleep(0.5)
if state == UIState.UNKNOWN:
# 尝试激活微信并点击侧边栏聊天按钮
# 先尝试关闭可能的弹窗/多余窗口
self.ax.send_escape_key()
time.sleep(0.3)
self.ax.send_cmd_w()
time.sleep(0.3)
# 重新激活微信并点击侧边栏聊天按钮
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
self.ui.click_sidebar_chat_tab()