202 lines
5.8 KiB
Python
202 lines
5.8 KiB
Python
"""配置加载与管理"""
|
||
|
||
import copy
import os
import random
from datetime import datetime

import yaml
|
||
|
||
|
||
# Built-in default configuration. Each top-level key is one settings section.
DEFAULTS = {
    # Identity of the target application.
    "wechat": {
        "bundle_id": "com.tencent.xinWeChat",
        "process_name": "WeChat",
    },
    # Scan loop settings.
    "scan": {
        "interval_seconds": 15,
        "max_chats_per_scan": 0,  # NOTE(review): 0 presumably means "no limit" - confirm
        "scroll_chat_list": False,
    },
    # Randomised pauses, each given as a [min, max] pair in seconds.
    "delays": {
        "before_click_chat": [0.5, 2],
        "after_open_chat": [0.5, 1.5],
        "before_click_media": [0.3, 1.5],
        "after_click_media": [1.5, 4],
        "before_close_preview": [0.3, 1],
        "before_close_chat": [0.3, 1],
        "between_messages": [0.2, 0.8],
    },
    # Working-hours window, expressed in whole hours of the day.
    "schedule": {
        "enabled": True,
        "start_hour": 8,
        "end_hour": 23,
        "pause_on_weekends": False,
    },
    # Chat filtering; "mode" is one of "all", "whitelist" or "blacklist".
    "filter": {
        "mode": "all",
        "whitelist": [],
        "blacklist": [
            "腾讯新闻",
            "微信支付",
            "微信团队",
            "服务号",
            "订阅号",
            "文件传输助手",
        ],
    },
    # Which kinds of media are clicked, and how many per chat.
    "media": {
        "click_images": True,
        "click_files": False,
        "click_videos": False,
        "max_media_per_chat": 20,
    },
    # Log output settings.
    "logging": {
        "level": "INFO",
        "file": "wechat_clicker.log",
        "max_bytes": 10 * 1024 * 1024,  # 10 MiB
        "backup_count": 5,
        "console": True,
    },
}
|
||
|
||
|
||
def _deep_merge(base: dict, override: dict) -> dict:
|
||
"""深度合并两个字典,override 中的值覆盖 base 中的值。"""
|
||
result = base.copy()
|
||
for key, value in override.items():
|
||
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
|
||
result[key] = _deep_merge(result[key], value)
|
||
else:
|
||
result[key] = value
|
||
return result
|
||
|
||
|
||
class Config:
    """Configuration manager: a YAML file layered over the built-in defaults.

    Settings live in a nested dict (``self._data``) and are addressed with
    dot-separated keys such as ``"scan.interval_seconds"``. Each instance owns
    a private deep copy of its data, so ``override()`` never alters the
    module-level ``DEFAULTS`` or another instance.
    """

    def __init__(self, config_path: str = "config.yaml"):
        """Load settings from ``config_path`` (defaults are used if it is absent)."""
        self._config_path = config_path
        self._data = self._load()

    def _load(self) -> dict:
        """Read the config file, if it exists, and merge it over ``DEFAULTS``.

        Returns:
            A dict that shares no mutable state with ``DEFAULTS``.
        """
        if os.path.exists(self._config_path):
            with open(self._config_path, "r", encoding="utf-8") as f:
                # An empty file parses to None; treat it as "no overrides".
                user_config = yaml.safe_load(f) or {}
            # Deep-copy so runtime overrides cannot reach back into DEFAULTS
            # through nested dicts/lists that the merge left untouched.
            return copy.deepcopy(_deep_merge(DEFAULTS, user_config))
        # A shallow DEFAULTS.copy() would still alias every nested section.
        return copy.deepcopy(DEFAULTS)

    def _get(self, dotted_key: str, default=None):
        """Look up a value by dot-separated key, e.g. ``'scan.interval_seconds'``.

        Returns ``default`` when any path segment is missing or when an
        intermediate value is not a dict. A key that exists with value
        ``None`` returns ``None``, not ``default``.
        """
        value = self._data
        for key in dotted_key.split("."):
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default
        return value

    def override(self, dotted_key: str, value):
        """Override one setting at runtime.

        Missing intermediate sections are created. An intermediate value that
        exists but is not a dict is replaced by a new dict, so the override
        always succeeds.
        """
        *parents, leaf = dotted_key.split(".")
        target = self._data
        for key in parents:
            child = target.get(key)
            if not isinstance(child, dict):
                child = {}
                target[key] = child
            target = child
        target[leaf] = value

    # --- Convenience properties ---

    @property
    def bundle_id(self) -> str:
        return self._get("wechat.bundle_id")

    @property
    def process_name(self) -> str:
        return self._get("wechat.process_name")

    @property
    def scan_interval(self) -> int:
        return self._get("scan.interval_seconds")

    @property
    def max_chats_per_scan(self) -> int:
        return self._get("scan.max_chats_per_scan")

    @property
    def scroll_chat_list(self) -> bool:
        return self._get("scan.scroll_chat_list")

    @property
    def click_images(self) -> bool:
        return self._get("media.click_images")

    @property
    def click_files(self) -> bool:
        return self._get("media.click_files")

    @property
    def click_videos(self) -> bool:
        return self._get("media.click_videos")

    @property
    def max_media_per_chat(self) -> int:
        return self._get("media.max_media_per_chat")

    @property
    def log_level(self) -> str:
        return self._get("logging.level")

    @property
    def log_file(self) -> str:
        return self._get("logging.file")

    @property
    def log_max_bytes(self) -> int:
        return self._get("logging.max_bytes")

    @property
    def log_backup_count(self) -> int:
        return self._get("logging.backup_count")

    @property
    def log_console(self) -> bool:
        return self._get("logging.console")

    # --- Delays ---

    def get_delay(self, delay_name: str) -> float:
        """Return a random delay in seconds for the named ``delays.*`` entry.

        The entry is normally a ``[min, max]`` pair. The value is drawn from a
        normal distribution centred on the midpoint, with a standard deviation
        of a quarter of the range, and then clamped into ``[min, max]``.
        Out-of-range samples land exactly on a bound; they are not re-drawn.

        A missing or ``None`` entry falls back to ``[1, 3]``. A single number
        is used as a fixed delay. A reversed pair is reordered.
        """
        delay_range = self._get(f"delays.{delay_name}")
        if delay_range is None:
            delay_range = [1, 3]
        if isinstance(delay_range, (int, float)):
            return float(delay_range)

        low, high = delay_range[0], delay_range[1]
        if low > high:
            low, high = high, low
        mid = (low + high) / 2
        std = (high - low) / 4  # about 95% of samples fall inside the range
        sample = random.gauss(mid, std)
        # float(): the clamp would otherwise return an int bound unchanged.
        return float(max(low, min(high, sample)))

    # --- Working hours ---

    @staticmethod
    def _hour_in_range(hour: int, start_hour: int, end_hour: int) -> bool:
        """Return True if ``hour`` is in ``[start_hour, end_hour)``, wrapping at midnight."""
        if start_hour <= end_hour:
            return start_hour <= hour < end_hour
        # Window crosses midnight, e.g. 22 -> 6.
        return hour >= start_hour or hour < end_hour

    def is_within_working_hours(self) -> bool:
        """Check whether the current local time is inside the configured window.

        Always True when ``schedule.enabled`` is falsy. When
        ``schedule.pause_on_weekends`` is set, Saturday and Sunday are always
        outside the window. Otherwise the current hour must be in
        ``[start_hour, end_hour)``; a window with ``start_hour > end_hour``
        is treated as crossing midnight.
        """
        if not self._get("schedule.enabled"):
            return True

        now = datetime.now()
        # Weekend check: weekday() is 5 for Saturday and 6 for Sunday.
        if self._get("schedule.pause_on_weekends") and now.weekday() >= 5:
            return False

        return self._hour_in_range(
            now.hour,
            self._get("schedule.start_hour"),
            self._get("schedule.end_hour"),
        )

    # --- Chat filtering ---

    def should_process_chat(self, chat_name: str) -> bool:
        """Decide from ``filter.*`` whether the chat should be processed.

        * ``"whitelist"``: only names listed in ``filter.whitelist``.
        * ``"all"`` and ``"blacklist"``: every name not in ``filter.blacklist``
          (the blacklist is honoured in ``"all"`` mode too).
        * any other mode: every chat is processed.

        A list configured as ``None`` is treated as empty.
        """
        mode = self._get("filter.mode", "all")
        if mode == "whitelist":
            return chat_name in (self._get("filter.whitelist") or [])
        if mode in ("all", "blacklist"):
            return chat_name not in (self._get("filter.blacklist") or [])
        return True
|