This commit is contained in:
crislee 2026-04-22 19:28:54 +08:00
commit 86a47822ad
15 changed files with 2009 additions and 0 deletions

29
.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
# 用户配置(含个人设置)
config.yaml
# 日志
*.log
*.log.*
# Python
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
# 虚拟环境
venv/
.venv/
# macOS
.DS_Store
# IDE
.idea/
.vscode/
*.swp
*.swo
# Claude Code
.claude/

64
CLAUDE.md Normal file
View File

@ -0,0 +1,64 @@
# WeChat 消息自动点击器
自动点击微信桌面端未读消息中的图片和文件,触发原始文件下载到本地目录。
## 项目背景
配合另一个信息收集项目使用:信息收集脚本负责将微信消息入库,但图片和文件需要被点击预览后微信才会下载原始文件到本地。本工具自动完成这个"点击"操作。
## 技术架构
- **语言**: Python 3.11+
- **核心技术**: macOS Accessibility API (AXUIElement) via pyobjc
- **目标应用**: 微信桌面端 v4.1.9+ (bundle ID: com.tencent.xinWeChat)
- **运行平台**: macOS Sonoma 14+
### 模块结构
```
wechat_clicker/
├── ax_bridge.py # AXUIElement 底层封装(属性读取、操作执行、键盘事件)
├── wechat_ui.py # 微信 UI 导航(找聊天列表、消息列表、解析 title 分类消息类型)
├── state_machine.py # UI 状态机(基于窗口数量判断状态、状态恢复)
├── automator.py # 主自动化逻辑(扫描→点击→预览→关闭→循环)
├── human_like.py # 拟人行为(高斯分布延迟、长休息、工作时间)
├── config.py # YAML 配置加载
└── logger_setup.py # 日志配置
```
### 关键设计决策
- 微信 v4.1.9 点击聊天会打开**独立窗口**(非页内导航),状态检测基于窗口计数
- 元素查找使用 role+name 搜索(非硬编码索引),适应 UI 变化
- 消息类型通过 title 内容判断:`"图片"` → 图片,`"文件\n..."` → 文件
- 预览通过 Escape 键关闭
## 使用方法
```bash
# 前置条件
pip install -r requirements.txt
# 系统设置 > 隐私与安全 > 辅助功能 → 添加终端/Python
# 复制配置
cp config.example.yaml config.yaml
# 运行
python main.py # 持续运行
python main.py --once # 单次扫描
python main.py --dry-run # 只扫描不点击
python main.py --dump-ui # 输出 UI 元素树
python main.py --debug # 详细日志
```
## 配置重点
- `config.yaml` 中可设置扫描间隔、延迟范围、白/黑名单、工作时间、媒体类型开关
- 默认不处理视频(`media.click_videos: false`
- 默认黑名单包含微信系统账号
## 注意事项
- 微信窗口需要保持可见(不能最小化)
- 运行时会占用微信前台操作
- 建议在专用电脑上运行

156
README.md Normal file
View File

@ -0,0 +1,156 @@
# WeChat 消息自动点击器
自动点击微信桌面端未读消息中的图片和文件,触发原始文件下载到本地目录。
## 背景
微信桌面端不会自动下载图片/文件的原始数据,需要用户手动点击预览后才会触发下载。本工具通过 macOS 辅助功能 API 自动完成这个操作,配合信息收集系统实现群聊数据的自动化入库。
## 系统要求
- macOS Sonoma 14+
- Python 3.11+
- 微信桌面端 v4.1.9+(已登录)
## 安装
### 1. 安装 Python 依赖
```bash
cd wechat_msg_clicker
pip install -r requirements.txt
```
依赖包:
- `pyobjc-framework-Quartz` — macOS 图形框架绑定
- `pyobjc-framework-ApplicationServices` — 辅助功能 API 绑定
- `pyobjc-framework-Cocoa` — macOS 应用框架绑定
- `PyYAML` — 配置文件解析
### 2. 授权辅助功能权限
脚本需要 macOS 辅助功能权限才能操作微信 UI 元素:
1. 打开 **系统设置** > **隐私与安全性** > **辅助功能**
2. 点击左下角 **+** 号
3. 添加你使用的终端应用(如 Terminal.app 或 iTerm2
4. 如果通过 Python 直接运行,还需要添加 Python 解释器路径(如 `/Users/你的用户名/miniconda3/bin/python3`
5. 确保添加后开关为 **开启** 状态
> 首次运行时如果未授权,脚本会弹出系统提示框引导你授权。
### 3. 准备配置文件
```bash
cp config.example.yaml config.yaml
```
按需编辑 `config.yaml`,主要配置项:
| 配置项 | 默认值 | 说明 |
|---|---|---|
| `scan.interval_seconds` | 30 | 扫描间隔(秒) |
| `scan.max_chats_per_scan` | 5 | 每次最多处理几个聊天 |
| `schedule.start_hour` | 8 | 工作开始时间 |
| `schedule.end_hour` | 23 | 工作结束时间 |
| `filter.mode` | all | 过滤模式all / whitelist / blacklist |
| `filter.blacklist` | 微信系统号 | 排除的聊天名称 |
| `media.click_images` | true | 是否点击图片 |
| `media.click_files` | true | 是否点击文件 |
| `media.click_videos` | false | 是否点击视频(默认关闭) |
| `media.max_media_per_chat` | 20 | 每个聊天最多点击几个媒体 |
### 4. 确保微信就绪
- 微信桌面端已启动并登录
- 微信窗口保持可见(不能最小化到 Dock
- 建议在专用电脑上运行,脚本运行时会操作微信前台窗口
## 验证流程
建议按以下顺序逐步验证,确认每步正常后再进入下一步:
### Step 1验证 UI 访问
```bash
python main.py --dump-ui
```
预期输出:微信的 UI 元素树,包括窗口、按钮、聊天列表等信息。
**如果报错**
- `辅助功能权限未授予` → 检查系统设置中的辅助功能授权
- `微信未运行` → 确保微信桌面端已启动
- `未找到微信主窗口` → 确保微信已登录,窗口未最小化
### Step 2试运行扫描
```bash
python main.py --dry-run --once --debug
```
预期输出:扫描聊天列表,显示有未读消息的聊天和媒体数量,但**不会实际点击**。
检查日志确认:
- 能正确识别未读聊天
- 聊天名称解析正确
- 未读数量解析正确
### Step 3单次完整执行
```bash
python main.py --once --debug
```
这会执行一次完整的循环:扫描 → 点击聊天 → 点击图片/文件 → 关闭预览 → 关闭会话。
观察过程中:
- 微信窗口是否正常被操作
- 图片预览是否正常打开和关闭
- 是否能正确返回聊天列表
### Step 4正式运行
```bash
python main.py
```
脚本会持续运行,按配置的间隔和工作时间自动处理新消息。
后台运行建议使用 `nohup``tmux`
```bash
# 方式一nohup
nohup python main.py > /dev/null 2>&1 &
# 方式二tmux推荐方便查看日志
tmux new -s wechat
python main.py
# Ctrl+B D 分离会话
# tmux attach -t wechat 重新连接
```
## 命令参考
| 命令 | 说明 |
|---|---|
| `python main.py` | 持续运行 |
| `python main.py --once` | 单次扫描后退出 |
| `python main.py --dry-run` | 只扫描不点击 |
| `python main.py --dump-ui` | 输出微信 UI 元素树 |
| `python main.py --debug` | 开启 DEBUG 级别日志 |
| `python main.py --config path/to/config.yaml` | 指定配置文件 |
参数可组合使用,如 `python main.py --dry-run --once --debug`
## 日志
日志同时输出到控制台和文件 `wechat_clicker.log`(可在配置中修改)。
日志文件自动轮转,默认最大 10MB保留 5 个备份。
## 注意事项
- **封号风险**:工具内置了多种拟人策略(随机延迟、长休息、工作时间限制),但自动化操作始终存在被检测的可能,请自行评估风险
- **微信更新**:微信版本更新可能改变 UI 结构,导致脚本失效。如遇到问题,先用 `--dump-ui` 检查 UI 变化
- **前台占用**:脚本运行时会操作微信窗口,不建议同时手动使用微信

53
config.example.yaml Normal file
View File

@ -0,0 +1,53 @@
# WeChat 消息自动点击器 配置文件
# 复制为 config.yaml 后根据需要修改
wechat:
bundle_id: "com.tencent.xinWeChat"
process_name: "WeChat"
# 扫描行为
scan:
interval_seconds: 30 # 每次扫描间隔(秒)
max_chats_per_scan: 5 # 每次扫描最多处理几个聊天
scroll_chat_list: false # 是否滚动聊天列表查找更多聊天
# 延迟范围(秒),模拟真人操作节奏
delays:
before_click_chat: [2, 5] # 点击聊天前
after_open_chat: [1, 3] # 打开会话后等待
before_click_media: [1, 4] # 点击图片/文件前
after_click_media: [3, 8] # 点击图片/文件后(等待下载)
before_close_preview: [1, 3] # 关闭预览前
before_close_chat: [1, 2] # 关闭会话窗口前
between_messages: [0.5, 2] # 处理每条消息之间
# 工作时间24小时制
schedule:
enabled: true
start_hour: 8
end_hour: 23
pause_on_weekends: false
# 聊天过滤
filter:
mode: "all" # "all": 全部, "whitelist": 仅白名单, "blacklist": 排除黑名单
whitelist: [] # 白名单模式下,仅处理这些聊天
blacklist: # 黑名单模式下,排除这些聊天
- "腾讯新闻"
- "微信支付"
- "微信团队"
# 媒体处理
media:
click_images: true # 是否点击图片
click_files: true # 是否点击文件
click_videos: false # 是否点击视频(视频可能很大,默认关闭)
max_media_per_chat: 20 # 每个聊天最多点击几个媒体
# 日志配置
logging:
level: "INFO" # DEBUG, INFO, WARNING, ERROR
file: "wechat_clicker.log"
max_bytes: 10485760 # 10MB
backup_count: 5
console: true

74
main.py Normal file
View File

@ -0,0 +1,74 @@
"""WeChat 消息自动点击器 — 入口
自动点击微信桌面端未读消息中的图片和文件触发原始文件下载
"""
import argparse
import signal
import sys
from wechat_clicker.config import Config
from wechat_clicker.automator import WeChatAutomator
from wechat_clicker.logger_setup import setup_logging
def main():
parser = argparse.ArgumentParser(
description="微信消息自动点击器 — 自动点击图片和文件触发下载"
)
parser.add_argument(
"--config", default="config.yaml",
help="配置文件路径 (默认: config.yaml)"
)
parser.add_argument(
"--dry-run", action="store_true",
help="试运行模式:只扫描不点击"
)
parser.add_argument(
"--once", action="store_true",
help="只执行一次扫描循环后退出"
)
parser.add_argument(
"--debug", action="store_true",
help="开启详细日志 (DEBUG 级别)"
)
parser.add_argument(
"--dump-ui", action="store_true",
help="输出微信 UI 元素树后退出 (调试用)"
)
args = parser.parse_args()
# 加载配置
config = Config(args.config)
if args.debug:
config.override("logging.level", "DEBUG")
# 初始化日志
logger = setup_logging(config)
logger.info("微信自动点击器启动")
if args.dry_run:
logger.info("*** 试运行模式:只扫描不点击 ***")
# 优雅退出
def signal_handler(sig, frame):
logger.info("收到退出信号,正在关闭...")
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 创建自动化器
automator = WeChatAutomator(config, dry_run=args.dry_run)
# 执行
if args.dump_ui:
automator.dump_ui_tree()
elif args.once:
automator.run_once()
else:
automator.run()
if __name__ == "__main__":
main()

67
project.md Normal file
View File

@ -0,0 +1,67 @@
# 项目规划WeChat 消息自动点击器
## 项目目标
创建一个长期稳定运行的自动化工具,自动点击微信桌面端中未读消息的图片和文件,触发微信下载原始文件到本地目录,配合信息收集系统完成数据入库流程。
## 项目状态
### 已完成 (v0.1.0 - 2026/04/22)
- [x] 项目结构搭建
- [x] 配置系统YAML 配置文件、默认值、白/黑名单)
- [x] AXUIElement 底层封装ax_bridge.py
- [x] 微信 UI 导航wechat_ui.py— 聊天列表解析、消息类型分类
- [x] UI 状态机state_machine.py— 基于窗口计数的状态检测与恢复
- [x] 拟人行为引擎human_like.py— 高斯分布延迟、长休息、工作时间
- [x] 主自动化逻辑automator.py— 完整扫描-点击-关闭循环
- [x] 入口脚本main.py— 参数解析、信号处理
- [x] 调试工具(--dump-ui, --dry-run
### 待验证
- [ ] 在真实环境中测试 AXUIElement 对微信的访问能力
- [ ] 验证聊天列表 title 解析的准确性
- [ ] 验证图片点击后预览关闭的可靠性
- [ ] 长时间运行稳定性测试
### 未来可能的改进
- [ ] 聊天列表滚动支持(处理不在可见区域的聊天)
- [ ] 消息滚动支持(处理更早的图片消息)
- [ ] 已处理消息去重(记录已点击的媒体,避免重复)
- [ ] 微信版本适配(检测 UI 结构变化并自动调整)
- [ ] 运行状态 Web 面板(远程监控)
- [ ] 与信息收集系统直接集成
## 技术细节
### 微信 UI 结构 (v4.1.9)
- 主窗口 "微信" 包含侧边栏 + 聊天列表
- 聊天列表:`AXList name="会话"`,子元素为 `AXStaticText`
- 点击聊天项打开独立会话窗口
- 消息列表:`AXList name="消息"`
- 图片消息 title = "图片",文件消息 title 以 "文件\n" 开头
### 防封策略
- 高斯分布随机延迟(非均匀)
- ±20% 扫描间隔抖动
- 5% 概率长休息30-120 秒)
- 工作时间限制
- 每次最多 5 个聊天 / 每聊天最多 20 个媒体
### 依赖
- pyobjc-framework-Quartz
- pyobjc-framework-ApplicationServices
- pyobjc-framework-Cocoa
- PyYAML
### 系统要求
- macOS Sonoma 14+
- Python 3.11+
- 辅助功能权限(终端/Python 需要授权)
- 微信桌面端 v4.1.9+

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
pyobjc-framework-Quartz>=10.0
pyobjc-framework-ApplicationServices>=10.0
pyobjc-framework-Cocoa>=10.0
PyYAML>=6.0

View File

@ -0,0 +1 @@
"""WeChat 消息自动点击器"""

334
wechat_clicker/automator.py Normal file
View File

@ -0,0 +1,334 @@
"""主自动化逻辑
编排整个工作流程扫描未读聊天 点击进入 点击图片/文件 关闭预览 关闭会话 循环
"""
import logging
import time
from .ax_bridge import AXBridge
from .config import Config
from .human_like import HumanBehavior
from .state_machine import StateMachine, UIState
from .wechat_ui import WeChatUI
logger = logging.getLogger("wechat_clicker.automator")
class WeChatAutomator:
"""微信消息自动点击器"""
def __init__(self, config: Config, dry_run: bool = False):
self.config = config
self.dry_run = dry_run
# 初始化各组件
self.ax = AXBridge()
self.ui = WeChatUI(self.ax, config.bundle_id)
self.state = StateMachine(self.ax, self.ui)
self.human = HumanBehavior(config)
# 统计
self._scan_count = 0
self._total_chats_processed = 0
self._total_media_clicked = 0
# ----------------------------------------------------------------
# 启动检查
# ----------------------------------------------------------------
def verify_setup(self) -> bool:
"""验证环境和权限。"""
# 检查辅助功能权限
if not self.ax.check_accessibility():
logger.error("缺少辅助功能权限,无法继续")
return False
# 检查微信是否运行
app_ref = self.ui.get_app_ref()
if app_ref is None:
logger.error("微信未运行,请先启动微信桌面端")
return False
# 检查主窗口
main_win = self.ui.get_main_window()
if main_win is None:
logger.error("未找到微信主窗口,请确保微信已登录并可见")
return False
logger.info("环境检查通过")
return True
# ----------------------------------------------------------------
# 主循环
# ----------------------------------------------------------------
def run(self):
"""永久运行的主循环。"""
logger.info("微信自动点击器启动")
if not self.verify_setup():
return
while True:
try:
self._run_one_cycle()
except KeyboardInterrupt:
logger.info("用户中断,正在退出...")
break
except Exception as e:
logger.error(f"主循环异常: {e}", exc_info=True)
# 尝试恢复状态
try:
self.state.recover_to_chat_list()
except Exception:
pass
time.sleep(10)
self._print_stats()
def run_once(self):
"""运行一次扫描循环。"""
logger.info("执行单次扫描")
if not self.verify_setup():
return
self._run_one_cycle()
self._print_stats()
def _run_one_cycle(self):
"""执行一个完整的扫描-处理循环。"""
# 检查工作时间
if self.human.is_off_hours():
logger.info("当前为非工作时间,等待中...")
time.sleep(300)
return
# 偶尔触发长休息
if self.human.should_take_break():
self.human.long_break()
return
# 确保微信在前台
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
# 恢复到聊天列表
if not self.state.recover_to_chat_list():
logger.warning("无法恢复到聊天列表,跳过本次循环")
time.sleep(10)
return
self._scan_count += 1
# 检查是否有未读消息
global_unread = self.ui.get_global_unread_count()
if global_unread == 0:
logger.debug(f"[扫描#{self._scan_count}] 没有未读消息")
sleep_time = self.human.scan_interval_with_jitter()
time.sleep(sleep_time)
return
logger.info(f"[扫描#{self._scan_count}] 全局未读: {global_unread}")
# 获取未读聊天列表
unread_chats = self.ui.get_unread_chats()
if not unread_chats:
logger.debug("聊天列表中未发现未读项(可能需要滚动)")
sleep_time = self.human.scan_interval_with_jitter()
time.sleep(sleep_time)
return
# 过滤和限制数量
chats_to_process = []
for chat in unread_chats:
if self.config.should_process_chat(chat.name):
chats_to_process.append(chat)
count = self.human.random_subset_count(
len(chats_to_process), self.config.max_chats_per_scan
)
chats_to_process = chats_to_process[:count]
logger.info(
f"将处理 {len(chats_to_process)} 个聊天 "
f"(共 {len(unread_chats)} 个未读)"
)
# 逐个处理
for chat in chats_to_process:
self.human.delay("before_click_chat")
self._process_chat(chat)
self.human.delay("before_close_chat")
# 等待下次扫描
sleep_time = self.human.scan_interval_with_jitter()
logger.debug(f"等待 {sleep_time:.0f}s 后进行下次扫描")
time.sleep(sleep_time)
# ----------------------------------------------------------------
# 处理单个聊天
# ----------------------------------------------------------------
def _process_chat(self, chat):
"""打开一个聊天,处理其中的媒体消息,然后关闭。"""
logger.info(f"处理聊天: {chat.name} (未读: {chat.unread_count})")
if self.dry_run:
logger.info(f" [DRY-RUN] 跳过点击: {chat.name}")
return
# 点击聊天项打开会话
if not self.ax.press(chat.element):
logger.warning(f" 点击聊天项失败: {chat.name}")
return
self.human.delay("after_open_chat")
# 验证会话窗口已打开
state = self.state.detect_state()
if state != UIState.CONVERSATION_OPEN:
logger.warning(
f" 会话窗口未打开 (状态: {state.value}),尝试恢复"
)
self.state.recover_to_chat_list()
return
# 处理会话中的媒体
conv_window = self.ui.get_conversation_window()
if conv_window:
media_count = self._process_media(conv_window, chat.name)
self._total_media_clicked += media_count
self._total_chats_processed += 1
# 关闭会话窗口
self.human.delay("before_close_chat")
if not self.state.close_current_conversation():
logger.warning(" 关闭会话失败,尝试恢复")
self.state.recover_to_chat_list()
# ----------------------------------------------------------------
# 处理媒体消息
# ----------------------------------------------------------------
def _process_media(self, conv_window, chat_name: str) -> int:
"""在打开的会话中查找并点击媒体消息。
Returns:
点击的媒体数量
"""
media_messages = self.ui.get_media_messages(conv_window)
if not media_messages:
logger.debug(f" {chat_name}: 未发现可见媒体消息")
return 0
# 过滤需要点击的类型
targets = []
for msg in media_messages:
if msg.msg_type == "image" and self.config.click_images:
targets.append(msg)
elif msg.msg_type == "file" and self.config.click_files:
targets.append(msg)
elif msg.msg_type == "video" and self.config.click_videos:
targets.append(msg)
if not targets:
logger.debug(f" {chat_name}: 无需处理的媒体")
return 0
# 限制数量,从最新的开始(列表末尾 = 最新)
max_count = self.config.max_media_per_chat
targets = targets[-max_count:] if len(targets) > max_count else targets
# 反转,从最新的开始处理
targets = list(reversed(targets))
logger.info(f" {chat_name}: 发现 {len(targets)} 个媒体消息待处理")
clicked = 0
for msg in targets:
self.human.delay("before_click_media")
short_title = msg.title.replace("\n", " ")[:50]
logger.info(f" 点击{msg.msg_type}: {short_title}")
# 点击媒体
if not self.ax.press(msg.element):
logger.warning(f" 点击失败: {short_title}")
continue
self.human.delay("after_click_media")
# 关闭可能出现的预览
self._dismiss_preview_safe()
clicked += 1
self.human.delay("between_messages")
# 检查连续错误
if self.ax.should_backoff():
logger.warning("连续错误过多,暂停处理")
self.ax.reset_error_count()
break
return clicked
def _dismiss_preview_safe(self):
"""安全地关闭可能出现的媒体预览。"""
# 等一小会儿让预览可能出现
self.human.micro_jitter()
state = self.state.detect_state()
if state == UIState.MEDIA_PREVIEW:
self.human.delay("before_close_preview")
self.state.dismiss_preview()
else:
# 保守策略:即使未检测到预览也发送 Escape
# 因为某些预览可能不会创建新窗口
self.ax.send_escape_key()
time.sleep(0.3)
# ----------------------------------------------------------------
# 调试工具
# ----------------------------------------------------------------
def dump_ui_tree(self):
"""输出微信 UI 元素树(调试用)。"""
if not self.verify_setup():
return
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
# 输出主窗口
main_win = self.ui.get_main_window()
if main_win:
print("=== 主窗口 (微信) ===")
print(self.ax.dump_element(main_win, max_depth=5))
# 输出会话窗口
conv_windows = self.ui.get_conversation_windows()
for win in conv_windows:
title = self.ax.get_title(win)
print(f"\n=== 会话窗口 ({title}) ===")
print(self.ax.dump_element(win, max_depth=5))
# 输出聊天列表解析结果
print("\n=== 聊天列表解析 ===")
items = self.ui.get_chat_items()
for item in items:
status = f"[未读:{item.unread_count}]" if item.unread_count > 0 else ""
print(f" {item.name} {status} | {item.preview} | {item.timestamp}")
# ----------------------------------------------------------------
# 统计
# ----------------------------------------------------------------
def _print_stats(self):
"""输出运行统计。"""
logger.info(
f"运行统计: 扫描={self._scan_count}, "
f"处理聊天={self._total_chats_processed}, "
f"点击媒体={self._total_media_clicked}"
)

300
wechat_clicker/ax_bridge.py Normal file
View File

@ -0,0 +1,300 @@
"""macOS Accessibility API (AXUIElement) 底层封装
通过 pyobjc 调用 ApplicationServices 框架提供对 UI 元素的读取和操作能力
"""
import logging
import time
from ApplicationServices import (
AXIsProcessTrusted,
AXIsProcessTrustedWithOptions,
AXUIElementCreateApplication,
AXUIElementCopyAttributeValue,
AXUIElementCopyAttributeNames,
AXUIElementCopyActionNames,
AXUIElementPerformAction,
)
from Cocoa import (
NSRunningApplication,
NSWorkspace,
NSApplicationActivateIgnoringOtherApps,
)
from Quartz import (
CGEventCreateKeyboardEvent,
CGEventPost,
CGEventSetFlags,
kCGHIDEventTap,
kCGEventFlagMaskCommand,
)
from CoreFoundation import kCFBooleanTrue
logger = logging.getLogger("wechat_clicker.ax_bridge")
# AXError 常量
kAXErrorSuccess = 0
kAXErrorAttributeUnsupported = -25205
kAXErrorNoValue = -25212
kAXErrorInvalidUIElement = -25202
# 键码
kVK_Escape = 0x35
kVK_W = 0x0D
class AXBridge:
"""AXUIElement 底层封装"""
def __init__(self):
self._error_count = 0
self._max_errors_before_backoff = 10
self._backoff_seconds = 60
# ----------------------------------------------------------------
# 辅助功能权限检查
# ----------------------------------------------------------------
def check_accessibility(self) -> bool:
"""检查当前进程是否拥有辅助功能权限。"""
trusted = AXIsProcessTrusted()
if not trusted:
logger.error(
"辅助功能权限未授予!请前往:系统设置 > 隐私与安全 > 辅助功能,"
"添加当前终端应用或 Python 解释器。"
)
# 弹出系统提示框
AXIsProcessTrustedWithOptions(
{
"AXTrustedCheckOptionPrompt": kCFBooleanTrue
}
)
return trusted
# ----------------------------------------------------------------
# 获取应用引用
# ----------------------------------------------------------------
def get_app_ref(self, bundle_id: str):
"""根据 bundle ID 获取应用的 AXUIElement 引用。返回 None 表示应用未运行。"""
apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id)
if not apps or len(apps) == 0:
logger.error(f"未找到运行中的应用: {bundle_id}")
return None
app = apps[0]
pid = app.processIdentifier()
logger.debug(f"找到应用 {bundle_id}, PID={pid}")
return AXUIElementCreateApplication(pid)
def bring_to_front(self, bundle_id: str) -> bool:
"""将指定应用带到最前台。"""
apps = NSRunningApplication.runningApplicationsWithBundleIdentifier_(bundle_id)
if not apps or len(apps) == 0:
return False
app = apps[0]
return app.activateWithOptions_(NSApplicationActivateIgnoringOtherApps)
# ----------------------------------------------------------------
# 属性读取
# ----------------------------------------------------------------
def get_attribute(self, element, attr_name: str):
"""读取 UI 元素的指定属性。失败返回 None。"""
try:
err, value = AXUIElementCopyAttributeValue(element, attr_name, None)
if err == kAXErrorSuccess:
self._error_count = 0
return value
if err not in (kAXErrorNoValue, kAXErrorAttributeUnsupported):
logger.debug(f"读取属性 {attr_name} 失败, 错误码: {err}")
return None
except Exception as e:
logger.debug(f"读取属性 {attr_name} 异常: {e}")
return None
def get_attribute_names(self, element) -> list:
"""获取元素支持的所有属性名。"""
try:
err, names = AXUIElementCopyAttributeNames(element, None)
if err == kAXErrorSuccess:
return list(names) if names else []
return []
except Exception:
return []
def get_action_names(self, element) -> list:
"""获取元素支持的所有操作名。"""
try:
err, names = AXUIElementCopyActionNames(element, None)
if err == kAXErrorSuccess:
return list(names) if names else []
return []
except Exception:
return []
def get_children(self, element) -> list:
"""获取子元素列表。"""
children = self.get_attribute(element, "AXChildren")
if children is None:
return []
return list(children)
def get_role(self, element) -> str:
"""获取元素角色AXRole"""
return self.get_attribute(element, "AXRole") or ""
def get_title(self, element) -> str:
"""获取元素标题AXTitle"""
return self.get_attribute(element, "AXTitle") or ""
def get_value(self, element) -> str:
"""获取元素值AXValue"""
return self.get_attribute(element, "AXValue") or ""
def get_name(self, element) -> str:
"""获取元素名称,优先 AXTitle其次 AXValue。"""
title = self.get_attribute(element, "AXTitle")
if title:
return title
value = self.get_attribute(element, "AXValue")
if value:
return value
return ""
def get_description(self, element) -> str:
"""获取元素描述AXDescription"""
return self.get_attribute(element, "AXDescription") or ""
def get_size(self, element) -> tuple:
"""获取元素尺寸 (width, height)。返回 (0, 0) 表示不可见或失败。"""
size = self.get_attribute(element, "AXSize")
if size is None:
return (0, 0)
try:
return (int(size.width), int(size.height))
except (AttributeError, TypeError):
return (0, 0)
def get_position(self, element) -> tuple:
"""获取元素位置 (x, y)。"""
pos = self.get_attribute(element, "AXPosition")
if pos is None:
return (0, 0)
try:
return (int(pos.x), int(pos.y))
except (AttributeError, TypeError):
return (0, 0)
def get_windows(self, app_ref) -> list:
"""获取应用的所有窗口。"""
windows = self.get_attribute(app_ref, "AXWindows")
if windows is None:
return []
return list(windows)
def get_focused_window(self, app_ref):
"""获取当前聚焦的窗口。"""
return self.get_attribute(app_ref, "AXFocusedWindow")
# ----------------------------------------------------------------
# 操作执行
# ----------------------------------------------------------------
def perform_action(self, element, action: str) -> bool:
"""对元素执行指定操作。"""
try:
err = AXUIElementPerformAction(element, action)
if err == kAXErrorSuccess:
self._error_count = 0
return True
logger.debug(f"执行操作 {action} 失败, 错误码: {err}")
self._error_count += 1
return False
except Exception as e:
logger.debug(f"执行操作 {action} 异常: {e}")
self._error_count += 1
return False
def press(self, element) -> bool:
"""对元素执行 AXPress等效点击"""
return self.perform_action(element, "AXPress")
# ----------------------------------------------------------------
# 键盘事件
# ----------------------------------------------------------------
def send_escape_key(self):
"""发送 Escape 键事件。"""
self._send_key(kVK_Escape)
def send_cmd_w(self):
"""发送 Cmd+W 键事件(关闭窗口)。"""
self._send_key(kVK_W, flags=kCGEventFlagMaskCommand)
def _send_key(self, key_code: int, flags: int = 0):
"""发送键盘事件(按下+抬起)。"""
try:
# Key down
event_down = CGEventCreateKeyboardEvent(None, key_code, True)
if flags:
CGEventSetFlags(event_down, flags)
CGEventPost(kCGHIDEventTap, event_down)
time.sleep(0.05)
# Key up
event_up = CGEventCreateKeyboardEvent(None, key_code, False)
if flags:
CGEventSetFlags(event_up, flags)
CGEventPost(kCGHIDEventTap, event_up)
except Exception as e:
logger.error(f"发送键盘事件失败: {e}")
# ----------------------------------------------------------------
# 错误管理
# ----------------------------------------------------------------
def should_backoff(self) -> bool:
"""连续错误过多时应暂停一段时间。"""
return self._error_count >= self._max_errors_before_backoff
def reset_error_count(self):
"""重置错误计数。"""
self._error_count = 0
# ----------------------------------------------------------------
# 调试工具
# ----------------------------------------------------------------
def dump_element(self, element, indent: int = 0, max_depth: int = 5) -> str:
"""递归输出元素树(调试用)。"""
if indent >= max_depth:
return ""
lines = []
role = self.get_role(element)
title = self.get_title(element)
name_attr = self.get_attribute(element, "AXValue") or ""
desc = self.get_description(element)
size = self.get_size(element)
prefix = " " * indent
info_parts = [f"role={role}"]
if title:
# 截断过长的 title用单行表示
short_title = title.replace("\n", "\\n")[:80]
info_parts.append(f'title="{short_title}"')
if name_attr:
short_name = str(name_attr).replace("\n", "\\n")[:80]
info_parts.append(f'value="{short_name}"')
if desc:
info_parts.append(f'desc="{desc}"')
if size != (0, 0):
info_parts.append(f"size={size[0]}x{size[1]}")
lines.append(f"{prefix}[{', '.join(info_parts)}]")
children = self.get_children(element)
for child in children:
lines.append(self.dump_element(child, indent + 1, max_depth))
return "\n".join(lines)

201
wechat_clicker/config.py Normal file
View File

@ -0,0 +1,201 @@
"""配置加载与管理"""
import os
import random
import yaml
from datetime import datetime
# 默认配置
DEFAULTS = {
"wechat": {
"bundle_id": "com.tencent.xinWeChat",
"process_name": "WeChat",
},
"scan": {
"interval_seconds": 30,
"max_chats_per_scan": 5,
"scroll_chat_list": False,
},
"delays": {
"before_click_chat": [2, 5],
"after_open_chat": [1, 3],
"before_click_media": [1, 4],
"after_click_media": [3, 8],
"before_close_preview": [1, 3],
"before_close_chat": [1, 2],
"between_messages": [0.5, 2],
},
"schedule": {
"enabled": True,
"start_hour": 8,
"end_hour": 23,
"pause_on_weekends": False,
},
"filter": {
"mode": "all",
"whitelist": [],
"blacklist": ["腾讯新闻", "微信支付", "微信团队"],
},
"media": {
"click_images": True,
"click_files": True,
"click_videos": False,
"max_media_per_chat": 20,
},
"logging": {
"level": "INFO",
"file": "wechat_clicker.log",
"max_bytes": 10485760,
"backup_count": 5,
"console": True,
},
}
def _deep_merge(base: dict, override: dict) -> dict:
"""深度合并两个字典override 中的值覆盖 base 中的值。"""
result = base.copy()
for key, value in override.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = _deep_merge(result[key], value)
else:
result[key] = value
return result
class Config:
"""配置管理器"""
def __init__(self, config_path: str = "config.yaml"):
self._config_path = config_path
self._data = self._load()
def _load(self) -> dict:
"""加载配置文件,与默认值合并。"""
if os.path.exists(self._config_path):
with open(self._config_path, "r", encoding="utf-8") as f:
user_config = yaml.safe_load(f) or {}
return _deep_merge(DEFAULTS, user_config)
return DEFAULTS.copy()
def _get(self, dotted_key: str, default=None):
"""通过点号分隔的 key 获取配置值。例如 'scan.interval_seconds'"""
keys = dotted_key.split(".")
value = self._data
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
return default
return value
def override(self, dotted_key: str, value):
"""运行时覆盖某个配置值。"""
keys = dotted_key.split(".")
target = self._data
for key in keys[:-1]:
target = target.setdefault(key, {})
target[keys[-1]] = value
# --- 便捷属性 ---
@property
def bundle_id(self) -> str:
return self._get("wechat.bundle_id")
@property
def process_name(self) -> str:
return self._get("wechat.process_name")
@property
def scan_interval(self) -> int:
return self._get("scan.interval_seconds")
@property
def max_chats_per_scan(self) -> int:
return self._get("scan.max_chats_per_scan")
@property
def scroll_chat_list(self) -> bool:
return self._get("scan.scroll_chat_list")
@property
def click_images(self) -> bool:
return self._get("media.click_images")
@property
def click_files(self) -> bool:
return self._get("media.click_files")
@property
def click_videos(self) -> bool:
return self._get("media.click_videos")
@property
def max_media_per_chat(self) -> int:
return self._get("media.max_media_per_chat")
@property
def log_level(self) -> str:
return self._get("logging.level")
@property
def log_file(self) -> str:
return self._get("logging.file")
@property
def log_max_bytes(self) -> int:
return self._get("logging.max_bytes")
@property
def log_backup_count(self) -> int:
return self._get("logging.backup_count")
@property
def log_console(self) -> bool:
return self._get("logging.console")
# --- 延迟 ---
def get_delay(self, delay_name: str) -> float:
"""获取指定名称的随机延迟时间(秒),使用截断正态分布。"""
delay_range = self._get(f"delays.{delay_name}", [1, 3])
min_val, max_val = delay_range[0], delay_range[1]
mid = (min_val + max_val) / 2
std = (max_val - min_val) / 4 # 大部分值落在范围内
value = random.gauss(mid, std)
return max(min_val, min(max_val, value))
# --- 工作时间 ---
def is_within_working_hours(self) -> bool:
"""检查当前时间是否在配置的工作时间内。"""
if not self._get("schedule.enabled"):
return True
now = datetime.now()
# 周末检查
if self._get("schedule.pause_on_weekends") and now.weekday() >= 5:
return False
start_hour = self._get("schedule.start_hour")
end_hour = self._get("schedule.end_hour")
return start_hour <= now.hour < end_hour
# --- 聊天过滤 ---
def should_process_chat(self, chat_name: str) -> bool:
"""根据白名单/黑名单判断是否处理此聊天。"""
mode = self._get("filter.mode", "all")
if mode == "all":
# 即使在 all 模式下也检查黑名单
blacklist = self._get("filter.blacklist", [])
return chat_name not in blacklist
elif mode == "whitelist":
whitelist = self._get("filter.whitelist", [])
return chat_name in whitelist
elif mode == "blacklist":
blacklist = self._get("filter.blacklist", [])
return chat_name not in blacklist
return True

View File

@ -0,0 +1,91 @@
"""拟人行为模拟
使用高斯分布随机延迟偶尔长休息工作时间限制等策略
模拟真人操作节奏降低被检测风险
"""
import logging
import random
import time
from .config import Config
logger = logging.getLogger("wechat_clicker.human_like")
class HumanBehavior:
"""拟人行为引擎"""
def __init__(self, config: Config):
self.config = config
self._cycle_count = 0
self._break_probability = 0.05 # 5% 概率触发长休息
self._break_min = 30
self._break_max = 120
def delay(self, delay_name: str):
"""执行指定名称的随机延迟。"""
seconds = self.config.get_delay(delay_name)
logger.debug(f"延迟 {delay_name}: {seconds:.1f}s")
time.sleep(seconds)
def random_delay(self, min_s: float, max_s: float):
"""执行指定范围内的随机延迟(高斯分布)。"""
mid = (min_s + max_s) / 2
std = (max_s - min_s) / 4
value = random.gauss(mid, std)
value = max(min_s, min(max_s, value))
time.sleep(value)
def micro_jitter(self):
"""极短延迟0.1-0.5 秒),用于连续操作之间。"""
time.sleep(random.uniform(0.1, 0.5))
def scan_interval_with_jitter(self) -> float:
"""返回带 ±20% 抖动的扫描间隔时间(秒)。"""
base = self.config.scan_interval
jitter = base * 0.2
return random.uniform(base - jitter, base + jitter)
# ----------------------------------------------------------------
# 长休息
# ----------------------------------------------------------------
def should_take_break(self) -> bool:
"""判断是否应该触发一次长休息。
每个循环有固定概率触发模拟真人偶尔离开电脑
"""
self._cycle_count += 1
return random.random() < self._break_probability
def long_break(self):
"""执行一次长休息30-120 秒)。"""
duration = random.uniform(self._break_min, self._break_max)
logger.info(f"模拟长休息: {duration:.0f}s")
time.sleep(duration)
# ----------------------------------------------------------------
# 工作时间
# ----------------------------------------------------------------
def is_off_hours(self) -> bool:
"""检查是否在非工作时间(应暂停操作)。"""
return not self.config.is_within_working_hours()
# ----------------------------------------------------------------
# 随机子集
# ----------------------------------------------------------------
def random_subset_count(self, total: int, max_count: int) -> int:
"""决定本次处理多少个项目。
不总是处理 max_count 偶尔少处理一些模拟真人不会每次都处理所有内容
"""
if total <= 0:
return 0
upper = min(total, max_count)
# 80% 概率处理全部上限内20% 概率减少 1-2 个
if random.random() < 0.8:
return upper
return max(1, upper - random.randint(1, 2))

View File

@ -0,0 +1,40 @@
"""日志配置"""
import logging
from logging.handlers import RotatingFileHandler
def setup_logging(config) -> logging.Logger:
"""配置日志系统,返回 logger 实例。"""
logger = logging.getLogger("wechat_clicker")
logger.setLevel(getattr(logging, config.log_level, logging.INFO))
# 避免重复添加 handler
if logger.handlers:
return logger
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# 文件 handler自动轮转
file_handler = RotatingFileHandler(
config.log_file,
maxBytes=config.log_max_bytes,
backupCount=config.log_backup_count,
encoding="utf-8",
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 控制台 handler
if config.log_console:
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
datefmt="%H:%M:%S",
))
logger.addHandler(console_handler)
return logger

View File

@ -0,0 +1,201 @@
"""UI 状态机
基于微信窗口数量和名称判断当前 UI 状态提供状态恢复能力
微信 v4.1.9 窗口模型
- 1 个窗口"微信"主聊天列表
- 2 个窗口"微信" + 聊天名会话已打开
- 3+ 个窗口可能有图片预览等额外窗口
"""
import logging
import time
from enum import Enum
from .ax_bridge import AXBridge
from .wechat_ui import WeChatUI
logger = logging.getLogger("wechat_clicker.state_machine")
class UIState(Enum):
UNKNOWN = "unknown"
MAIN_CHAT_LIST = "main_chat_list"
CONVERSATION_OPEN = "conversation_open"
MEDIA_PREVIEW = "media_preview"
WECHAT_NOT_RUNNING = "wechat_not_running"
class StateMachine:
"""微信 UI 状态机"""
def __init__(self, ax: AXBridge, wechat_ui: WeChatUI):
self.ax = ax
self.ui = wechat_ui
self._current_state = UIState.UNKNOWN
self._conversation_name = None # 当前打开的会话名称
@property
def current_state(self) -> UIState:
return self._current_state
@property
def conversation_name(self) -> str:
return self._conversation_name or ""
# ----------------------------------------------------------------
# 状态检测
# ----------------------------------------------------------------
def detect_state(self) -> UIState:
"""检测当前微信 UI 状态。"""
windows = self.ui.get_all_windows()
if not windows:
self._current_state = UIState.WECHAT_NOT_RUNNING
self._conversation_name = None
logger.debug("未检测到微信窗口")
return self._current_state
# 分析窗口
main_window = None
other_windows = []
for win in windows:
title = self.ax.get_title(win)
if title == "微信":
main_window = win
elif title:
other_windows.append((win, title))
if main_window is None:
# 微信可能最小化了,或者窗口结构变化
self._current_state = UIState.UNKNOWN
logger.debug("未找到微信主窗口")
return self._current_state
window_count = len(windows)
if window_count == 1:
# 只有主窗口 = 聊天列表
self._current_state = UIState.MAIN_CHAT_LIST
self._conversation_name = None
elif window_count == 2 and len(other_windows) == 1:
# 主窗口 + 一个会话窗口
self._current_state = UIState.CONVERSATION_OPEN
self._conversation_name = other_windows[0][1]
elif window_count >= 3:
# 可能有预览窗口
self._current_state = UIState.MEDIA_PREVIEW
# 第一个非主窗口通常是会话
if other_windows:
self._conversation_name = other_windows[0][1]
else:
self._current_state = UIState.UNKNOWN
logger.debug(
f"状态检测: {self._current_state.value}, "
f"窗口数={window_count}, 会话={self._conversation_name}"
)
return self._current_state
# ----------------------------------------------------------------
# 状态恢复
# ----------------------------------------------------------------
def recover_to_chat_list(self, max_retries: int = 3) -> bool:
"""恢复到主聊天列表状态。关闭所有会话和预览窗口。
Returns:
True 如果成功恢复到聊天列表
"""
for attempt in range(max_retries):
state = self.detect_state()
if state == UIState.MAIN_CHAT_LIST:
return True
if state == UIState.WECHAT_NOT_RUNNING:
logger.error("微信未运行,无法恢复")
return False
if state == UIState.MEDIA_PREVIEW:
# 先关闭预览Escape
logger.info("关闭媒体预览...")
self.ax.send_escape_key()
time.sleep(0.5)
if state in (UIState.CONVERSATION_OPEN, UIState.MEDIA_PREVIEW):
# 关闭会话窗口
self._close_all_conversations()
time.sleep(0.5)
if state == UIState.UNKNOWN:
# 尝试激活微信并点击侧边栏聊天按钮
self.ui.ensure_wechat_frontmost()
time.sleep(0.5)
self.ui.click_sidebar_chat_tab()
time.sleep(0.5)
logger.debug(f"恢复尝试 {attempt + 1}/{max_retries}")
# 最终检查
final_state = self.detect_state()
if final_state == UIState.MAIN_CHAT_LIST:
return True
logger.error(f"恢复失败,当前状态: {final_state.value}")
return False
def _close_all_conversations(self):
"""关闭所有会话窗口。"""
conv_windows = self.ui.get_conversation_windows()
for win in conv_windows:
title = self.ax.get_title(win)
close_btn = self.ui.get_close_button(win)
if close_btn:
logger.debug(f"关闭会话窗口: {title}")
self.ax.press(close_btn)
time.sleep(0.3)
else:
# 后备方案Cmd+W
logger.debug(f"使用 Cmd+W 关闭窗口: {title}")
self.ax.send_cmd_w()
time.sleep(0.3)
def close_current_conversation(self) -> bool:
"""关闭当前打开的会话窗口。"""
conv_window = self.ui.get_conversation_window()
if conv_window is None:
return True # 没有会话窗口,视为成功
close_btn = self.ui.get_close_button(conv_window)
if close_btn:
self.ax.press(close_btn)
else:
self.ax.send_cmd_w()
time.sleep(0.5)
# 验证
state = self.detect_state()
return state == UIState.MAIN_CHAT_LIST
def dismiss_preview(self) -> bool:
"""关闭媒体预览。"""
state = self.detect_state()
if state != UIState.MEDIA_PREVIEW:
return True # 没有预览打开
# 发送 Escape 关闭预览
self.ax.send_escape_key()
time.sleep(0.5)
# 验证
state = self.detect_state()
if state == UIState.MEDIA_PREVIEW:
# 再试一次
self.ax.send_escape_key()
time.sleep(0.5)
state = self.detect_state()
return state != UIState.MEDIA_PREVIEW

394
wechat_clicker/wechat_ui.py Normal file
View File

@ -0,0 +1,394 @@
"""微信桌面端 UI 导航与元素查找
基于 macOS Accessibility API 探测到的微信 v4.1.9 UI 结构
- 主窗口 "微信" 包含侧边栏和聊天列表
- 点击聊天项会打开独立的会话窗口以聊天名命名
- 聊天列表AXList name="会话"
- 消息列表AXList name="消息"
- 每个聊天项的 title 是多行文本包含聊天名未读数最近消息时间
"""
import logging
import re
from .ax_bridge import AXBridge
logger = logging.getLogger("wechat_clicker.wechat_ui")
class ChatItem:
"""解析后的聊天列表项"""
def __init__(self, element, name: str, unread_count: int, preview: str, timestamp: str):
self.element = element
self.name = name
self.unread_count = unread_count
self.preview = preview
self.timestamp = timestamp
def __repr__(self):
return f"ChatItem(name={self.name!r}, unread={self.unread_count})"
class MessageItem:
"""解析后的消息"""
def __init__(self, element, msg_type: str, title: str, size: tuple):
self.element = element
self.msg_type = msg_type # "image", "file", "video", "text", "timestamp", "other"
self.title = title
self.size = size
@property
def is_visible(self) -> bool:
"""判断消息是否在可见区域(不可见的消息 size 为 0x0"""
return self.size[0] > 0 and self.size[1] > 0
def __repr__(self):
short = self.title.replace("\n", "\\n")[:40]
return f"MessageItem(type={self.msg_type}, title={short!r})"
class WeChatUI:
"""微信 UI 导航器"""
def __init__(self, ax: AXBridge, bundle_id: str = "com.tencent.xinWeChat"):
self.ax = ax
self.bundle_id = bundle_id
self._app_ref = None
# ----------------------------------------------------------------
# 应用引用管理
# ----------------------------------------------------------------
def get_app_ref(self):
"""获取或缓存微信应用的 AXUIElement 引用。"""
if self._app_ref is None:
self._app_ref = self.ax.get_app_ref(self.bundle_id)
return self._app_ref
def invalidate_app_ref(self):
"""清除缓存的应用引用(微信重启后需要)。"""
self._app_ref = None
def ensure_wechat_frontmost(self) -> bool:
"""确保微信在最前台。"""
return self.ax.bring_to_front(self.bundle_id)
# ----------------------------------------------------------------
# 窗口查找
# ----------------------------------------------------------------
def get_all_windows(self) -> list:
"""获取微信所有窗口。"""
app_ref = self.get_app_ref()
if app_ref is None:
return []
return self.ax.get_windows(app_ref)
def get_main_window(self):
"""找到主窗口(名为"微信")。"""
for win in self.get_all_windows():
title = self.ax.get_title(win)
if title == "微信":
return win
logger.warning("未找到微信主窗口")
return None
def get_conversation_windows(self) -> list:
"""获取所有非主窗口(即会话窗口)。"""
result = []
for win in self.get_all_windows():
title = self.ax.get_title(win)
# 排除主窗口和一些系统窗口
if title and title != "微信":
result.append(win)
return result
def get_conversation_window(self):
"""获取第一个会话窗口(如果存在)。"""
conv_windows = self.get_conversation_windows()
return conv_windows[0] if conv_windows else None
# ----------------------------------------------------------------
# 元素查找工具
# ----------------------------------------------------------------
def _find_child_by_role_and_name(self, parent, role: str, name: str = None, desc: str = None):
"""在子元素中按 role + name/desc 查找。"""
children = self.ax.get_children(parent)
for child in children:
child_role = self.ax.get_role(child)
if child_role != role:
continue
if name is not None:
child_title = self.ax.get_title(child)
if child_title == name:
return child
# 也检查 AXDescription
child_desc = self.ax.get_description(child)
if child_desc == name:
return child
elif desc is not None:
child_desc = self.ax.get_description(child)
if child_desc == desc:
return child
else:
# 只匹配 role
return child
return None
def _find_child_recursive(self, parent, role: str, name: str = None, max_depth: int = 6):
"""递归查找子元素(广度优先)。"""
if max_depth <= 0:
return None
children = self.ax.get_children(parent)
# 先在当前层查找
for child in children:
child_role = self.ax.get_role(child)
if child_role == role:
if name is None:
return child
child_title = self.ax.get_title(child)
if child_title == name:
return child
# 也用 AXDescription 匹配
child_desc = self.ax.get_description(child)
if child_desc == name:
return child
# 递归到下一层
for child in children:
result = self._find_child_recursive(child, role, name, max_depth - 1)
if result is not None:
return result
return None
# ----------------------------------------------------------------
# 聊天列表
# ----------------------------------------------------------------
def get_chat_list(self):
"""找到聊天列表元素AXList name="会话")。"""
main_win = self.get_main_window()
if main_win is None:
return None
return self._find_child_recursive(main_win, "AXList", "会话")
def get_sidebar_chat_button(self):
"""找到侧边栏的"微信"按钮(显示全局未读数)。"""
main_win = self.get_main_window()
if main_win is None:
return None
return self._find_child_recursive(main_win, "AXButton", "微信")
def get_global_unread_count(self) -> int:
"""从侧边栏"微信"按钮获取全局未读消息数。"""
btn = self.get_sidebar_chat_button()
if btn is None:
return 0
desc = self.ax.get_description(btn)
if not desc:
return 0
match = re.search(r"(\d+)条新消息", desc)
if match:
return int(match.group(1))
return 0
def get_chat_items(self) -> list:
"""获取聊天列表中所有可见聊天项,返回 ChatItem 列表。"""
chat_list = self.get_chat_list()
if chat_list is None:
logger.warning("未找到聊天列表")
return []
items = []
children = self.ax.get_children(chat_list)
for child in children:
role = self.ax.get_role(child)
if role != "AXStaticText":
continue
title = self.ax.get_title(child)
if not title:
continue
parsed = self._parse_chat_title(title)
if parsed["name"]:
items.append(ChatItem(
element=child,
name=parsed["name"],
unread_count=parsed["unread_count"],
preview=parsed["preview"],
timestamp=parsed["timestamp"],
))
return items
def get_unread_chats(self) -> list:
"""获取有未读消息的聊天项,按未读数降序排列。"""
items = self.get_chat_items()
unread = [item for item in items if item.unread_count > 0]
unread.sort(key=lambda x: x.unread_count, reverse=True)
return unread
# ----------------------------------------------------------------
# 会话消息
# ----------------------------------------------------------------
def get_message_list(self, conv_window):
"""在会话窗口中找到消息列表AXList name="消息")。"""
if conv_window is None:
return None
return self._find_child_recursive(conv_window, "AXList", "消息")
def get_messages(self, msg_list) -> list:
"""获取消息列表中的所有消息,返回 MessageItem 列表。"""
if msg_list is None:
return []
items = []
children = self.ax.get_children(msg_list)
for child in children:
role = self.ax.get_role(child)
if role != "AXStaticText":
continue
title = self.ax.get_title(child)
if not title:
continue
size = self.ax.get_size(child)
msg_type = self._classify_message(title, size)
items.append(MessageItem(
element=child,
msg_type=msg_type,
title=title,
size=size,
))
return items
def get_media_messages(self, conv_window) -> list:
"""获取会话中所有可见的图片/文件/视频消息。"""
msg_list = self.get_message_list(conv_window)
if msg_list is None:
return []
messages = self.get_messages(msg_list)
media = [
msg for msg in messages
if msg.msg_type in ("image", "file", "video") and msg.is_visible
]
return media
# ----------------------------------------------------------------
# 窗口操作
# ----------------------------------------------------------------
def get_close_button(self, window):
"""找到窗口的关闭按钮AXButton desc="关闭按钮")。"""
if window is None:
return None
children = self.ax.get_children(window)
for child in children:
role = self.ax.get_role(child)
if role == "AXButton":
desc = self.ax.get_description(child)
if desc == "关闭按钮":
return child
return None
def click_sidebar_chat_tab(self) -> bool:
"""点击侧边栏"微信"按钮,确保显示聊天列表。"""
btn = self.get_sidebar_chat_button()
if btn is None:
logger.warning("未找到侧边栏微信按钮")
return False
return self.ax.press(btn)
# ----------------------------------------------------------------
# 解析工具
# ----------------------------------------------------------------
@staticmethod
def _parse_chat_title(title_text: str) -> dict:
"""解析聊天项的多行 title 文本。
格式有未读时:
聊天名
[N条]
最近消息预览
时间
格式无未读时:
聊天名
最近消息预览
时间
"""
lines = title_text.strip().split("\n")
result = {
"name": "",
"unread_count": 0,
"preview": "",
"timestamp": "",
}
if not lines:
return result
result["name"] = lines[0].strip()
if len(lines) >= 2:
# 检查第二行是否是 [N条] 格式
unread_match = re.match(r"\[(\d+)条\]", lines[1].strip())
if unread_match:
result["unread_count"] = int(unread_match.group(1))
if len(lines) >= 3:
result["preview"] = lines[2].strip()
if len(lines) >= 4:
result["timestamp"] = lines[-1].strip()
else:
result["preview"] = lines[1].strip()
if len(lines) >= 3:
result["timestamp"] = lines[-1].strip()
return result
@staticmethod
def _classify_message(title: str, size: tuple) -> str:
"""根据消息 title 和尺寸分类消息类型。
已知模式微信 v4.1.9:
- 图片: title="图片"
- 视频: title "视频" 开头
- 文件: title "文件\\n" 开头
- 时间戳: 高度约 41px内容为时间格式
- 其他: 文本消息或其他类型
"""
stripped = title.strip()
# 图片
if stripped == "图片":
return "image"
# 视频
if stripped.startswith("视频"):
return "video"
# 文件(多行格式:文件\n文件名\n大小\n微信电脑版
if stripped.startswith("文件\n") or stripped.startswith("[文件]"):
return "file"
# 时间戳分隔符(高度约 41px内容为时间格式
if size[1] > 0 and size[1] <= 45:
time_pattern = re.match(
r"^(\d{1,2}:\d{2}|\d{1,2}/\d{1,2}|星期[一二三四五六日]|昨天|前天)$",
stripped,
)
if time_pattern:
return "timestamp"
return "text"