A CLI tool to query local WeChat data with 11 commands: sessions, history, search, contacts, members, stats, export, favorites, unread, new-messages, and init.
Features:
- Self-contained init with key extraction (no external dependencies)
- On-the-fly SQLCipher decryption with caching
- JSON output by default for LLM/AI tool integration
- Message type filtering and chat statistics
- Markdown/txt export for conversations
- Cross-platform: macOS, Windows, Linux
146 lines
4.8 KiB
Python
146 lines
4.8 KiB
Python
"""Windows 密钥提取 — 扫描 Weixin.exe 进程内存"""
|
||
|
||
import ctypes
|
||
import ctypes.wintypes as wt
|
||
import functools
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import time
|
||
|
||
from .common import collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results
|
||
|
||
# Rebind print for this module so every message is flushed as soon as it
# is written.
print = functools.partial(print, flush=True)

# kernel32.dll entry point; the functions below call OpenProcess,
# VirtualQueryEx, ReadProcessMemory and CloseHandle through it.
kernel32 = ctypes.windll.kernel32

# Value compared against MBI.State to keep only committed memory.
MEM_COMMIT = 0x1000

# MBI.Protect values accepted as readable when enumerating regions
# (names per the Win32 memory-protection constants).
READABLE = {
    0x02,  # PAGE_READONLY
    0x04,  # PAGE_READWRITE
    0x08,  # PAGE_WRITECOPY
    0x10,  # PAGE_EXECUTE
    0x20,  # PAGE_EXECUTE_READ
    0x40,  # PAGE_EXECUTE_READWRITE
    0x80,  # PAGE_EXECUTE_WRITECOPY
}
|
||
|
||
|
||
class MBI(ctypes.Structure):
    """Buffer layout handed to VirtualQueryEx to describe one memory region.

    Pointer-sized members are declared as unsigned 64-bit integers, so
    BaseAddress and RegionSize read back as plain Python ints that can be
    added together. NOTE(review): this appears to mirror the 64-bit
    MEMORY_BASIC_INFORMATION record -- confirm against the Win32 headers.
    """

    _fields_ = [
        ("BaseAddress", ctypes.c_uint64),
        ("AllocationBase", ctypes.c_uint64),
        ("AllocationProtect", wt.DWORD),
        # Explicit filler DWORD ahead of the 64-bit RegionSize member.
        ("_pad1", wt.DWORD),
        ("RegionSize", ctypes.c_uint64),
        ("State", wt.DWORD),
        ("Protect", wt.DWORD),
        ("Type", wt.DWORD),
        # Explicit trailing filler DWORD.
        ("_pad2", wt.DWORD),
    ]
|
||
|
||
|
||
def _get_pids():
    """Return every running Weixin.exe as (pid, mem_kb), largest memory first.

    Runs ``tasklist`` filtered to the Weixin.exe image name in header-less
    CSV mode, takes the PID from the second column and the memory usage
    from the fifth, and prints one summary line per process.

    Returns:
        list[tuple[int, int]]: (pid, mem_kb) pairs sorted by mem_kb,
        descending.

    Raises:
        RuntimeError: if the output contains no usable Weixin.exe row.
    """
    # Local import: csv is not among the module-level imports of this file.
    import csv

    result = subprocess.run(
        ["tasklist", "/FI", "IMAGENAME eq Weixin.exe", "/FO", "CSV", "/NH"],
        capture_output=True,
        text=True,
        # A byte that cannot be decoded must not abort PID discovery.
        errors="replace",
    )

    pids = []
    for row in csv.reader(result.stdout.splitlines()):
        # Blank lines and the "no tasks match" notice do not have 5 columns.
        if len(row) < 5:
            continue
        try:
            pid = int(row[1])
        except ValueError:
            # Not a process row (e.g. a notice that happens to contain commas).
            continue
        # The memory column looks like "102,400 K", but the grouping
        # character depends on the system locale ("102.400 K", "102 400 K").
        # Keeping only the digits handles every variant.
        digits = re.sub(r"\D", "", row[4])
        pids.append((pid, int(digits) if digits else 0))

    if not pids:
        raise RuntimeError("Weixin.exe 未运行")

    pids.sort(key=lambda entry: entry[1], reverse=True)
    for pid, mem in pids:
        print(f"[+] Weixin.exe PID={pid} ({mem // 1024}MB)")
    return pids
|
||
|
||
|
||
def _read_mem(h, addr, sz):
    """Copy up to ``sz`` bytes starting at ``addr`` out of process handle ``h``.

    Args:
        h: process handle passed straight to ReadProcessMemory.
        addr: start address in the target process (wrapped as a 64-bit value).
        sz: number of bytes requested.

    Returns:
        bytes: the bytes ReadProcessMemory reported as read (may be shorter
        than ``sz``), or None when the call reports failure.
    """
    buffer = ctypes.create_string_buffer(sz)
    bytes_read = ctypes.c_size_t(0)
    succeeded = kernel32.ReadProcessMemory(
        h, ctypes.c_uint64(addr), buffer, sz, ctypes.byref(bytes_read)
    )
    if not succeeded:
        return None
    # Trim to the count actually filled in by the call.
    return buffer.raw[:bytes_read.value]
|
||
|
||
|
||
def _enum_regions(h):
    """List the committed, readable memory regions of process handle ``h``.

    Queries successive regions with VirtualQueryEx, starting at address 0
    and stopping at 0x7FFFFFFFFFFF, when the query returns 0, or when the
    next address fails to advance.

    Returns:
        list[tuple[int, int]]: (base_address, region_size) for each region
        whose State is MEM_COMMIT, whose Protect is in READABLE, and whose
        size is non-zero and below 500 MiB.
    """
    address_limit = 0x7FFFFFFFFFFF
    max_region_size = 500 * 1024 * 1024

    found = []
    info = MBI()
    info_size = ctypes.sizeof(info)
    cursor = 0
    while cursor < address_limit:
        queried = kernel32.VirtualQueryEx(
            h, ctypes.c_uint64(cursor), ctypes.byref(info), info_size
        )
        if queried == 0:
            break

        wanted = (
            info.State == MEM_COMMIT
            and info.Protect in READABLE
            and 0 < info.RegionSize < max_region_size
        )
        if wanted:
            found.append((info.BaseAddress, info.RegionSize))

        following = info.BaseAddress + info.RegionSize
        # Stop if the walk would not move forward, to avoid looping forever.
        if following <= cursor:
            break
        cursor = following
    return found
|
||
|
||
|
||
def extract_keys(db_dir, output_path, pid=None):
    """Extract the Windows WeChat database keys by scanning process memory.

    Collects the database files and their salts from ``db_dir``, scans the
    readable memory of each candidate process for hex-literal patterns via
    ``scan_memory_for_keys``, then hands the matches to
    ``cross_verify_keys`` and ``save_results``.

    Args:
        db_dir: WeChat database directory.
        output_path: output path for all_keys.json.
        pid: optional PID to scan; when None, every Weixin.exe process is
            detected automatically.

    Returns:
        Whatever ``save_results`` returns -- described by the original
        author as a dict mapping salt_hex -> enc_key_hex.
    """
    # Access rights requested from OpenProcess (names per the Win32 docs).
    process_vm_read = 0x0010
    process_query_information = 0x0400

    rule = "=" * 60
    print(rule)
    print(" 提取所有微信数据库密钥")
    print(rule)

    db_files, salt_to_dbs = collect_db_files(db_dir)

    print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的salt")
    # Salts shared by the most databases are listed first.
    salts_by_usage = sorted(
        salt_to_dbs.items(), key=lambda item: len(item[1]), reverse=True
    )
    for salt_hex, dbs in salts_by_usage:
        print(f" salt {salt_hex}: {', '.join(dbs)}")

    if pid is None:
        pids = _get_pids()
    else:
        # An explicit PID has no known memory figure; 0 is a placeholder.
        pids = [(pid, 0)]

    hex_re = re.compile(b"x'([0-9a-fA-F]{64,192})'")
    key_map = {}
    remaining_salts = set(salt_to_dbs.keys())
    all_hex_matches = 0
    started = time.time()

    for target_pid, _mem_kb in pids:
        handle = kernel32.OpenProcess(
            process_vm_read | process_query_information, False, target_pid
        )
        if not handle:
            print(f"[WARN] 无法打开进程 PID={target_pid},跳过")
            continue

        try:
            regions = _enum_regions(handle)
            total_bytes = sum(size for _base, size in regions)
            total_mb = total_bytes / 1024 / 1024
            print(f"\n[*] 扫描 PID={target_pid} ({total_mb:.0f}MB, {len(regions)} 区域)")

            scanned_bytes = 0
            for region_no, (base, size) in enumerate(regions, start=1):
                chunk = _read_mem(handle, base, size)
                # Failed reads still count toward the progress figure.
                scanned_bytes += size
                if not chunk:
                    continue

                all_hex_matches += scan_memory_for_keys(
                    chunk, hex_re, db_files, salt_to_dbs,
                    key_map, remaining_salts, base, target_pid, print,
                )

                # Progress is reported on every 200th region only.
                if region_no % 200 != 0:
                    continue
                elapsed = time.time() - started
                if total_bytes:
                    progress = scanned_bytes / total_bytes * 100
                else:
                    progress = 100
                print(
                    f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
                    f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
                )
        finally:
            # Always release the process handle, even if scanning raised.
            kernel32.CloseHandle(handle)

        if not remaining_salts:
            print("\n[+] 所有密钥已找到,跳过剩余进程")
            break

    elapsed = time.time() - started
    print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式")

    cross_verify_keys(db_files, salt_to_dbs, key_map, print)
    return save_results(db_files, salt_to_dbs, key_map, output_path, print)
|