Files
wechat-cli/wechat_cli/commands/new_messages.py
canghe e64006bafe Initial release: wechat-cli v0.2.0
A CLI tool to query local WeChat data with 11 commands:
sessions, history, search, contacts, members, stats, export,
favorites, unread, new-messages, and init.

Features:
- Self-contained init with key extraction (no external deps)
- On-the-fly SQLCipher decryption with caching
- JSON output by default for LLM/AI tool integration
- Message type filtering and chat statistics
- Markdown/txt export for conversations
- Cross-platform: macOS, Windows, Linux
2026-04-04 11:10:10 +08:00

164 lines
6.0 KiB
Python

"""get-new-messages 命令 — 增量消息查询,状态持久化到磁盘"""
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime
import click
from ..core.config import STATE_DIR
from ..core.contacts import get_contact_names
from ..core.messages import decompress_content, format_msg_type
from ..output.formatter import output
STATE_FILE = os.path.join(STATE_DIR, "last_check.json")
def _load_last_state():
if not os.path.exists(STATE_FILE):
return {}
try:
with open(STATE_FILE, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return {}
def _save_last_state(state):
os.makedirs(STATE_DIR, exist_ok=True)
with open(STATE_FILE, 'w', encoding="utf-8") as f:
json.dump(state, f)
@click.command("new-messages")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def new_messages(ctx, fmt):
"""获取自上次调用以来的新消息
\b
示例:
wechat-cli new-messages # 首次: 返回未读消息并记录状态
wechat-cli new-messages # 再次: 仅返回新增消息
wechat-cli new-messages --format text # 纯文本输出
\b
状态文件: ~/.wechat-cli/last_check.json (删除此文件可重置)
"""
app = ctx.obj
path = app.cache.get(os.path.join("session", "session.db"))
if not path:
click.echo("错误: 无法解密 session.db", err=True)
ctx.exit(3)
names = get_contact_names(app.cache, app.decrypted_dir)
with closing(sqlite3.connect(path)) as conn:
rows = conn.execute("""
SELECT username, unread_count, summary, last_timestamp,
last_msg_type, last_msg_sender, last_sender_display_name
FROM SessionTable
WHERE last_timestamp > 0
ORDER BY last_timestamp DESC
""").fetchall()
curr_state = {}
for r in rows:
username, unread, summary, ts, msg_type, sender, sender_name = r
curr_state[username] = {
'unread': unread, 'summary': summary, 'timestamp': ts,
'msg_type': msg_type, 'sender': sender or '', 'sender_name': sender_name or '',
}
last_state = _load_last_state()
if not last_state:
# 首次调用:保存状态,返回未读
_save_last_state({u: s['timestamp'] for u, s in curr_state.items()})
unread_msgs = []
for username, s in curr_state.items():
if s['unread'] and s['unread'] > 0:
display = names.get(username, username)
is_group = '@chatroom' in username
summary = s['summary']
if isinstance(summary, bytes):
summary = decompress_content(summary, 4) or '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
time_str = datetime.fromtimestamp(s['timestamp']).strftime('%H:%M')
unread_msgs.append({
'chat': display,
'username': username,
'is_group': is_group,
'unread': s['unread'],
'last_message': str(summary or ''),
'msg_type': format_msg_type(s['msg_type']),
'time': time_str,
'timestamp': s['timestamp'],
})
if fmt == 'json':
output({'first_call': True, 'unread_count': len(unread_msgs), 'messages': unread_msgs}, 'json')
else:
if unread_msgs:
lines = []
for m in unread_msgs:
tag = " [群]" if m['is_group'] else ""
lines.append(f"[{m['time']}] {m['chat']}{tag} ({m['unread']}条未读): {m['last_message']}")
output(f"当前 {len(unread_msgs)} 个未读会话:\n\n" + "\n".join(lines), 'text')
else:
output("当前无未读消息(已记录状态,下次调用将返回新消息)", 'text')
return
# 后续调用:对比差异
new_msgs = []
for username, s in curr_state.items():
prev_ts = last_state.get(username, 0)
if s['timestamp'] > prev_ts:
display = names.get(username, username)
is_group = '@chatroom' in username
summary = s['summary']
if isinstance(summary, bytes):
summary = decompress_content(summary, 4) or '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
sender_display = ''
if is_group and s['sender']:
sender_display = names.get(s['sender'], s['sender_name'] or s['sender'])
new_msgs.append({
'chat': display,
'username': username,
'is_group': is_group,
'last_message': str(summary or ''),
'msg_type': format_msg_type(s['msg_type']),
'sender': sender_display,
'time': datetime.fromtimestamp(s['timestamp']).strftime('%H:%M:%S'),
'timestamp': s['timestamp'],
})
_save_last_state({u: s['timestamp'] for u, s in curr_state.items()})
new_msgs.sort(key=lambda m: m['timestamp'])
if fmt == 'json':
output({'first_call': False, 'new_count': len(new_msgs), 'messages': new_msgs}, 'json')
else:
if not new_msgs:
output("无新消息", 'text')
else:
lines = []
for m in new_msgs:
entry = f"[{m['time']}] {m['chat']}"
if m['is_group']:
entry += " [群]"
entry += f": {m['msg_type']}"
if m['sender']:
entry += f" ({m['sender']})"
entry += f" - {m['last_message']}"
lines.append(entry)
output(f"{len(new_msgs)} 条新消息:\n\n" + "\n".join(lines), 'text')