Add --media flag to resolve media file paths for images/files/videos (v0.2.4)

This commit is contained in:
canghe 2026-04-04 19:53:06 +08:00
parent 86590e7ec8
commit f6410d3ad2
6 changed files with 143 additions and 17 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@canghe_ai/wechat-cli-darwin-arm64",
"version": "0.2.3",
"version": "0.2.4",
"description": "wechat-cli binary for macOS arm64",
"os": ["darwin"],
"cpu": ["arm64"],

View File

@ -1,6 +1,6 @@
{
"name": "@canghe_ai/wechat-cli",
"version": "0.2.3",
"version": "0.2.4",
"description": "WeChat data query CLI — chat history, contacts, sessions, favorites, and more. Designed for LLM integration.",
"bin": {
"wechat-cli": "bin/wechat-cli.js"
@ -13,7 +13,7 @@
"install.js"
],
"optionalDependencies": {
"@canghe_ai/wechat-cli-darwin-arm64": "0.2.3"
"@canghe_ai/wechat-cli-darwin-arm64": "0.2.4"
},
"engines": {
"node": ">=14"

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "wechat-cli"
version = "0.2.3"
version = "0.2.4"
description = "WeChat data query CLI for LLMs"
requires-python = ">=3.10"
dependencies = [

View File

@ -22,8 +22,9 @@ from ..output.formatter import output
@click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.option("--type", "msg_type", default=None, type=click.Choice(MSG_TYPE_NAMES), help="消息类型过滤")
@click.option("--media", is_flag=True, help="解析媒体文件路径(图片/文件/视频/语音)")
@click.pass_context
def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type):
def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type, media):
"""获取指定聊天的消息记录
\b
@ -55,7 +56,7 @@ def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type):
lines, failures = collect_chat_history(
chat_ctx, names, app.display_name_fn,
start_ts=start_ts, end_ts=end_ts, limit=limit, offset=offset,
msg_type_filter=type_filter,
msg_type_filter=type_filter, resolve_media=media, db_dir=app.db_dir,
)
if fmt == 'json':

View File

@ -149,7 +149,7 @@ def _parse_int(value, fallback=0):
return fallback
def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names, _display_name_fn):
def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names, _display_name_fn, resolve_media=False, db_dir=None, create_time_ts=0):
if not content or '<appmsg' not in content:
return None
_, sub_type = _split_msg_type(local_type)
@ -177,6 +177,22 @@ def _format_app_message_text(content, local_type, is_group, chat_username, chat_
quote_text += f"\n{prefix}{ref_content}"
return quote_text
if app_type == 6:
# Try to resolve file path
if resolve_media and db_dir:
msg_dir = os.path.join(os.path.dirname(db_dir), "msg", "file")
if title and os.path.isdir(msg_dir):
from datetime import datetime as _dt
dt = _dt.fromtimestamp(create_time_ts) if create_time_ts else None
if dt:
file_dir = os.path.join(msg_dir, dt.strftime("%Y-%m"))
if os.path.isdir(file_dir):
target = os.path.join(file_dir, title)
if os.path.isfile(target):
return f"[文件] {title}\n {target}"
# Fuzzy match
for f in os.listdir(file_dir):
if title in f or f in title:
return f"[文件] {title}\n {os.path.join(file_dir, f)}"
return f"[文件] {title}" if title else "[文件]"
if app_type == 5:
return f"[链接] {title}" if title else "[链接]"
@ -206,18 +222,125 @@ def _format_voip_message_text(content):
return f"[通话] {status_map.get(raw_text, raw_text)}"
def _format_message_text(local_id, local_type, content, is_group, chat_username, chat_display_name, names, display_name_fn):
def _resolve_media_path(db_dir, content, local_type, create_time_ts, chat_username=None):
"""尝试解析媒体文件在磁盘上的路径。
Args:
db_dir: 微信 db_storage 目录
content: 解压后的 message_content
local_type: 消息类型
create_time_ts: 消息时间戳
chat_username: 聊天对象 username用于定位 attach 子目录
Returns:
(path, exists) 元组path None 表示无法解析
"""
base_type = local_type & 0xFFFFFFFF
wechat_base = os.path.dirname(db_dir)
msg_dir = os.path.join(wechat_base, "msg")
if not os.path.isdir(msg_dir):
return None, False
from datetime import datetime
dt = datetime.fromtimestamp(create_time_ts)
date_prefix = dt.strftime("%Y-%m")
# 文件消息 (type 49, sub 6): msg/file/YYYY-MM/filename
if base_type == 49 and content:
root = _parse_xml_root(content)
if root is not None:
appmsg = root.find('.//appmsg')
if appmsg is not None:
app_type = _parse_int((appmsg.findtext('type') or '').strip())
if app_type == 6:
title = (appmsg.findtext('title') or '').strip()
if title:
file_dir = os.path.join(msg_dir, "file", date_prefix)
if os.path.isdir(file_dir):
# 精确匹配文件名
target = os.path.join(file_dir, title)
if os.path.isfile(target):
return target, True
# 模糊匹配(文件名可能有细微差异)
for f in os.listdir(file_dir):
if title in f or f in title:
return os.path.join(file_dir, f), True
return None, False
# 图片消息 (type 3): msg/attach/<hash>/YYYY-MM/Img/*.dat
# 视频/语音消息: msg/video/YYYY-MM/ 或 msg/attach/
if base_type in (3, 34, 43):
# 搜索 attach 目录下对应月份的文件
attach_dir = os.path.join(msg_dir, "attach")
if not os.path.isdir(attach_dir):
return None, False
# 尝试用 chat_username 的 MD5 匹配 attach 子目录
target_hash = None
if chat_username:
h = hashlib.md5(chat_username.encode()).hexdigest()
candidate = os.path.join(attach_dir, h)
if os.path.isdir(candidate):
target_hash = h
# 限定搜索范围:目标目录或所有目录
search_dirs = [target_hash] if target_hash else [
d for d in os.listdir(attach_dir)
if os.path.isdir(os.path.join(attach_dir, d))
]
sub_dir_name = "Img" if base_type == 3 else ("Video" if base_type == 43 else "Voice")
for d in search_dirs:
sub = os.path.join(attach_dir, d, date_prefix, sub_dir_name)
if os.path.isdir(sub):
files = [f for f in os.listdir(sub) if not f.endswith("_h.dat")]
if files:
# 返回目录路径(具体是哪个文件无法从 XML 精确匹配)
sample = files[0]
return os.path.join(sub, sample), True
# 视频:也检查 msg/video/
if base_type == 43:
video_dir = os.path.join(msg_dir, "video", date_prefix)
if os.path.isdir(video_dir):
thumbs = [f for f in os.listdir(video_dir) if f.endswith("_thumb.jpg")]
if thumbs:
return os.path.join(video_dir, thumbs[0]), True
return None, False
def _format_message_text(local_id, local_type, content, is_group, chat_username, chat_display_name, names, display_name_fn, db_dir=None, create_time_ts=0, resolve_media=False):
sender, text = _parse_message_content(content, local_type, is_group)
base_type, _ = _split_msg_type(local_type)
media_path = None
media_exists = False
if resolve_media and db_dir and content:
try:
media_path, media_exists = _resolve_media_path(
db_dir, content, local_type, create_time_ts, chat_username
)
except Exception:
pass
if base_type == 3:
text = f"[图片] (local_id={local_id})"
if media_path:
tag = f"[图片] {media_path}"
if not media_exists:
tag += " (文件不存在)"
else:
tag = f"[图片] (local_id={local_id})"
text = tag
elif base_type == 47:
text = "[表情]"
elif base_type == 50:
text = _format_voip_message_text(text) or "[通话]"
elif base_type == 49:
text = _format_app_message_text(
text, local_type, is_group, chat_username, chat_display_name, names, display_name_fn
text, local_type, is_group, chat_username, chat_display_name, names, display_name_fn,
resolve_media=resolve_media, db_dir=db_dir, create_time_ts=create_time_ts
) or "[链接/文件]"
elif base_type != 1:
type_label = format_msg_type(local_type)
@ -387,14 +510,15 @@ def _page_ranked_entries(entries, limit, offset):
# ---- 构建行 ----
def _build_history_line(row, ctx, names, id_to_username, display_name_fn):
def _build_history_line(row, ctx, names, id_to_username, display_name_fn, resolve_media=False, db_dir=None):
local_id, local_type, create_time, real_sender_id, content, ct = row
time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M')
content = decompress_content(content, ct)
if content is None:
content = '(无法解压)'
sender, text = _format_message_text(
local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names, display_name_fn
local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names, display_name_fn,
db_dir=db_dir, create_time_ts=create_time, resolve_media=resolve_media,
)
sender_label = _resolve_sender_label(
real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username, display_name_fn
@ -404,13 +528,14 @@ def _build_history_line(row, ctx, names, id_to_username, display_name_fn):
return create_time, f'[{time_str}] {text}'
def _build_search_entry(row, ctx, names, id_to_username, display_name_fn):
def _build_search_entry(row, ctx, names, id_to_username, display_name_fn, resolve_media=False, db_dir=None):
local_id, local_type, create_time, real_sender_id, content, ct = row
content = decompress_content(content, ct)
if content is None:
return None
sender, text = _format_message_text(
local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names, display_name_fn
local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names, display_name_fn,
db_dir=db_dir, create_time_ts=create_time, resolve_media=resolve_media,
)
if text and len(text) > 300:
text = text[:300] + '...'
@ -427,7 +552,7 @@ def _build_search_entry(row, ctx, names, id_to_username, display_name_fn):
# ---- 聊天记录查询 ----
def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None):
def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None, resolve_media=False, db_dir=None):
collected = []
failures = []
candidate_limit = _candidate_page_size(limit, offset)
@ -446,7 +571,7 @@ def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None
fetch_offset += len(rows)
for row in rows:
try:
collected.append(_build_history_line(row, table_ctx, names, id_to_username, display_name_fn))
collected.append(_build_history_line(row, table_ctx, names, id_to_username, display_name_fn, resolve_media=resolve_media, db_dir=db_dir))
except Exception as e:
failures.append(f"local_id={row[0]}: {e}")
if len(collected) - before >= candidate_limit:

View File

@ -6,7 +6,7 @@ import click
from .core.context import AppContext
_VERSION = "0.2.3"
_VERSION = "0.2.4"
@click.group()