wechat_msg_crawler/wechat_cli/core/messages.py
canghe e64006bafe Initial release: wechat-cli v0.2.0
A CLI tool to query local WeChat data with 11 commands:
sessions, history, search, contacts, members, stats, export,
favorites, unread, new-messages, and init.

Features:
- Self-contained init with key extraction (no external deps)
- On-the-fly SQLCipher decryption with caching
- JSON output by default for LLM/AI tool integration
- Message type filtering and chat statistics
- Markdown/txt export for conversations
- Cross-platform: macOS, Windows, Linux
2026-04-04 11:10:10 +08:00

666 lines
24 KiB
Python

"""消息查询 — 分表查找、分页、格式化"""
import hashlib
import os
import re
import sqlite3
import xml.etree.ElementTree as ET
from contextlib import closing
from datetime import datetime
import zstandard as zstd
from .key_utils import key_path_variants
# Shared zstd decompressor instance; decompress_content uses it.
_zstd_dctx = zstd.ZstdDecompressor()
# Matches DOCTYPE / ENTITY declarations (case-insensitive); _parse_xml_root
# refuses to parse any content in which this pattern is found.
_XML_UNSAFE_RE = re.compile(r'<!DOCTYPE|<!ENTITY', re.IGNORECASE)
# Content longer than this is not handed to the XML parser.
_XML_PARSE_MAX_LEN = 20000
# Default upper bound for `limit` in validate_pagination.
_QUERY_LIMIT_MAX = 500
# Upper bound on the number of rows collect_chat_history fetches per query.
_HISTORY_QUERY_BATCH_SIZE = 500
# Message type filter map: name -> (base_type,) or (base_type, sub_type)
MSG_TYPE_FILTERS = {
    'text': (1,),
    'image': (3,),
    'voice': (34,),
    'video': (43,),
    'sticker': (47,),
    'location': (48,),
    'link': (49,),
    'file': (49, 6),
    'call': (50,),
    'system': (10000,),
}
# Filter names in the order they are declared above.
MSG_TYPE_NAMES = list(MSG_TYPE_FILTERS.keys())
# ---- 消息 DB 发现 ----
def find_msg_db_keys(all_keys):
    """Return, sorted, the keys that refer to message databases.

    A key qualifies when at least one of its path variants starts with
    ``message/`` and at least one of its path variants ends in
    ``message_<digits>.db``.
    """
    selected = []
    for key in all_keys:
        if not any(v.startswith("message/") for v in key_path_variants(key)):
            continue
        if any(re.search(r"message_\d+\.db$", v) for v in key_path_variants(key)):
            selected.append(key)
    selected.sort()
    return selected
def _is_safe_msg_table_name(table_name):
return bool(re.fullmatch(r'Msg_[0-9a-f]{32}', table_name))
def _find_msg_tables_for_user(username, msg_db_keys, cache):
    """Find every message database that contains the table for *username*.

    The table name is ``Msg_`` followed by the MD5 hex digest of the
    username. Each database path obtained from ``cache.get(rel_key)`` is
    opened and checked for that table.

    Returns a list of dicts with keys ``db_path``, ``table_name`` and
    ``max_create_time`` (0 when the table has no ``create_time`` value),
    ordered by ``max_create_time`` descending. A database that raises while
    being inspected is skipped.
    """
    digest = hashlib.md5(username.encode()).hexdigest()
    table_name = f"Msg_{digest}"
    if not _is_safe_msg_table_name(table_name):
        return []

    found = []
    for rel_key in msg_db_keys:
        db_path = cache.get(rel_key)
        if not db_path:
            continue
        with closing(sqlite3.connect(db_path)) as conn:
            try:
                present = conn.execute(
                    "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?",
                    (table_name,)
                ).fetchone()
                if present:
                    newest = conn.execute(
                        f"SELECT MAX(create_time) FROM [{table_name}]"
                    ).fetchone()[0] or 0
                    found.append({'db_path': db_path, 'table_name': table_name, 'max_create_time': newest})
            except Exception:
                # Best effort: an unreadable database contributes no match.
                pass

    return sorted(found, key=lambda entry: entry['max_create_time'], reverse=True)
# ---- 消息类型 ----
def _split_msg_type(t):
try:
t = int(t)
except (TypeError, ValueError):
return 0, 0
if t > 0xFFFFFFFF:
return t & 0xFFFFFFFF, t >> 32
return t, 0
def format_msg_type(t):
    """Return the display label for message type *t*.

    Only the base type (see ``_split_msg_type``) selects the label; a base
    type without a label yields ``type=<t>`` using the original value.
    """
    labels = {
        1: '文本', 3: '图片', 34: '语音', 42: '名片',
        43: '视频', 47: '表情', 48: '位置', 49: '链接/文件',
        50: '通话', 10000: '系统', 10002: '撤回',
    }
    base_type = _split_msg_type(t)[0]
    if base_type in labels:
        return labels[base_type]
    return f'type={t}'
# ---- 内容解压 ----
def decompress_content(content, ct):
    """Turn a stored message payload into text.

    Non-bytes values are returned unchanged. Bytes are zstd-decompressed
    first when *ct* equals 4, then decoded as UTF-8 with replacement of
    invalid sequences. Returns ``None`` when decompression or decoding
    raises.
    """
    if not isinstance(content, bytes):
        return content
    if ct and ct == 4:
        try:
            return _zstd_dctx.decompress(content).decode('utf-8', errors='replace')
        except Exception:
            return None
    try:
        return content.decode('utf-8', errors='replace')
    except Exception:
        return None
# ---- 内容解析 ----
def _parse_message_content(content, local_type, is_group):
if content is None:
return '', ''
if isinstance(content, bytes):
return '', '(二进制内容)'
sender = ''
text = content
if is_group and ':\n' in content:
sender, text = content.split(':\n', 1)
return sender, text
def _collapse_text(text):
if not text:
return ''
return re.sub(r'\s+', ' ', text).strip()
def _parse_xml_root(content):
    """Parse *content* as XML and return the root element, or ``None``.

    Parsing is refused for empty content, content longer than
    ``_XML_PARSE_MAX_LEN``, and content matching ``_XML_UNSAFE_RE``
    (DOCTYPE / ENTITY declarations). Malformed XML also yields ``None``.
    """
    if not content:
        return None
    if len(content) > _XML_PARSE_MAX_LEN:
        return None
    if _XML_UNSAFE_RE.search(content):
        return None
    try:
        root = ET.fromstring(content)
    except ET.ParseError:
        root = None
    return root
def _parse_int(value, fallback=0):
try:
return int(value)
except (TypeError, ValueError):
return fallback
def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names, _display_name_fn):
    """Render an ``<appmsg>`` XML payload as one line of display text.

    Returns ``None`` when *content* is empty, has no ``<appmsg`` marker,
    cannot be parsed, or has no ``appmsg`` element below the root.

    The app type is read from the ``<type>`` element, falling back to the
    sub type carried in *local_type*. Type 57 (quoted reply) yields the title
    followed by the referenced message, truncated to 160 characters. Types 6,
    5 and 33/36/44 get the file, link and mini-program labels; anything else
    gets the generic link/file label. Only *content* and *local_type* are
    used; the other parameters are ignored here.
    """
    if not content or '<appmsg' not in content:
        return None
    _, sub_type = _split_msg_type(local_type)
    root = _parse_xml_root(content)
    if root is None:
        return None
    appmsg = root.find('.//appmsg')
    if appmsg is None:
        return None

    title = _collapse_text(appmsg.findtext('title') or '')
    declared_type = (appmsg.findtext('type') or '').strip()
    app_type = _parse_int(declared_type, _parse_int(sub_type, 0))

    if app_type == 57:
        quote_text = title or "[引用消息]"
        ref = appmsg.find('.//refermsg')
        if ref is None:
            return quote_text
        ref_display_name = (ref.findtext('displayname') or '').strip()
        ref_content = _collapse_text(ref.findtext('content') or '')
        if not ref_content:
            return quote_text
        if len(ref_content) > 160:
            ref_content = ref_content[:160] + "..."
        prefix = f"回复 {ref_display_name}: " if ref_display_name else "回复: "
        return f"{quote_text}\n{prefix}{ref_content}"

    labels = {
        6: "[文件]",
        5: "[链接]",
        33: "[小程序]",
        36: "[小程序]",
        44: "[小程序]",
    }
    label = labels.get(app_type, "[链接/文件]")
    return f"{label} {title}" if title else label
def _format_voip_message_text(content):
    """Render a ``<voip`` call payload as display text.

    Returns ``None`` when *content* is empty or has no ``<voip`` marker. If
    the XML cannot be parsed or carries no ``msg`` text, the bare call label
    is returned. A ``Duration:`` message is shown as the call duration; a
    known status string is translated; any other text is shown as-is.
    """
    if not content or '<voip' not in content:
        return None
    fallback = "[通话]"
    root = _parse_xml_root(content)
    if root is None:
        return fallback
    raw_text = _collapse_text(root.findtext('.//msg') or '')
    if not raw_text:
        return fallback
    if raw_text.startswith('Duration:'):
        duration = raw_text.split(':', 1)[1].strip()
        return f"[通话] 通话时长 {duration}" if duration else fallback
    status_map = {
        'Canceled': '已取消', 'Line busy': '对方忙线',
        'Call not answered': '未接听', "Call wasn't answered": '未接听',
    }
    status = status_map.get(raw_text, raw_text)
    return f"[通话] {status}"
def _format_message_text(local_id, local_type, content, is_group, chat_username, chat_display_name, names, display_name_fn):
    """Return ``(sender, text)`` with *text* rendered for the message type.

    The sender comes from ``_parse_message_content``. Text messages (base
    type 1) keep their text; images and stickers get fixed placeholders;
    calls (50) and app messages (49) go through their own formatters with a
    placeholder fallback; every other type is prefixed with its type label.
    """
    sender, text = _parse_message_content(content, local_type, is_group)
    base_type, _ = _split_msg_type(local_type)

    if base_type == 1:
        return sender, text
    if base_type == 3:
        return sender, f"[图片] (local_id={local_id})"
    if base_type == 47:
        return sender, "[表情]"
    if base_type == 50:
        return sender, _format_voip_message_text(text) or "[通话]"
    if base_type == 49:
        rendered = _format_app_message_text(
            text, local_type, is_group, chat_username, chat_display_name, names, display_name_fn
        )
        return sender, rendered or "[链接/文件]"

    type_label = format_msg_type(local_type)
    if text:
        return sender, f"[{type_label}] {text}"
    return sender, f"[{type_label}]"
# ---- Name2Id ----
def _load_name2id_maps(conn):
id_to_username = {}
try:
rows = conn.execute("SELECT rowid, user_name FROM Name2Id").fetchall()
except sqlite3.Error:
return id_to_username
for rowid, user_name in rows:
if not user_name:
continue
id_to_username[rowid] = user_name
return id_to_username
# ---- 发送者解析 ----
def _resolve_sender_label(real_sender_id, sender_from_content, is_group, chat_username, chat_display_name, names, id_to_username, display_name_fn):
sender_username = id_to_username.get(real_sender_id, '')
if is_group:
if sender_username and sender_username != chat_username:
return display_name_fn(sender_username, names)
if sender_from_content:
return display_name_fn(sender_from_content, names)
return ''
if sender_username == chat_username:
return chat_display_name
if sender_username:
return display_name_fn(sender_username, names)
return ''
# ---- SQL 查询 ----
def _build_message_filters(start_ts=None, end_ts=None, keyword='', msg_type_filter=None):
clauses = []
params = []
if start_ts is not None:
clauses.append('create_time >= ?')
params.append(start_ts)
if end_ts is not None:
clauses.append('create_time <= ?')
params.append(end_ts)
if keyword:
clauses.append('message_content LIKE ?')
params.append(f'%{keyword}%')
if msg_type_filter is not None:
base_type = msg_type_filter[0]
clauses.append('(local_type & 0xFFFFFFFF) = ?')
params.append(base_type)
if len(msg_type_filter) > 1:
clauses.append('((local_type >> 32) & 0xFFFFFFFF) = ?')
params.append(msg_type_filter[1])
return clauses, params
def _query_messages(conn, table_name, start_ts=None, end_ts=None, keyword='', limit=20, offset=0, msg_type_filter=None):
    """Fetch message rows from *table_name*, newest first.

    Each row is ``(local_id, local_type, create_time, real_sender_id,
    message_content, WCDB_CT_message_content)``. Filters come from
    ``_build_message_filters``. With ``limit=None`` all matching rows are
    returned and *offset* is ignored.

    Raises:
        ValueError: if *table_name* fails ``_is_safe_msg_table_name``. The
            name is interpolated into the SQL text, so it is checked first.
    """
    if not _is_safe_msg_table_name(table_name):
        raise ValueError(f'非法消息表名: {table_name}')
    clauses, params = _build_message_filters(start_ts, end_ts, keyword, msg_type_filter)
    where_sql = ''
    if clauses:
        where_sql = f"WHERE {' AND '.join(clauses)}"
    sql = f"""
SELECT local_id, local_type, create_time, real_sender_id, message_content,
WCDB_CT_message_content
FROM [{table_name}]
{where_sql}
ORDER BY create_time DESC
"""
    if limit is not None:
        sql += "\n LIMIT ? OFFSET ?"
        params = (*params, limit, offset)
    return conn.execute(sql, params).fetchall()
# ---- 时间解析 ----
def parse_time_value(value, field_name, is_end=False):
    """Parse a user-supplied time string into an integer timestamp.

    Accepted forms are ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD HH:MM`` and
    ``YYYY-MM-DD``; surrounding whitespace is ignored. A date-only value
    becomes 23:59:59 of that day when *is_end* is true. Empty or ``None``
    input returns ``None``.

    Raises:
        ValueError: if the value matches none of the accepted forms; the
            message names *field_name*.
    """
    text = (value or '').strip()
    if not text:
        return None
    date_only_fmt = '%Y-%m-%d'
    for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M', date_only_fmt):
        try:
            parsed = datetime.strptime(text, fmt)
            if is_end and fmt == date_only_fmt:
                parsed = parsed.replace(hour=23, minute=59, second=59)
            return int(parsed.timestamp())
        except ValueError:
            continue
    raise ValueError(f"{field_name} 格式无效: {text}。支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS")
def parse_time_range(start_time='', end_time=''):
    """Parse both ends of a time range into ``(start_ts, end_ts)``.

    Each end is parsed with ``parse_time_value`` (the end with
    ``is_end=True``) and is ``None`` when not given.

    Raises:
        ValueError: if either value is malformed, or if both are given and
            the start is later than the end.
    """
    start_ts = parse_time_value(start_time, 'start_time', is_end=False)
    end_ts = parse_time_value(end_time, 'end_time', is_end=True)
    both_given = start_ts is not None and end_ts is not None
    if both_given and start_ts > end_ts:
        raise ValueError('start_time 不能晚于 end_time')
    return start_ts, end_ts
def validate_pagination(limit, offset=0, limit_max=_QUERY_LIMIT_MAX):
    """Validate paging arguments; return ``None`` when they are acceptable.

    Checks are made in order and the first failure is reported: *limit* must
    be positive, must not exceed *limit_max* (skipped when *limit_max* is
    ``None``), and *offset* must not be negative.

    Raises:
        ValueError: describing the first rule that is violated.
    """
    problem = None
    if limit <= 0:
        problem = "limit 必须大于 0"
    elif limit_max is not None and limit > limit_max:
        problem = f"limit 不能大于 {limit_max}"
    elif offset < 0:
        problem = "offset 不能小于 0"
    if problem is not None:
        raise ValueError(problem)
# ---- 聊天上下文 ----
def resolve_chat_context(chat_name, msg_db_keys, cache, decrypted_dir):
    """Resolve a chat name into a context dict describing where its messages live.

    Returns ``None`` when the name cannot be resolved to a username.
    Otherwise returns a dict with ``query``, ``username``, ``display_name``,
    ``db_path``, ``table_name``, ``message_tables`` and ``is_group``.
    ``db_path`` / ``table_name`` come from the first entry returned by
    ``_find_msg_tables_for_user``; when no table is found they are ``None``
    and ``message_tables`` is empty. ``is_group`` is true when the username
    contains ``@chatroom``.
    """
    from .contacts import resolve_username, get_contact_names

    username = resolve_username(chat_name, cache, decrypted_dir)
    if not username:
        return None
    names = get_contact_names(cache, decrypted_dir)
    display_name = names.get(username, username)
    message_tables = _find_msg_tables_for_user(username, msg_db_keys, cache)

    if message_tables:
        primary = message_tables[0]
        db_path = primary['db_path']
        table_name = primary['table_name']
        tables = message_tables
    else:
        db_path = None
        table_name = None
        tables = []

    return {
        'query': chat_name,
        'username': username,
        'display_name': display_name,
        'db_path': db_path,
        'table_name': table_name,
        'message_tables': tables,
        'is_group': '@chatroom' in username,
    }
def _iter_table_contexts(ctx):
tables = ctx.get('message_tables') or []
if not tables and ctx.get('db_path') and ctx.get('table_name'):
tables = [{'db_path': ctx['db_path'], 'table_name': ctx['table_name']}]
for table in tables:
yield {
'query': ctx['query'], 'username': ctx['username'], 'display_name': ctx['display_name'],
'db_path': table['db_path'], 'table_name': table['table_name'],
'is_group': ctx['is_group'],
}
def _candidate_page_size(limit, offset):
return limit + offset
def _page_ranked_entries(entries, limit, offset):
ordered = sorted(entries, key=lambda item: item[0], reverse=True)
paged = ordered[offset:offset + limit]
paged.sort(key=lambda item: item[0])
return paged
# ---- 构建行 ----
def _build_history_line(row, ctx, names, id_to_username, display_name_fn):
    """Format one message row as a history line.

    *row* is ``(local_id, local_type, create_time, real_sender_id, content,
    ct)``. Returns ``(create_time, line)`` where *line* is
    ``[YYYY-MM-DD HH:MM] sender: text``, or without the sender part when no
    sender label can be resolved. Content that cannot be decompressed is
    replaced by a placeholder.
    """
    local_id, local_type, create_time, real_sender_id, raw_content, ct = row
    time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M')

    content = decompress_content(raw_content, ct)
    if content is None:
        content = '(无法解压)'

    is_group = ctx['is_group']
    chat_username = ctx['username']
    chat_display_name = ctx['display_name']
    sender, text = _format_message_text(
        local_id, local_type, content, is_group, chat_username, chat_display_name, names, display_name_fn
    )
    sender_label = _resolve_sender_label(
        real_sender_id, sender, is_group, chat_username, chat_display_name, names, id_to_username, display_name_fn
    )

    sender_part = f'{sender_label}: ' if sender_label else ''
    return create_time, f'[{time_str}] {sender_part}{text}'
def _build_search_entry(row, ctx, names, id_to_username, display_name_fn):
    """Format one message row as a search-result entry.

    *row* is ``(local_id, local_type, create_time, real_sender_id, content,
    ct)``. Returns ``None`` when the content cannot be decompressed,
    otherwise ``(create_time, entry)`` where *entry* is
    ``[YYYY-MM-DD HH:MM] [chat] sender: text``; the sender part is left out
    when no label can be resolved. Text longer than 300 characters is cut
    and suffixed with ``...``.
    """
    local_id, local_type, create_time, real_sender_id, raw_content, ct = row
    content = decompress_content(raw_content, ct)
    if content is None:
        return None

    is_group = ctx['is_group']
    chat_username = ctx['username']
    chat_display_name = ctx['display_name']
    sender, text = _format_message_text(
        local_id, local_type, content, is_group, chat_username, chat_display_name, names, display_name_fn
    )
    if text and len(text) > 300:
        text = text[:300] + '...'
    sender_label = _resolve_sender_label(
        real_sender_id, sender, is_group, chat_username, chat_display_name, names, id_to_username, display_name_fn
    )

    time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M')
    pieces = [f"[{time_str}]", f"[{ctx['display_name']}]"]
    if sender_label:
        pieces.append(f"{sender_label}:")
    pieces.append(f"{text}")
    return create_time, ' '.join(pieces)
# ---- 聊天记录查询 ----
def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None):
    """Collect one page of formatted history lines for a chat.

    Every message table of *ctx* is queried newest-first in batches until it
    has contributed ``limit + offset`` lines or runs out of rows. The merged
    lines are then paged with ``_page_ranked_entries``.

    Returns:
        ``(lines, failures)``: the page's lines, oldest first, and a list of
        error descriptions for rows or databases that could not be processed.
    """
    wanted = _candidate_page_size(limit, offset)
    batch_size = min(wanted, _HISTORY_QUERY_BATCH_SIZE)
    gathered = []
    failures = []

    for table_ctx in _iter_table_contexts(ctx):
        try:
            with closing(sqlite3.connect(table_ctx['db_path'])) as conn:
                id_to_username = _load_name2id_maps(conn)
                consumed = 0  # rows read so far from this table
                taken = 0     # lines successfully built from this table
                while taken < wanted:
                    rows = _query_messages(
                        conn, table_ctx['table_name'],
                        start_ts=start_ts, end_ts=end_ts,
                        limit=batch_size, offset=consumed,
                        msg_type_filter=msg_type_filter,
                    )
                    if not rows:
                        break
                    consumed += len(rows)
                    for row in rows:
                        try:
                            built = _build_history_line(row, table_ctx, names, id_to_username, display_name_fn)
                        except Exception as e:
                            failures.append(f"local_id={row[0]}: {e}")
                        else:
                            gathered.append(built)
                            taken += 1
                        if taken >= wanted:
                            break
                    if len(rows) < batch_size:
                        # A short batch means the table has no more rows.
                        break
        except Exception as e:
            failures.append(f"{table_ctx['db_path']}: {e}")

    page = _page_ranked_entries(gathered, limit, offset)
    return [text for _ts, text in page], failures
# ---- 搜索查询 ----
def _collect_search_entries(conn, contexts, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
    """Gather up to *candidate_limit* search entries per table over one connection.

    Each context in *contexts* names a table reachable through *conn*. Rows
    are fetched newest-first in batches of *candidate_limit*; rows that
    ``_build_search_entry`` rejects are skipped.

    Returns:
        ``(entries, failures)``: ``(create_time, text)`` tuples, and one
        error description per context whose query raised.
    """
    entries = []
    failures = []
    id_to_username = _load_name2id_maps(conn)
    batch_size = candidate_limit

    for ctx in contexts:
        try:
            consumed = 0  # rows read so far from this table
            taken = 0     # entries accepted from this table
            while taken < candidate_limit:
                rows = _query_messages(
                    conn, ctx['table_name'],
                    start_ts=start_ts, end_ts=end_ts, keyword=keyword,
                    limit=batch_size, offset=consumed,
                    msg_type_filter=msg_type_filter,
                )
                if not rows:
                    break
                consumed += len(rows)
                for row in rows:
                    formatted = _build_search_entry(row, ctx, names, id_to_username, display_name_fn)
                    if formatted:
                        entries.append(formatted)
                        taken += 1
                    if taken >= candidate_limit:
                        break
                if len(rows) < batch_size:
                    break
        except Exception as e:
            failures.append(f"{ctx['display_name']}: {e}")

    return entries, failures
def collect_chat_search(ctx, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
    """Search one chat for *keyword* across all of its message tables.

    The chat's table contexts are grouped by database path so that each
    database is opened once.

    Returns:
        ``(entries, failures)`` as produced by ``_collect_search_entries``,
        plus one failure per table context of any database that could not
        be searched.
    """
    grouped = {}
    for table_ctx in _iter_table_contexts(ctx):
        db_key = table_ctx['db_path']
        if db_key not in grouped:
            grouped[db_key] = []
        grouped[db_key].append(table_ctx)

    collected = []
    failures = []
    for db_path, db_contexts in grouped.items():
        try:
            with closing(sqlite3.connect(db_path)) as conn:
                db_entries, db_failures = _collect_search_entries(
                    conn, db_contexts, names, keyword, display_name_fn,
                    start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
                    msg_type_filter=msg_type_filter,
                )
                collected += db_entries
                failures += db_failures
        except Exception as e:
            failures += [f"{tc['display_name']}: {e}" for tc in db_contexts]
    return collected, failures
def search_all_messages(msg_db_keys, cache, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
    """Search every message table of every message database for *keyword*.

    Database paths come from ``cache.get(rel_key)``; keys without a path are
    skipped.

    Returns:
        ``(entries, failures)``: the combined search entries, and error
        descriptions for tables or databases that could not be searched.
    """
    collected = []
    failures = []
    for rel_key in msg_db_keys:
        db_path = cache.get(rel_key)
        if not db_path:
            continue
        try:
            with closing(sqlite3.connect(db_path)) as conn:
                contexts = _load_search_contexts_from_db(conn, db_path, names)
                db_entries, db_failures = _collect_search_entries(
                    conn, contexts, names, keyword, display_name_fn,
                    start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
                    msg_type_filter=msg_type_filter,
                )
                collected += db_entries
                failures += db_failures
        except Exception as e:
            failures.append(f"{rel_key}: {e}")
    return collected, failures
def _load_search_contexts_from_db(conn, db_path, names):
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'"
).fetchall()
table_to_username = {}
try:
for (user_name,) in conn.execute("SELECT user_name FROM Name2Id").fetchall():
if not user_name:
continue
table_hash = hashlib.md5(user_name.encode()).hexdigest()
table_to_username[f"Msg_{table_hash}"] = user_name
except sqlite3.Error:
pass
contexts = []
for (table_name,) in tables:
username = table_to_username.get(table_name, '')
display_name = names.get(username, username) if username else table_name
contexts.append({
'query': display_name, 'username': username, 'display_name': display_name,
'db_path': db_path, 'table_name': table_name, 'is_group': '@chatroom' in username,
})
return contexts
# ---- 多聊天上下文解析 ----
def resolve_chat_contexts(chat_names, msg_db_keys, cache, decrypted_dir):
    """Resolve several chat names at once.

    Returns:
        ``(resolved, unresolved, missing_tables)``: contexts for chats that
        have at least one message table (one per username, first occurrence
        wins); names that could not be resolved (blank names are reported
        with a placeholder); and display names of chats that resolved but
        have no message table.
    """
    resolved = []
    unresolved = []
    missing_tables = []
    seen_usernames = set()

    for chat_name in chat_names:
        name = (chat_name or '').strip()
        if not name:
            unresolved.append('(空)')
            continue
        ctx = resolve_chat_context(name, msg_db_keys, cache, decrypted_dir)
        if not ctx:
            unresolved.append(name)
        elif not ctx['message_tables']:
            missing_tables.append(ctx['display_name'])
        elif ctx['username'] not in seen_usernames:
            seen_usernames.add(ctx['username'])
            resolved.append(ctx)

    return resolved, unresolved, missing_tables
# ---- 聊天统计 ----
def collect_chat_stats(ctx, names, display_name_fn, start_ts=None, end_ts=None):
    """Aggregate message statistics for the given chat.

    Counts are summed over every message table of *ctx*, optionally
    restricted to ``start_ts <= create_time <= end_ts``. A table whose
    queries raise is skipped silently.

    Returns: {
        total, type_breakdown: {type_name: count},
        top_senders: [{name, count}],
        hourly: {0:N, ..., 23:N}
    }
    """
    type_map = {
        1: '文本', 3: '图片', 34: '语音', 42: '名片',
        43: '视频', 47: '表情', 48: '位置', 49: '链接/文件',
        50: '通话', 10000: '系统', 10002: '撤回',
    }
    total = 0
    type_counts = {}
    sender_counts = {}
    hourly_counts = {}
    for table_ctx in _iter_table_contexts(ctx):
        try:
            with closing(sqlite3.connect(table_ctx['db_path'])) as conn:
                id_to_username = _load_name2id_maps(conn)
                tbl = table_ctx['table_name']
                # The table name is interpolated into SQL below, so only
                # names passing the strict pattern check are used.
                if not _is_safe_msg_table_name(tbl):
                    continue
                where_parts = []
                params = []
                if start_ts is not None:
                    where_parts.append('create_time >= ?')
                    params.append(start_ts)
                if end_ts is not None:
                    where_parts.append('create_time <= ?')
                    params.append(end_ts)
                where_sql = f"WHERE {' AND '.join(where_parts)}" if where_parts else ''
                # Total count + breakdown by base type (low 32 bits of local_type)
                for bt, cnt in conn.execute(
                    f"SELECT (local_type & 0xFFFFFFFF), COUNT(*) FROM [{tbl}] {where_sql} GROUP BY (local_type & 0xFFFFFFFF)",
                    params
                ).fetchall():
                    label = type_map.get(bt, f'type={bt}')
                    type_counts[label] = type_counts.get(label, 0) + cnt
                    total += cnt
                # Sender ranking; only the 20 most frequent sender ids of
                # each table are merged into the overall counts
                for sid, cnt in conn.execute(
                    f"SELECT real_sender_id, COUNT(*) FROM [{tbl}] {where_sql} GROUP BY real_sender_id ORDER BY COUNT(*) DESC LIMIT 20",
                    params
                ).fetchall():
                    # Sender ids missing from Name2Id are keyed by their
                    # stringified id.
                    uname = id_to_username.get(sid, str(sid))
                    if uname:
                        sender_counts[uname] = sender_counts.get(uname, 0) + cnt
                # Distribution over the 24 hours of the day (SQLite 'localtime')
                for h, cnt in conn.execute(
                    f"SELECT cast(strftime('%H', create_time, 'unixepoch', 'localtime') as integer), COUNT(*) FROM [{tbl}] {where_sql} GROUP BY cast(strftime('%H', create_time, 'unixepoch', 'localtime') as integer)",
                    params
                ).fetchall():
                    if h is not None:
                        hourly_counts[h] = hourly_counts.get(h, 0) + cnt
        except Exception:
            pass
    # Keep the ten most active senders and map usernames to display names.
    top_senders = sorted(sender_counts.items(), key=lambda x: x[1], reverse=True)[:10]
    top_senders = [{'name': display_name_fn(u, names), 'count': c} for u, c in top_senders]
    # Every hour 0..23 is present in the result, with 0 where nothing was counted.
    hourly = {h: hourly_counts.get(h, 0) for h in range(24)}
    return {
        'total': total,
        'type_breakdown': dict(sorted(type_counts.items(), key=lambda x: x[1], reverse=True)),
        'top_senders': top_senders,
        'hourly': hourly,
    }