wechat_msg_crawler/wechat_cli/keys/scanner_windows.py
canghe e64006bafe Initial release: wechat-cli v0.2.0
A CLI tool to query local WeChat data with 11 commands:
sessions, history, search, contacts, members, stats, export,
favorites, unread, new-messages, and init.

Features:
- Self-contained init with key extraction (no external deps)
- On-the-fly SQLCipher decryption with caching
- JSON output by default for LLM/AI tool integration
- Message type filtering and chat statistics
- Markdown/txt export for conversations
- Cross-platform: macOS, Windows, Linux
2026-04-04 11:10:10 +08:00

146 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Windows 密钥提取 — 扫描 Weixin.exe 进程内存"""
import ctypes
import ctypes.wintypes as wt
import functools
import os
import re
import subprocess
import time
from .common import collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results
# Shadow the builtin print for this module so every call flushes its stream
# immediately.
print = functools.partial(print, flush=True)
# kernel32.dll bindings; ctypes.windll only exists on Windows, so importing
# this module on another platform fails here.
kernel32 = ctypes.windll.kernel32
# Win32 MEM_COMMIT: value compared against MBI.State in _enum_regions.
MEM_COMMIT = 0x1000
# Win32 page-protection values accepted by _enum_regions: PAGE_READONLY (0x02),
# PAGE_READWRITE (0x04), PAGE_WRITECOPY (0x08), PAGE_EXECUTE (0x10),
# PAGE_EXECUTE_READ (0x20), PAGE_EXECUTE_READWRITE (0x40) and
# PAGE_EXECUTE_WRITECOPY (0x80). Membership is tested on the exact Protect
# value, so pages carrying an extra modifier bit (e.g. PAGE_GUARD) do not match.
READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80}
class MBI(ctypes.Structure):
    """Buffer layout handed to ``VirtualQueryEx`` when walking a process.

    Addresses and sizes are fixed at 64 bits; the two ``_pad`` members are
    explicit filler so the 64-bit members that follow stay 8-byte aligned.
    """

    _fields_ = [
        # Start address of the region that was queried.
        ("BaseAddress", ctypes.c_uint64),
        ("AllocationBase", ctypes.c_uint64),
        ("AllocationProtect", wt.DWORD),
        ("_pad1", wt.DWORD),
        # Size of the region in bytes.
        ("RegionSize", ctypes.c_uint64),
        # Compared against MEM_COMMIT by the region enumerator.
        ("State", wt.DWORD),
        # Compared against the READABLE protection set.
        ("Protect", wt.DWORD),
        ("Type", wt.DWORD),
        ("_pad2", wt.DWORD),
    ]
def _get_pids():
"""返回所有 Weixin.exe 进程的 (pid, mem_kb) 列表,按内存降序"""
r = subprocess.run(["tasklist", "/FI", "IMAGENAME eq Weixin.exe", "/FO", "CSV", "/NH"],
capture_output=True, text=True)
pids = []
for line in r.stdout.strip().split('\n'):
if not line.strip():
continue
p = line.strip('"').split('","')
if len(p) >= 5:
pid = int(p[1])
mem = int(p[4].replace(',', '').replace(' K', '').strip() or '0')
pids.append((pid, mem))
if not pids:
raise RuntimeError("Weixin.exe 未运行")
pids.sort(key=lambda x: x[1], reverse=True)
for pid, mem in pids:
print(f"[+] Weixin.exe PID={pid} ({mem // 1024}MB)")
return pids
def _read_mem(h, addr, sz):
    """Copy ``sz`` bytes starting at ``addr`` out of the process behind ``h``.

    Returns the bytes that ``ReadProcessMemory`` reports as copied, or
    ``None`` when the call reports failure.
    """
    buffer = ctypes.create_string_buffer(sz)
    copied = ctypes.c_size_t(0)
    succeeded = kernel32.ReadProcessMemory(
        h, ctypes.c_uint64(addr), buffer, sz, ctypes.byref(copied)
    )
    if not succeeded:
        return None
    # Trim to the byte count the API actually filled in.
    return buffer.raw[:copied.value]
def _enum_regions(h):
    """List the ``(base, size)`` memory regions of process ``h`` worth reading.

    Walks the address space with ``VirtualQueryEx`` and keeps regions that
    are committed, have a protection value in ``READABLE``, and are non-empty
    but smaller than 500 MiB.
    """
    size_cap = 500 * 1024 * 1024
    info = MBI()
    info_size = ctypes.sizeof(info)
    selected = []
    cursor = 0
    # NOTE(review): the upper bound looks like the top of the x64 user-mode
    # address range - confirm.
    while cursor < 0x7FFFFFFFFFFF:
        written = kernel32.VirtualQueryEx(
            h, ctypes.c_uint64(cursor), ctypes.byref(info), info_size
        )
        if written == 0:
            break
        base = info.BaseAddress
        size = info.RegionSize
        committed = info.State == MEM_COMMIT
        if committed and info.Protect in READABLE and 0 < size < size_cap:
            selected.append((base, size))
        following = base + size
        # Stop if the query did not move us forward, to avoid looping forever.
        if following <= cursor:
            break
        cursor = following
    return selected
def extract_keys(db_dir, output_path, pid=None):
    """Extract the WeChat database keys on Windows.

    Collects the database files under ``db_dir``, then reads the selected
    memory regions of each target process and passes every chunk to
    ``scan_memory_for_keys`` together with a regex for ``x'<hex>'`` literals.
    Scanning stops at the first process after which no salts remain.
    The collected keys are cross-verified and saved via the shared helpers.

    Args:
        db_dir: WeChat database directory.
        output_path: Output path for all_keys.json.
        pid: Optional PID to scan; by default every Weixin.exe process is
            detected automatically.

    Returns:
        Whatever ``save_results`` returns; per the original documentation a
        dict mapping salt_hex -> enc_key_hex.
    """
    banner = "=" * 60
    print(banner)
    print(" 提取所有微信数据库密钥")
    print(banner)

    db_files, salt_to_dbs = collect_db_files(db_dir)
    print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的salt")
    # Salts shared by the most databases are listed first.
    by_db_count = sorted(
        salt_to_dbs.items(), key=lambda item: len(item[1]), reverse=True
    )
    for salt_hex, dbs in by_db_count:
        print(f" salt {salt_hex}: {', '.join(dbs)}")

    pids = [(pid, 0)] if pid is not None else _get_pids()

    hex_re = re.compile(b"x'([0-9a-fA-F]{64,192})'")
    key_map = {}
    remaining_salts = set(salt_to_dbs)
    all_hex_matches = 0
    # Win32 access rights: PROCESS_VM_READ | PROCESS_QUERY_INFORMATION.
    access = 0x0010 | 0x0400

    t0 = time.time()
    for pid_val, _mem_kb in pids:
        handle = kernel32.OpenProcess(access, False, pid_val)
        if not handle:
            print(f"[WARN] 无法打开进程 PID={pid_val},跳过")
            continue
        try:
            regions = _enum_regions(handle)
            total_bytes = sum(size for _, size in regions)
            total_mb = total_bytes / 1024 / 1024
            print(f"\n[*] 扫描 PID={pid_val} ({total_mb:.0f}MB, {len(regions)} 区域)")

            scanned_bytes = 0
            for region_no, (base, size) in enumerate(regions, start=1):
                data = _read_mem(handle, base, size)
                # Unreadable regions still count towards the progress figure.
                scanned_bytes += size
                if data:
                    all_hex_matches += scan_memory_for_keys(
                        data, hex_re, db_files, salt_to_dbs,
                        key_map, remaining_salts, base, pid_val, print,
                    )
                    # Progress line on every 200th region that was readable.
                    if region_no % 200 == 0:
                        elapsed = time.time() - t0
                        if total_bytes:
                            progress = scanned_bytes / total_bytes * 100
                        else:
                            progress = 100
                        print(
                            f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
                            f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
                        )
        finally:
            kernel32.CloseHandle(handle)

        if not remaining_salts:
            print(f"\n[+] 所有密钥已找到,跳过剩余进程")
            break

    elapsed = time.time() - t0
    print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式")
    cross_verify_keys(db_files, salt_to_dbs, key_map, print)
    return save_results(db_files, salt_to_dbs, key_map, output_path, print)