commit e64006bafe3f858a5593ec58783cf478f9df3481 Author: canghe Date: Sat Apr 4 11:10:10 2026 +0800 Initial release: wechat-cli v0.2.0 A CLI tool to query local WeChat data with 11 commands: sessions, history, search, contacts, members, stats, export, favorites, unread, new-messages, and init. Features: - Self-contained init with key extraction (no external deps) - On-the-fly SQLCipher decryption with caching - JSON output by default for LLM/AI tool integration - Message type filtering and chat statistics - Markdown/txt export for conversations - Cross-platform: macOS, Windows, Linux diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c620895 --- /dev/null +++ b/.gitignore @@ -0,0 +1,37 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +*.egg +dist/ +build/ + +# Virtual environments +.venv/ +venv/ +ENV/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# OS +.DS_Store +Thumbs.db + +# Project-specific +.claude/ +*.db +*.db-wal +*.db-shm + +# Sensitive data — NEVER commit +*.json +!pyproject.toml + +# Temp files +*.tmp +*.bak diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. 
For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..d313569 --- /dev/null +++ b/README.md @@ -0,0 +1,234 @@ +# WeChat CLI + +[中文文档](README_CN.md) + +A command-line tool to query your local WeChat data — chat history, contacts, sessions, favorites, and more. Designed for LLM integration with JSON output by default. + +## Features + +- **Self-contained** — `pip install` + `wechat-cli init`, no external dependencies +- **11 commands** — sessions, history, search, contacts, members, stats, export, favorites, unread, new-messages, init +- **JSON by default** — structured output for programmatic access +- **Cross-platform** — macOS, Windows, Linux +- **On-the-fly decryption** — SQLCipher databases decrypted transparently with caching +- **Message type filtering** — filter by text, image, link, file, video, etc. +- **Chat statistics** — top senders, type breakdown, hourly activity distribution +- **Markdown export** — export conversations as markdown or plain text + +## Quick Start + +### Install + +```bash +pip install wechat-cli +``` + +Or install from source: + +```bash +git clone https://github.com/canghe/wechat-cli.git +cd wechat-cli +pip install -e . 
+``` + +### Initialize + +Make sure WeChat is running, then: + +```bash +# macOS/Linux: may need sudo for memory scanning +sudo wechat-cli init + +# Windows: run in a terminal with sufficient privileges +wechat-cli init +``` + +This will: +1. Auto-detect your WeChat data directory +2. Extract encryption keys from WeChat process memory +3. Save config and keys to `~/.wechat-cli/` + +That's it — you're ready to go. + +## Commands + +### sessions — Recent Chats + +```bash +wechat-cli sessions # Last 20 sessions (JSON) +wechat-cli sessions --limit 10 # Last 10 +wechat-cli sessions --format text # Human-readable +``` + +### history — Chat Messages + +```bash +wechat-cli history "Alice" # Last 50 messages +wechat-cli history "Alice" --limit 100 --offset 50 +wechat-cli history "Team" --start-time "2026-04-01" --end-time "2026-04-03" +wechat-cli history "Alice" --type link # Only links/files +wechat-cli history "Alice" --format text +``` + +**Options:** `--limit`, `--offset`, `--start-time`, `--end-time`, `--type`, `--format` + +### search — Search Messages + +```bash +wechat-cli search "hello" # Global search +wechat-cli search "hello" --chat "Alice" # In specific chat +wechat-cli search "meeting" --chat "TeamA" --chat "TeamB" # Multiple chats +wechat-cli search "report" --type file # Only files +wechat-cli search "hello" --start-time "2026-04-01" --limit 50 +``` + +**Options:** `--chat` (repeatable), `--start-time`, `--end-time`, `--limit`, `--offset`, `--type`, `--format` + +### contacts — Contact Search & Details + +```bash +wechat-cli contacts --query "Li" # Search contacts +wechat-cli contacts --detail "Alice" # View contact details +wechat-cli contacts --detail "wxid_xxx" # By WeChat ID +``` + +Details include: nickname, remark, WeChat ID (alias), bio, avatar URL, account type. 
+ +### members — Group Members + +```bash +wechat-cli members "Team Group" # List all members (JSON) +wechat-cli members "Team Group" --format text +``` + +Shows member list with display names and group owner. + +### stats — Chat Statistics + +```bash +wechat-cli stats "Team Group" +wechat-cli stats "Alice" --start-time "2026-04-01" --end-time "2026-04-03" +wechat-cli stats "Team Group" --format text +``` + +Returns: total messages, type breakdown, top 10 senders, 24-hour activity distribution. + +### export — Export Conversations + +```bash +wechat-cli export "Alice" --format markdown # To stdout +wechat-cli export "Alice" --format txt --output chat.txt # To file +wechat-cli export "Team" --start-time "2026-04-01" --limit 1000 +``` + +**Options:** `--format markdown|txt`, `--output`, `--start-time`, `--end-time`, `--limit` + +### favorites — WeChat Bookmarks + +```bash +wechat-cli favorites # Recent bookmarks +wechat-cli favorites --type article # Articles only +wechat-cli favorites --query "machine learning" # Search +``` + +**Types:** text, image, article, card, video + +### unread — Unread Sessions + +```bash +wechat-cli unread # All unread sessions +wechat-cli unread --limit 10 --format text +``` + +### new-messages — Incremental New Messages + +```bash +wechat-cli new-messages # First call: return unread + save state +wechat-cli new-messages # Subsequent: only new since last call +``` + +State persists at `~/.wechat-cli/last_check.json`. Delete it to reset. 
+ +## Message Type Filter + +The `--type` option (available on `history` and `search`) accepts: + +| Value | Description | +|-------|-------------| +| `text` | Text messages | +| `image` | Images | +| `voice` | Voice messages | +| `video` | Videos | +| `sticker` | Stickers/emojis | +| `location` | Location shares | +| `link` | Links and app messages | +| `file` | File attachments | +| `call` | Voice/video calls | +| `system` | System messages | + +## Use Cases + +### LLM / AI Tool Integration + +```bash +# For Claude Code, Cursor, or any AI tool that can run shell commands +wechat-cli sessions --limit 5 +wechat-cli history "Alice" --limit 20 --format text +wechat-cli search "deadline" --chat "Team" --type text +``` + +All commands output JSON by default, making them ideal for AI agent tool calls. + +### Chat Analysis + +```bash +# Who talks the most in a group? +wechat-cli stats "Team Group" --format text + +# Find all shared links in a conversation +wechat-cli history "Alice" --type link --limit 50 + +# Search for a specific file +wechat-cli search "report.xlsx" --type file +``` + +### Data Backup + +```bash +# Export important conversations +wechat-cli export "Team Group" --format markdown --output team_chat.md +wechat-cli export "Alice" --start-time "2026-01-01" --format txt --output alice_2026.txt +``` + +### Notification Monitoring + +```bash +# Cron job to check for new messages every 5 minutes +*/5 * * * * wechat-cli new-messages --format text +``` + +## Platform Support + +| Platform | Status | Notes | +|----------|--------|-------| +| macOS (Apple Silicon) | Supported | Bundled arm64 binary for key extraction | +| macOS (Intel) | Supported | Requires an x86_64 build of the key scanner (only the arm64 binary is bundled; source is `wechat_cli/bin/find_all_keys_macos.c`) | +| Windows | Supported | Reads Weixin.exe process memory | +| Linux | Supported | Reads /proc/pid/mem, requires root | + +## How It Works + +WeChat stores chat data in SQLCipher-encrypted SQLite databases on your local machine. WeChat CLI: + +1. 
**Extracts keys** — scans WeChat process memory to find encryption keys (`wechat-cli init`) +2. **Decrypts on-the-fly** — transparently decrypts databases with page-level AES-256-CBC + caching +3. **Queries locally** — all data stays on your machine, no network access required + +## Requirements + +- Python >= 3.10 +- WeChat running locally (for `init` key extraction) + +## License + +[Apache License 2.0](LICENSE) diff --git a/README_CN.md b/README_CN.md new file mode 100644 index 0000000..591a23b --- /dev/null +++ b/README_CN.md @@ -0,0 +1,232 @@ +# WeChat CLI + +[English](README.md) + +命令行工具,查询本地微信数据——聊天记录、联系人、会话、收藏等。默认 JSON 输出,专为 LLM 集成设计。 + +## 功能亮点 + +- **开箱即用** — `pip install` + `wechat-cli init`,无外部依赖 +- **11 个命令** — sessions、history、search、contacts、members、stats、export、favorites、unread、new-messages、init +- **默认 JSON** — 结构化输出,方便程序解析 +- **跨平台** — macOS、Windows、Linux +- **即时解密** — SQLCipher 数据库透明解密,带缓存 +- **消息类型过滤** — 按文本、图片、链接、文件、视频等过滤 +- **聊天统计** — 发言排行、类型分布、24 小时活跃分布 +- **Markdown 导出** — 将聊天记录导出为 markdown 或纯文本 + +## 快速开始 + +### 安装 + +```bash +pip install wechat-cli +``` + +或从源码安装: + +```bash +git clone https://github.com/canghe/wechat-cli.git +cd wechat-cli +pip install -e . +``` + +### 初始化 + +确保微信正在运行,然后: + +```bash +# macOS/Linux: 可能需要 sudo 权限 +sudo wechat-cli init + +# Windows: 在有足够权限的终端中运行 +wechat-cli init +``` + +这一步会: +1. 自动检测微信数据目录 +2. 从微信进程内存中提取加密密钥 +3. 
将配置和密钥保存到 `~/.wechat-cli/` + +完成后即可使用所有命令。 + +## 命令一览 + +### sessions — 最近会话 + +```bash +wechat-cli sessions # 最近 20 个会话 (JSON) +wechat-cli sessions --limit 10 # 最近 10 个 +wechat-cli sessions --format text # 纯文本输出 +``` + +### history — 聊天记录 + +```bash +wechat-cli history "张三" # 最近 50 条消息 +wechat-cli history "张三" --limit 100 --offset 50 +wechat-cli history "交流群" --start-time "2026-04-01" --end-time "2026-04-03" +wechat-cli history "张三" --type link # 只看链接/文件 +wechat-cli history "张三" --format text +``` + +**选项:** `--limit`、`--offset`、`--start-time`、`--end-time`、`--type`、`--format` + +### search — 搜索消息 + +```bash +wechat-cli search "Claude" # 全局搜索 +wechat-cli search "Claude" --chat "交流群" # 指定聊天搜索 +wechat-cli search "开会" --chat "群A" --chat "群B" # 多个聊天 +wechat-cli search "报告" --type file # 只搜文件 +``` + +**选项:** `--chat`(可多次指定)、`--start-time`、`--end-time`、`--limit`、`--offset`、`--type`、`--format` + +### contacts — 联系人搜索与详情 + +```bash +wechat-cli contacts --query "李" # 搜索联系人 +wechat-cli contacts --detail "张三" # 查看详情 +wechat-cli contacts --detail "wxid_xxx" # 通过 wxid 查看 +``` + +详情包括:昵称、备注、微信号、个性签名、头像 URL、账号类型。 + +### members — 群成员列表 + +```bash +wechat-cli members "AI交流群" # 成员列表 (JSON) +wechat-cli members "AI交流群" --format text +``` + +显示成员昵称、wxid、备注和群主。 + +### stats — 聊天统计 + +```bash +wechat-cli stats "AI交流群" +wechat-cli stats "张三" --start-time "2026-04-01" --end-time "2026-04-03" +wechat-cli stats "AI交流群" --format text +``` + +返回:消息总数、类型分布、发言 Top 10、24 小时活跃分布(含柱状图)。 + +### export — 导出聊天记录 + +```bash +wechat-cli export "张三" --format markdown # 输出到 stdout +wechat-cli export "张三" --format txt --output chat.txt # 输出到文件 +wechat-cli export "群聊" --start-time "2026-04-01" --limit 1000 +``` + +**选项:** `--format markdown|txt`、`--output`、`--start-time`、`--end-time`、`--limit` + +### favorites — 微信收藏 + +```bash +wechat-cli favorites # 最近收藏 +wechat-cli favorites --type article # 只看文章 +wechat-cli favorites --query "计算机网络" # 搜索收藏 +``` + +**类型:** text、image、article、card、video + +### unread — 
未读会话 + +```bash +wechat-cli unread # 所有未读会话 +wechat-cli unread --limit 10 --format text +``` + +### new-messages — 增量新消息 + +```bash +wechat-cli new-messages # 首次: 返回未读消息 + 保存状态 +wechat-cli new-messages # 后续: 仅返回上次以来的新消息 +``` + +状态保存在 `~/.wechat-cli/last_check.json`,删除此文件可重置。 + +## 消息类型过滤 + +`--type` 选项(适用于 `history` 和 `search`): + +| 值 | 说明 | +|---|------| +| `text` | 文本消息 | +| `image` | 图片 | +| `voice` | 语音 | +| `video` | 视频 | +| `sticker` | 表情 | +| `location` | 位置 | +| `link` | 链接/应用消息 | +| `file` | 文件 | +| `call` | 音视频通话 | +| `system` | 系统消息 | + +## 使用场景 + +### AI 工具集成 + +```bash +# 供 Claude Code、Cursor 等 AI 工具调用 +wechat-cli sessions --limit 5 +wechat-cli history "张三" --limit 20 --format text +wechat-cli search "截止日期" --chat "项目组" --type text +``` + +所有命令默认输出 JSON,适合 AI Agent 工具调用。 + +### 聊天分析 + +```bash +# 群里谁最活跃? +wechat-cli stats "项目组" --format text + +# 查看所有分享的链接 +wechat-cli history "张三" --type link --limit 50 + +# 搜索特定文件 +wechat-cli search "报告.xlsx" --type file +``` + +### 数据备份 + +```bash +wechat-cli export "项目组" --format markdown --output project.md +wechat-cli export "张三" --start-time "2026-01-01" --format txt --output chat.txt +``` + +### 消息监控 + +```bash +# 定时检查新消息 +*/5 * * * * wechat-cli new-messages --format text +``` + +## 平台支持 + +| 平台 | 状态 | 说明 | +|------|------|------| +| macOS (Apple Silicon) | 支持 | 内置 arm64 二进制用于密钥提取 | +| macOS (Intel) | 支持 | 需要 x86_64 二进制 | +| Windows | 支持 | 读取 Weixin.exe 进程内存 | +| Linux | 支持 | 读取 /proc/pid/mem,需要 root | + +## 工作原理 + +微信将聊天数据存储在本地的 SQLCipher 加密 SQLite 数据库中。WeChat CLI: + +1. **提取密钥** — 扫描微信进程内存获取加密密钥(`wechat-cli init`) +2. **即时解密** — 透明解密数据库,使用页级 AES-256-CBC + 缓存 +3. 
**本地查询** — 所有数据留在本机,无需网络访问 + +## 环境要求 + +- Python >= 3.10 +- 微信在本地运行(用于 `init` 密钥提取) + +## 开源协议 + +[Apache License 2.0](LICENSE) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..7152afd --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,23 @@ +[build-system] +requires = ["setuptools>=68.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "wechat-cli" +version = "0.2.0" +description = "WeChat data query CLI for LLMs" +requires-python = ">=3.10" +dependencies = [ + "click>=8.1,<9", + "pycryptodome>=3.19,<4", + "zstandard>=0.22,<1", +] + +[project.scripts] +wechat-cli = "wechat_cli.main:cli" + +[tool.setuptools.packages.find] +where = ["."] + +[tool.setuptools.package-data] +wechat_cli = ["bin/*"] diff --git a/wechat_cli/__init__.py b/wechat_cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wechat_cli/bin/find_all_keys_macos.arm64 b/wechat_cli/bin/find_all_keys_macos.arm64 new file mode 100755 index 0000000..8fc5940 Binary files /dev/null and b/wechat_cli/bin/find_all_keys_macos.arm64 differ diff --git a/wechat_cli/bin/find_all_keys_macos.c b/wechat_cli/bin/find_all_keys_macos.c new file mode 100644 index 0000000..eb6a9e5 --- /dev/null +++ b/wechat_cli/bin/find_all_keys_macos.c @@ -0,0 +1,319 @@ +/* + * find_all_keys_macos.c - macOS WeChat memory key scanner + * + * Scans WeChat process memory for SQLCipher encryption keys in the + * x'<64-hex key><32-hex salt>' format used by WeChat 4.x on macOS. + * + * Prerequisites: + * - WeChat must be ad-hoc signed (or SIP disabled) + * - Must run as root (sudo) + * + * Build: + * cc -O2 -o find_all_keys_macos find_all_keys_macos.c -framework Foundation + * + * Usage: + * sudo ./find_all_keys_macos [pid] + * If pid is omitted, automatically finds WeChat PID. 
+ * + * Output: JSON file at ./all_keys.json (compatible with decrypt_db.py) + */ + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <dirent.h> +#include <ftw.h> +#include <pwd.h> +#include <sys/stat.h> +#include <mach/mach.h> +#include <mach/mach_vm.h> + +#define MAX_KEYS 256 +#define KEY_SIZE 32 +#define SALT_SIZE 16 +#define HEX_PATTERN_LEN 96 /* 64 hex (key) + 32 hex (salt) */ +#define CHUNK_SIZE (2 * 1024 * 1024) + +typedef struct { + char key_hex[65]; + char salt_hex[33]; + char full_pragma[100]; +} key_entry_t; + +/* Forward declaration */ +static int read_db_salt(const char *path, char *salt_hex_out); + +/* nftw callback state for collecting DB files */ +#define MAX_DBS 256 +static char g_db_salts[MAX_DBS][33]; +static char g_db_names[MAX_DBS][256]; +static int g_db_count = 0; +static int nftw_collect_db(const char *fpath, const struct stat *sb, + int typeflag, struct FTW *ftwbuf) { + (void)sb; (void)ftwbuf; + if (typeflag != FTW_F) return 0; + size_t len = strlen(fpath); + if (len < 3 || strcmp(fpath + len - 3, ".db") != 0) return 0; + if (g_db_count >= MAX_DBS) return 0; + + char salt[33]; + if (read_db_salt(fpath, salt) != 0) return 0; + + strcpy(g_db_salts[g_db_count], salt); + /* Extract relative path from db_storage/ */ + const char *rel = strstr(fpath, "db_storage/"); + if (rel) rel += strlen("db_storage/"); + else { + rel = strrchr(fpath, '/'); + rel = rel ? 
rel + 1 : fpath; + } + strncpy(g_db_names[g_db_count], rel, 255); + g_db_names[g_db_count][255] = '\0'; + printf(" %s: salt=%s\n", g_db_names[g_db_count], salt); + g_db_count++; + return 0; +} + +static int is_hex_char(unsigned char c) { + return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F'); +} + +static pid_t find_wechat_pid(void) { + FILE *fp = popen("pgrep -x WeChat", "r"); + if (!fp) return -1; + char buf[64]; + pid_t pid = -1; + if (fgets(buf, sizeof(buf), fp)) + pid = atoi(buf); + pclose(fp); + return pid; +} + +/* Read DB salt (first 16 bytes) and return hex string */ +static int read_db_salt(const char *path, char *salt_hex_out) { + FILE *f = fopen(path, "rb"); + if (!f) return -1; + unsigned char header[16]; + if (fread(header, 1, 16, f) != 16) { fclose(f); return -1; } + fclose(f); + /* Check if unencrypted */ + if (memcmp(header, "SQLite format 3", 15) == 0) return -1; + for (int i = 0; i < 16; i++) + sprintf(salt_hex_out + i * 2, "%02x", header[i]); + salt_hex_out[32] = '\0'; + return 0; +} + +int main(int argc, char *argv[]) { + pid_t pid; + if (argc >= 2) + pid = atoi(argv[1]); + else + pid = find_wechat_pid(); + + if (pid <= 0) { + fprintf(stderr, "WeChat not running or invalid PID\n"); + return 1; + } + + printf("============================================================\n"); + printf(" macOS WeChat Memory Key Scanner (C version)\n"); + printf("============================================================\n"); + printf("WeChat PID: %d\n", pid); + + /* Get task port */ + mach_port_t task; + kern_return_t kr = task_for_pid(mach_task_self(), pid, &task); + if (kr != KERN_SUCCESS) { + fprintf(stderr, "task_for_pid failed: %d\n", kr); + fprintf(stderr, "Make sure: (1) running as root, (2) WeChat is ad-hoc signed\n"); + return 1; + } + printf("Got task port: %u\n", task); + + /* Resolve real user's HOME (sudo may change HOME to /var/root) */ + const char *home = getenv("HOME"); + const char *sudo_user = getenv("SUDO_USER"); 
+ if (sudo_user) { + struct passwd *pw = getpwnam(sudo_user); + if (pw && pw->pw_dir) + home = pw->pw_dir; + } + if (!home) home = "/root"; + printf("User home: %s\n", home); + + /* Collect DB salts by recursively walking db_storage directories. + * Note: POSIX glob() does not support ** recursive matching on macOS, + * so we use nftw() to walk the directory tree instead. */ + printf("\nScanning for DB files...\n"); + char db_base_dir[512]; + snprintf(db_base_dir, sizeof(db_base_dir), + "%s/Library/Containers/com.tencent.xinWeChat/Data/Documents/xwechat_files", + home); + + /* Walk each account's db_storage directory */ + DIR *xdir = opendir(db_base_dir); + if (xdir) { + struct dirent *ent; + while ((ent = readdir(xdir)) != NULL) { + if (ent->d_name[0] == '.') continue; + char storage_path[768]; + snprintf(storage_path, sizeof(storage_path), + "%s/%s/db_storage", db_base_dir, ent->d_name); + struct stat st; + if (stat(storage_path, &st) == 0 && S_ISDIR(st.st_mode)) { + nftw(storage_path, nftw_collect_db, 20, FTW_PHYS); + } + } + closedir(xdir); + } + printf("Found %d encrypted DBs\n", g_db_count); + + /* Scan memory for x' patterns */ + printf("\nScanning memory for keys...\n"); + key_entry_t keys[MAX_KEYS]; + int key_count = 0; + size_t total_scanned = 0; + int region_count = 0; + + mach_vm_address_t addr = 0; + while (1) { + mach_vm_size_t size = 0; + vm_region_basic_info_data_64_t info; + mach_msg_type_number_t info_count = VM_REGION_BASIC_INFO_COUNT_64; + mach_port_t obj_name; + + kr = mach_vm_region(task, &addr, &size, VM_REGION_BASIC_INFO_64, + (vm_region_info_t)&info, &info_count, &obj_name); + if (kr != KERN_SUCCESS) break; + if (size == 0) { addr++; continue; } /* guard against infinite loop */ + + if ((info.protection & (VM_PROT_READ | VM_PROT_WRITE)) == + (VM_PROT_READ | VM_PROT_WRITE)) { + region_count++; + + mach_vm_address_t ca = addr; + while (ca < addr + size) { + mach_vm_size_t cs = addr + size - ca; + if (cs > CHUNK_SIZE) cs = CHUNK_SIZE; + + 
vm_offset_t data; + mach_msg_type_number_t dc; + kr = mach_vm_read(task, ca, cs, &data, &dc); + if (kr == KERN_SUCCESS) { + unsigned char *buf = (unsigned char *)data; + total_scanned += dc; + + for (size_t i = 0; i + HEX_PATTERN_LEN + 3 < dc; i++) { + if (buf[i] == 'x' && buf[i + 1] == '\'') { + /* Check if followed by 96 hex chars and closing ' */ + int valid = 1; + for (int j = 0; j < HEX_PATTERN_LEN; j++) { + if (!is_hex_char(buf[i + 2 + j])) { valid = 0; break; } + } + if (!valid) continue; + if (buf[i + 2 + HEX_PATTERN_LEN] != '\'') continue; + + /* Extract key and salt hex */ + char key_hex[65], salt_hex[33]; + memcpy(key_hex, buf + i + 2, 64); + key_hex[64] = '\0'; + memcpy(salt_hex, buf + i + 2 + 64, 32); + salt_hex[32] = '\0'; + + /* Convert to lowercase for comparison */ + for (int j = 0; key_hex[j]; j++) + if (key_hex[j] >= 'A' && key_hex[j] <= 'F') + key_hex[j] += 32; + for (int j = 0; salt_hex[j]; j++) + if (salt_hex[j] >= 'A' && salt_hex[j] <= 'F') + salt_hex[j] += 32; + + /* Deduplicate */ + int dup = 0; + for (int k = 0; k < key_count; k++) { + if (strcmp(keys[k].key_hex, key_hex) == 0 && + strcmp(keys[k].salt_hex, salt_hex) == 0) { + dup = 1; break; + } + } + if (dup) continue; + + if (key_count < MAX_KEYS) { + strcpy(keys[key_count].key_hex, key_hex); + strcpy(keys[key_count].salt_hex, salt_hex); + snprintf(keys[key_count].full_pragma, sizeof(keys[key_count].full_pragma), + "x'%s%s'", key_hex, salt_hex); + key_count++; + } + } + } + mach_vm_deallocate(mach_task_self(), data, dc); + } + /* Advance with overlap to catch patterns spanning chunk boundaries. + * Pattern is x'<96 hex chars>' = 99 bytes total. 
*/ + if (cs > HEX_PATTERN_LEN + 3) + ca += cs - (HEX_PATTERN_LEN + 3); + else + ca += cs; + } + } + addr += size; + } + + printf("\nScan complete: %zuMB scanned, %d regions, %d unique keys\n", + total_scanned / 1024 / 1024, region_count, key_count); + + /* Match keys to DBs */ + printf("\n%-25s %-66s %s\n", "Database", "Key", "Salt"); + printf("%-25s %-66s %s\n", + "-------------------------", + "------------------------------------------------------------------", + "--------------------------------"); + + int matched = 0; + for (int i = 0; i < key_count; i++) { + const char *db = NULL; + for (int j = 0; j < g_db_count; j++) { + if (strcmp(keys[i].salt_hex, g_db_salts[j]) == 0) { + db = g_db_names[j]; + matched++; + break; + } + } + printf("%-25s %-66s %s\n", + db ? db : "(unknown)", + keys[i].key_hex, + keys[i].salt_hex); + } + printf("\nMatched %d/%d keys to known DBs\n", matched, key_count); + + /* Save JSON: { "rel/path.db": { "enc_key": "hex" }, ... } + * Uses forward slashes (native macOS paths, valid JSON without escaping). + */ + const char *out_path = "all_keys.json"; + FILE *fp = fopen(out_path, "w"); + if (fp) { + fprintf(fp, "{\n"); + int first = 1; + for (int i = 0; i < key_count; i++) { + const char *db = NULL; + for (int j = 0; j < g_db_count; j++) { + if (strcmp(keys[i].salt_hex, g_db_salts[j]) == 0) { + db = g_db_names[j]; + break; + } + } + if (!db) continue; + fprintf(fp, "%s \"%s\": {\"enc_key\": \"%s\"}", + first ? 
"" : ",\n", db, keys[i].key_hex); + first = 0; + } + fprintf(fp, "\n}\n"); + fclose(fp); + printf("Saved to %s\n", out_path); + } + + return 0; +} diff --git a/wechat_cli/commands/__init__.py b/wechat_cli/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/wechat_cli/commands/contacts.py b/wechat_cli/commands/contacts.py new file mode 100644 index 0000000..fd2206b --- /dev/null +++ b/wechat_cli/commands/contacts.py @@ -0,0 +1,91 @@ +"""contacts 命令 — 搜索或查看联系人""" + +import click + +from ..core.contacts import get_contact_full, get_contact_names, resolve_username, get_contact_detail +from ..output.formatter import output + + +@click.command("contacts") +@click.option("--query", default="", help="搜索关键词(匹配昵称、备注、wxid)") +@click.option("--detail", default=None, help="查看联系人详情(传入昵称/备注/wxid)") +@click.option("--limit", default=50, help="返回数量") +@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式") +@click.pass_context +def contacts(ctx, query, detail, limit, fmt): + """搜索或列出联系人 + + \b + 示例: + wechat-cli contacts --query "李" # 搜索联系人 + wechat-cli contacts --detail "张三" # 查看联系人详情 + wechat-cli contacts --detail "wxid_xxx" # 通过 wxid 查看 + """ + app = ctx.obj + + if detail: + _show_detail(app, detail, fmt) + return + + names = get_contact_names(app.cache, app.decrypted_dir) + full = get_contact_full(app.cache, app.decrypted_dir) + + if query: + q_lower = query.lower() + matched = [c for c in full if q_lower in c.get('nick_name', '').lower() + or q_lower in c.get('remark', '').lower() + or q_lower in c.get('username', '').lower()] + else: + matched = full + + matched = matched[:limit] + + if fmt == 'json': + output(matched, 'json') + else: + header = f"找到 {len(matched)} 个联系人:" + lines = [] + for c in matched: + display = c['remark'] or c['nick_name'] or c['username'] + line = f"{display} ({c['username']})" + if c['remark']: + line += f" 备注: {c['remark']}" + lines.append(line) + output(header + "\n\n" + 
"\n".join(lines), 'text') + + +def _show_detail(app, name_or_id, fmt): + """显示联系人详情。""" + names = get_contact_names(app.cache, app.decrypted_dir) + + # 尝试解析为 username + username = resolve_username(name_or_id, app.cache, app.decrypted_dir) + if not username: + # 直接用原始输入试试 + username = name_or_id + + info = get_contact_detail(username, app.cache, app.decrypted_dir) + if not info: + click.echo(f"找不到联系人: {name_or_id}", err=True) + return + + if fmt == 'json': + output(info, 'json') + else: + lines = [f"联系人详情: {info['nick_name']}"] + if info['remark']: + lines.append(f"备注: {info['remark']}") + if info['alias']: + lines.append(f"微信号: {info['alias']}") + lines.append(f"wxid: {info['username']}") + if info['description']: + lines.append(f"个性签名: {info['description']}") + if info['is_group']: + lines.append("类型: 群聊") + elif info['is_subscription']: + lines.append("类型: 公众号") + elif info['verify_flag'] and info['verify_flag'] >= 8: + lines.append("类型: 企业认证") + if info['avatar']: + lines.append(f"头像: {info['avatar']}") + output("\n".join(lines), 'text') diff --git a/wechat_cli/commands/export.py b/wechat_cli/commands/export.py new file mode 100644 index 0000000..c1be606 --- /dev/null +++ b/wechat_cli/commands/export.py @@ -0,0 +1,101 @@ +"""export 命令 — 导出聊天记录为 markdown 或 txt""" + +import click +from datetime import datetime + +from ..core.contacts import get_contact_names +from ..core.messages import ( + collect_chat_history, + parse_time_range, + resolve_chat_context, + validate_pagination, +) +from ..output.formatter import output + + +@click.command("export") +@click.argument("chat_name") +@click.option("--format", "fmt", default="markdown", type=click.Choice(["markdown", "txt"]), help="导出格式") +@click.option("--output", "output_path", default=None, help="输出文件路径(默认输出到 stdout)") +@click.option("--start-time", default="", help="起始时间 YYYY-MM-DD [HH:MM[:SS]]") +@click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]") +@click.option("--limit", default=500, 
help="导出消息数量") +@click.pass_context +def export(ctx, chat_name, fmt, output_path, start_time, end_time, limit): + """导出聊天记录为 markdown 或纯文本 + + \b + 示例: + wechat-cli export "张三" --format markdown + wechat-cli export "AI交流群" --format txt --output chat.txt + wechat-cli export "张三" --start-time "2026-04-01" --limit 1000 + """ + app = ctx.obj + + try: + validate_pagination(limit, 0, limit_max=None) + start_ts, end_ts = parse_time_range(start_time, end_time) + except ValueError as e: + click.echo(f"错误: {e}", err=True) + ctx.exit(2) + + chat_ctx = resolve_chat_context(chat_name, app.msg_db_keys, app.cache, app.decrypted_dir) + if not chat_ctx: + click.echo(f"找不到聊天对象: {chat_name}", err=True) + ctx.exit(1) + if not chat_ctx['db_path']: + click.echo(f"找不到 {chat_ctx['display_name']} 的消息记录", err=True) + ctx.exit(1) + + names = get_contact_names(app.cache, app.decrypted_dir) + lines, failures = collect_chat_history( + chat_ctx, names, app.display_name_fn, + start_ts=start_ts, end_ts=end_ts, limit=limit, offset=0, + ) + + if not lines: + click.echo(f"{chat_ctx['display_name']} 无消息记录", err=True) + ctx.exit(0) + + now = datetime.now().strftime('%Y-%m-%d %H:%M') + chat_type = "群聊" if chat_ctx['is_group'] else "私聊" + time_range = f"{start_time or '最早'} ~ {end_time or '最新'}" + + if fmt == 'markdown': + content = _format_markdown(chat_ctx['display_name'], chat_type, time_range, now, lines) + else: + content = _format_txt(chat_ctx['display_name'], chat_type, time_range, now, lines) + + if output_path: + with open(output_path, 'w', encoding='utf-8') as f: + f.write(content) + if not content.endswith('\n'): + f.write('\n') + click.echo(f"已导出到: {output_path}({len(lines)} 条消息)", err=True) + else: + output(content, 'text') + + +def _format_markdown(display_name, chat_type, time_range, export_time, lines): + header = ( + f"# 聊天记录: {display_name}\n\n" + f"**时间范围:** {time_range}\n\n" + f"**导出时间:** {export_time}\n\n" + f"**消息数量:** {len(lines)}\n\n" + f"**类型:** {chat_type}\n\n---\n" + ) + body = 
"\n".join(f"- {line}" for line in lines) + return header + body + + +def _format_txt(display_name, chat_type, time_range, export_time, lines): + header = ( + f"聊天记录: {display_name}\n" + f"类型: {chat_type}\n" + f"时间范围: {time_range}\n" + f"导出时间: {export_time}\n" + f"消息数量: {len(lines)}\n" + f"{'=' * 60}" + ) + body = "\n".join(lines) + return header + "\n" + body diff --git a/wechat_cli/commands/favorites.py b/wechat_cli/commands/favorites.py new file mode 100644 index 0000000..2b91d19 --- /dev/null +++ b/wechat_cli/commands/favorites.py @@ -0,0 +1,142 @@ +"""favorites 命令 — 查看微信收藏""" + +import os +import sqlite3 +import xml.etree.ElementTree as ET +from contextlib import closing +from datetime import datetime + +import click + +from ..core.contacts import get_contact_names +from ..output.formatter import output + +_FAV_TYPE_MAP = { + 1: '文本', 2: '图片', 5: '文章', 19: '名片', 20: '视频号', +} + +_FAV_TYPE_FILTERS = { + 'text': 1, 'image': 2, 'article': 5, 'card': 19, 'video': 20, +} + + +def _parse_fav_content(content, fav_type): + """从 XML content 提取摘要信息。""" + if not content: + return '' + try: + root = ET.fromstring(content) + except ET.ParseError: + return '' + item = root if root.tag == 'favitem' else root.find('.//favitem') + if item is None: + return '' + + if fav_type == 1: + return (item.findtext('desc') or '').strip() + if fav_type == 2: + return '[图片收藏]' + if fav_type == 5: + title = (item.findtext('.//pagetitle') or '').strip() + desc = (item.findtext('.//pagedesc') or '').strip() + return f"{title} - {desc}" if desc else title + if fav_type == 19: + return (item.findtext('desc') or '').strip() + if fav_type == 20: + nickname = (item.findtext('.//nickname') or '').strip() + desc = (item.findtext('.//desc') or '').strip() + parts = [p for p in [nickname, desc] if p] + return ' '.join(parts) if parts else '[视频号]' + desc = (item.findtext('desc') or '').strip() + return desc if desc else '[收藏]' + + +@click.command("favorites") +@click.option("--limit", default=20, 
help="返回数量") +@click.option("--type", "fav_type", default=None, + type=click.Choice(list(_FAV_TYPE_FILTERS.keys())), + help="按类型过滤: text/image/article/card/video") +@click.option("--query", default=None, help="关键词搜索") +@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式") +@click.pass_context +def favorites(ctx, limit, fav_type, query, fmt): + """查看微信收藏 + + \b + 示例: + wechat-cli favorites # 最近收藏 + wechat-cli favorites --type article # 只看文章 + wechat-cli favorites --query "计算机网络" # 搜索收藏 + wechat-cli favorites --limit 5 --format text + """ + app = ctx.obj + + # 查找 favorite.db + fav_path = None + pre_decrypted = os.path.join(app.decrypted_dir, "favorite", "favorite.db") + if os.path.exists(pre_decrypted): + fav_path = pre_decrypted + else: + fav_path = app.cache.get(os.path.join("favorite", "favorite.db")) + if not fav_path: + click.echo("错误: 无法访问 favorite.db", err=True) + ctx.exit(3) + + names = get_contact_names(app.cache, app.decrypted_dir) + + with closing(sqlite3.connect(fav_path)) as conn: + where_parts = [] + params = [] + + if fav_type: + where_parts.append('type = ?') + params.append(_FAV_TYPE_FILTERS[fav_type]) + + if query: + where_parts.append('content LIKE ?') + params.append(f'%{query}%') + + where_sql = f"WHERE {' AND '.join(where_parts)}" if where_parts else '' + + rows = conn.execute(f""" + SELECT local_id, type, update_time, content, fromusr, realchatname + FROM fav_db_item + {where_sql} + ORDER BY update_time DESC + LIMIT ? 
# `history` prints one chat's messages as JSON or plain text.
# Exit codes used below: 2 for invalid pagination/time arguments,
# 1 when the chat cannot be resolved or has no message database.
# The docstring is rendered by click as the command's --help text, so it is
# kept verbatim rather than translated.
@click.command("history")
@click.argument("chat_name")
@click.option("--limit", default=50, help="返回的消息数量")
@click.option("--offset", default=0, help="分页偏移量")
@click.option("--start-time", default="", help="起始时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.option("--type", "msg_type", default=None, type=click.Choice(MSG_TYPE_NAMES), help="消息类型过滤")
@click.pass_context
def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type):
    """获取指定聊天的消息记录

    \b
    示例:
      wechat-cli history "张三" # 最近 50 条消息
      wechat-cli history "张三" --limit 100 --offset 50 # 分页查询
      wechat-cli history "AI交流群" --start-time "2026-04-01" --end-time "2026-04-02"
      wechat-cli history "张三" --format text # 纯文本输出
    """
    app = ctx.obj

    # Validate user input first; ctx.exit() raises, so start_ts/end_ts are
    # always bound when execution continues past this block.
    try:
        validate_pagination(limit, offset, limit_max=None)
        start_ts, end_ts = parse_time_range(start_time, end_time)
    except ValueError as e:
        click.echo(f"错误: {e}", err=True)
        ctx.exit(2)

    chat_ctx = resolve_chat_context(chat_name, app.msg_db_keys, app.cache, app.decrypted_dir)
    if not chat_ctx:
        click.echo(f"找不到聊天对象: {chat_name}", err=True)
        ctx.exit(1)
    if not chat_ctx['db_path']:
        click.echo(f"找不到 {chat_ctx['display_name']} 的消息记录", err=True)
        ctx.exit(1)

    names = get_contact_names(app.cache, app.decrypted_dir)
    type_filter = MSG_TYPE_FILTERS[msg_type] if msg_type else None
    lines, failures = collect_chat_history(
        chat_ctx, names, app.display_name_fn,
        start_ts=start_ts, end_ts=end_ts, limit=limit, offset=offset,
        msg_type_filter=type_filter,
    )

    if fmt == 'json':
        output({
            'chat': chat_ctx['display_name'],
            'username': chat_ctx['username'],
            'is_group': chat_ctx['is_group'],
            'count': len(lines),
            'offset': offset,
            'limit': limit,
            'start_time': start_time or None,
            'end_time': end_time or None,
            'type': msg_type or None,
            'messages': lines,
            'failures': failures if failures else None,
        }, 'json')
        return

    if not lines:
        # Fix: the empty-result message used to drop `failures`, so a query
        # that failed looked the same as a chat with no messages.  The JSON
        # branch already reports failures; text mode now does too.
        message = f"{chat_ctx['display_name']} 无消息记录"
        if failures:
            message += "\n查询失败: " + ";".join(failures)
        output(message, 'text')
        return

    header = f"{chat_ctx['display_name']} 的消息记录(返回 {len(lines)} 条,offset={offset}, limit={limit})"
    if chat_ctx['is_group']:
        header += " [群聊]"
    if start_time or end_time:
        header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}"
    if failures:
        header += "\n查询失败: " + ";".join(failures)
    output(header + ":\n\n" + "\n".join(lines), 'text')
a/wechat_cli/commands/init.py b/wechat_cli/commands/init.py new file mode 100644 index 0000000..5396a4f --- /dev/null +++ b/wechat_cli/commands/init.py @@ -0,0 +1,72 @@ +"""init 命令 — 交互式初始化,提取密钥并生成配置""" + +import json +import os +import sys + +import click + +from ..core.config import STATE_DIR, CONFIG_FILE, KEYS_FILE, auto_detect_db_dir + + +@click.command() +@click.option("--db-dir", default=None, help="微信数据目录路径(默认自动检测)") +@click.option("--force", is_flag=True, help="强制重新提取密钥") +def init(db_dir, force): + """初始化 wechat-cli:提取密钥并生成配置""" + click.echo("WeChat CLI 初始化") + click.echo("=" * 40) + + # 1. 检查是否已初始化 + if os.path.exists(CONFIG_FILE) and os.path.exists(KEYS_FILE) and not force: + click.echo(f"已初始化(配置: {CONFIG_FILE})") + click.echo("使用 --force 重新提取密钥") + return + + # 2. 创建状态目录 + os.makedirs(STATE_DIR, exist_ok=True) + + # 3. 确定 db_dir + if db_dir is None: + db_dir = auto_detect_db_dir() + if db_dir is None: + click.echo("[!] 未能自动检测到微信数据目录", err=True) + click.echo("请通过 --db-dir 参数指定,例如:", err=True) + click.echo(" wechat-cli init --db-dir ~/path/to/db_storage", err=True) + sys.exit(1) + click.echo(f"[+] 检测到微信数据目录: {db_dir}") + else: + db_dir = os.path.abspath(db_dir) + if not os.path.isdir(db_dir): + click.echo(f"[!] 目录不存在: {db_dir}", err=True) + sys.exit(1) + click.echo(f"[+] 使用指定数据目录: {db_dir}") + + # 4. 提取密钥 + click.echo("\n开始提取密钥...") + try: + from ..keys import extract_keys + key_map = extract_keys(db_dir, KEYS_FILE) + except RuntimeError as e: + click.echo(f"\n[!] 密钥提取失败: {e}", err=True) + if "sudo" not in str(e).lower(): + click.echo("提示: macOS/Linux 可能需要 sudo 权限", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"\n[!] 密钥提取出错: {e}", err=True) + sys.exit(1) + + # 5. 
# `members` lists the members of one group chat as JSON or plain text.
# Exits with status 1 when the name cannot be resolved or the resolved
# username is not a chatroom (does not contain '@chatroom').
# The docstring is rendered by click as the command's --help text, so it is
# kept verbatim rather than translated.
@click.command("members")
@click.argument("group_name")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def members(ctx, group_name, fmt):
    """查询群聊成员列表

    \b
    示例:
      wechat-cli members "AI交流群"
      wechat-cli members "群名" --format text
    """
    app = ctx.obj

    username = resolve_username(group_name, app.cache, app.decrypted_dir)
    if not username:
        click.echo(f"找不到: {group_name}", err=True)
        ctx.exit(1)

    if '@chatroom' not in username:
        click.echo(f"{group_name} 不是一个群聊", err=True)
        ctx.exit(1)

    names = get_contact_names(app.cache, app.decrypted_dir)
    display_name = names.get(username, username)

    result = get_group_members(username, app.cache, app.decrypted_dir)

    if fmt == 'json':
        output({
            'group': display_name,
            'username': username,
            'member_count': len(result['members']),
            'owner': result['owner'],
            'members': result['members'],
        }, 'json')
    else:
        # Fix: the original built `lines` from an unbound name `m`, with no
        # loop, so `--format text` always raised NameError.  Emit one line
        # per member instead.
        lines = []
        for m in result['members']:
            line = f"{m['display_name']} ({m['username']})"
            if m['remark']:
                line += f" 备注: {m['remark']}"
            lines.append(line)
        header = f"{display_name} 的群成员(共 {len(result['members'])} 人)"
        if result['owner']:
            header += f",群主: {result['owner']}"
        output(header + ":\n\n" + "\n".join(lines), 'text')
sender, sender_name = r + curr_state[username] = { + 'unread': unread, 'summary': summary, 'timestamp': ts, + 'msg_type': msg_type, 'sender': sender or '', 'sender_name': sender_name or '', + } + + last_state = _load_last_state() + + if not last_state: + # 首次调用:保存状态,返回未读 + _save_last_state({u: s['timestamp'] for u, s in curr_state.items()}) + + unread_msgs = [] + for username, s in curr_state.items(): + if s['unread'] and s['unread'] > 0: + display = names.get(username, username) + is_group = '@chatroom' in username + summary = s['summary'] + if isinstance(summary, bytes): + summary = decompress_content(summary, 4) or '(压缩内容)' + if isinstance(summary, str) and ':\n' in summary: + summary = summary.split(':\n', 1)[1] + time_str = datetime.fromtimestamp(s['timestamp']).strftime('%H:%M') + unread_msgs.append({ + 'chat': display, + 'username': username, + 'is_group': is_group, + 'unread': s['unread'], + 'last_message': str(summary or ''), + 'msg_type': format_msg_type(s['msg_type']), + 'time': time_str, + 'timestamp': s['timestamp'], + }) + + if fmt == 'json': + output({'first_call': True, 'unread_count': len(unread_msgs), 'messages': unread_msgs}, 'json') + else: + if unread_msgs: + lines = [] + for m in unread_msgs: + tag = " [群]" if m['is_group'] else "" + lines.append(f"[{m['time']}] {m['chat']}{tag} ({m['unread']}条未读): {m['last_message']}") + output(f"当前 {len(unread_msgs)} 个未读会话:\n\n" + "\n".join(lines), 'text') + else: + output("当前无未读消息(已记录状态,下次调用将返回新消息)", 'text') + return + + # 后续调用:对比差异 + new_msgs = [] + for username, s in curr_state.items(): + prev_ts = last_state.get(username, 0) + if s['timestamp'] > prev_ts: + display = names.get(username, username) + is_group = '@chatroom' in username + summary = s['summary'] + if isinstance(summary, bytes): + summary = decompress_content(summary, 4) or '(压缩内容)' + if isinstance(summary, str) and ':\n' in summary: + summary = summary.split(':\n', 1)[1] + + sender_display = '' + if is_group and s['sender']: + sender_display 
# `search` looks for a keyword in one chat, several chats, or all messages,
# depending on how many --chat options were given, then pages the ranked hits.
# Exit codes used below: 2 for invalid pagination/time arguments,
# 1 when no requested chat can be queried.
# The docstring is rendered by click as the command's --help text, so it is
# kept verbatim rather than translated.
@click.command("search")
@click.argument("keyword")
@click.option("--chat", multiple=True, help="限定聊天对象(可多次指定)")
@click.option("--start-time", default="", help="起始时间")
@click.option("--end-time", default="", help="结束时间")
@click.option("--limit", default=20, help="返回数量(最大500)")
@click.option("--offset", default=0, help="分页偏移量")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.option("--type", "msg_type", default=None, type=click.Choice(MSG_TYPE_NAMES), help="消息类型过滤")
@click.pass_context
def search(ctx, keyword, chat, start_time, end_time, limit, offset, fmt, msg_type):
    """搜索消息内容

    \b
    示例:
      wechat-cli search "Claude" # 全局搜索
      wechat-cli search "Claude" --chat "AI交流群" # 在指定群搜索
      wechat-cli search "开会" --chat "群A" --chat "群B" # 同时搜多个群
      wechat-cli search "你好" --start-time "2026-04-01" --limit 50
    """
    app = ctx.obj

    try:
        validate_pagination(limit, offset)
        start_ts, end_ts = parse_time_range(start_time, end_time)
    except ValueError as exc:
        click.echo(f"错误: {exc}", err=True)
        ctx.exit(2)

    names = get_contact_names(app.cache, app.decrypted_dir)
    targets = list(chat)

    # Keyword arguments shared by every search helper below.
    query_opts = {
        'start_ts': start_ts,
        'end_ts': end_ts,
        'candidate_limit': _candidate_page_size(limit, offset),
        'msg_type_filter': MSG_TYPE_FILTERS[msg_type] if msg_type else None,
    }

    if not targets:
        # No --chat given: search every message database.
        entries, failures = search_all_messages(
            app.msg_db_keys, app.cache, names, keyword, app.display_name_fn,
            **query_opts,
        )
        scope = "全部消息"

    elif len(targets) == 1:
        # Exactly one --chat: resolve it and fail fast if it is unusable.
        only = targets[0]
        chat_ctx = resolve_chat_context(only, app.msg_db_keys, app.cache, app.decrypted_dir)
        if not chat_ctx:
            click.echo(f"找不到聊天对象: {only}", err=True)
            ctx.exit(1)
        if not chat_ctx['db_path']:
            click.echo(f"找不到 {chat_ctx['display_name']} 的消息记录", err=True)
            ctx.exit(1)
        entries, failures = collect_chat_search(
            chat_ctx, names, keyword, app.display_name_fn,
            **query_opts,
        )
        scope = chat_ctx['display_name']

    else:
        # Several --chat options: search each resolved chat and merge.
        # NOTE(review): the third returned value is unused here; presumably it
        # lists chats without a message database — confirm whether it should
        # be reported alongside `unresolved`.
        resolved, unresolved, _missing = resolve_chat_contexts(
            targets, app.msg_db_keys, app.cache, app.decrypted_dir
        )
        if not resolved:
            click.echo("错误: 没有可查询的聊天对象", err=True)
            ctx.exit(1)
        entries, failures = [], []
        for one_ctx in resolved:
            hits, errs = collect_chat_search(
                one_ctx, names, keyword, app.display_name_fn,
                **query_opts,
            )
            entries.extend(hits)
            failures.extend(errs)
        if unresolved:
            failures.append("未找到: " + "、".join(unresolved))
        scope = f"{len(resolved)} 个聊天对象"

    paged = _page_ranked_entries(entries, limit, offset)
    # Element 1 of each paged entry is what gets printed.
    rendered = [item[1] for item in paged]

    if fmt == 'json':
        output({
            'scope': scope,
            'keyword': keyword,
            'count': len(paged),
            'offset': offset,
            'limit': limit,
            'start_time': start_time or None,
            'end_time': end_time or None,
            'type': msg_type or None,
            'results': rendered,
            'failures': failures or None,
        }, 'json')
        return

    if not paged:
        output(f"在 {scope} 中未找到包含 \"{keyword}\" 的消息", 'text')
        return

    header = f"在 {scope} 中搜索 \"{keyword}\" 找到 {len(paged)} 条结果(offset={offset}, limit={limit})"
    if start_time or end_time:
        header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}"
    if failures:
        header += "\n查询失败: " + ";".join(failures)
    output(header + ":\n\n" + "\n\n".join(rendered), 'text')
app.cache.get(os.path.join("session", "session.db")) + if not path: + click.echo("错误: 无法解密 session.db", err=True) + ctx.exit(3) + + names = get_contact_names(app.cache, app.decrypted_dir) + with closing(sqlite3.connect(path)) as conn: + rows = conn.execute(""" + SELECT username, unread_count, summary, last_timestamp, + last_msg_type, last_msg_sender, last_sender_display_name + FROM SessionTable + WHERE last_timestamp > 0 + ORDER BY last_timestamp DESC + LIMIT ? + """, (limit,)).fetchall() + + results = [] + for r in rows: + username, unread, summary, ts, msg_type, sender, sender_name = r + display = names.get(username, username) + is_group = '@chatroom' in username + + if isinstance(summary, bytes): + summary = decompress_content(summary, 4) or '(压缩内容)' + if isinstance(summary, str) and ':\n' in summary: + summary = summary.split(':\n', 1)[1] + + sender_display = '' + if is_group and sender: + sender_display = names.get(sender, sender_name or sender) + + results.append({ + 'chat': display, + 'username': username, + 'is_group': is_group, + 'unread': unread or 0, + 'last_message': str(summary or ''), + 'msg_type': format_msg_type(msg_type), + 'sender': sender_display, + 'timestamp': ts, + 'time': datetime.fromtimestamp(ts).strftime('%m-%d %H:%M'), + }) + + if fmt == 'json': + output(results, 'json') + else: + lines = [] + for r in results: + entry = f"[{r['time']}] {r['chat']}" + if r['is_group']: + entry += " [群]" + if r['unread'] > 0: + entry += f" ({r['unread']}条未读)" + entry += f"\n {r['msg_type']}: " + if r['sender']: + entry += f"{r['sender']}: " + entry += r['last_message'] + lines.append(entry) + output(f"最近 {len(results)} 个会话:\n\n" + "\n\n".join(lines), 'text') diff --git a/wechat_cli/commands/stats.py b/wechat_cli/commands/stats.py new file mode 100644 index 0000000..8dd9190 --- /dev/null +++ b/wechat_cli/commands/stats.py @@ -0,0 +1,88 @@ +"""stats 命令 — 聊天统计分析""" + +import click + +from ..core.contacts import get_contact_names +from ..core.messages import ( 
+ collect_chat_stats, + parse_time_range, + resolve_chat_context, +) +from ..output.formatter import output + + +@click.command("stats") +@click.argument("chat_name") +@click.option("--start-time", default="", help="起始时间 YYYY-MM-DD [HH:MM[:SS]]") +@click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]") +@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式") +@click.pass_context +def stats(ctx, chat_name, start_time, end_time, fmt): + """聊天统计分析 + + \b + 示例: + wechat-cli stats "AI交流群" + wechat-cli stats "张三" --start-time "2026-04-01" --end-time "2026-04-03" + wechat-cli stats "群名" --format text + """ + app = ctx.obj + + try: + start_ts, end_ts = parse_time_range(start_time, end_time) + except ValueError as e: + click.echo(f"错误: {e}", err=True) + ctx.exit(2) + + chat_ctx = resolve_chat_context(chat_name, app.msg_db_keys, app.cache, app.decrypted_dir) + if not chat_ctx: + click.echo(f"找不到聊天对象: {chat_name}", err=True) + ctx.exit(1) + if not chat_ctx['db_path']: + click.echo(f"找不到 {chat_ctx['display_name']} 的消息记录", err=True) + ctx.exit(1) + + names = get_contact_names(app.cache, app.decrypted_dir) + result = collect_chat_stats( + chat_ctx, names, app.display_name_fn, + start_ts=start_ts, end_ts=end_ts, + ) + + if fmt == 'json': + output({ + 'chat': chat_ctx['display_name'], + 'username': chat_ctx['username'], + 'is_group': chat_ctx['is_group'], + **result, + }, 'json') + else: + lines = [f"{chat_ctx['display_name']} 聊天统计"] + if chat_ctx['is_group']: + lines[0] += " [群聊]" + lines.append(f"消息总数: {result['total']}") + if start_time or end_time: + lines.append(f"时间范围: {start_time or '最早'} ~ {end_time or '最新'}") + + # 类型分布 + lines.append("\n消息类型分布:") + for t, cnt in result['type_breakdown'].items(): + pct = cnt / result['total'] * 100 if result['total'] > 0 else 0 + lines.append(f" {t}: {cnt} ({pct:.1f}%)") + + # 发送者排名 + if result['top_senders']: + lines.append("\n发言排行 Top 10:") + for s in result['top_senders']: 
# `unread` CLI command: list sessions whose unread_count is positive, newest
# first, as JSON (default) or plain text. The docstring below is shown by
# click as the command's help text, so it is kept in the tool's UI language.
@click.command("unread")
@click.option("--limit", default=50, help="返回的会话数量")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def unread(ctx, limit, fmt):
    """查看未读会话

    \b
    示例:
      wechat-cli unread                    # 查看所有未读会话
      wechat-cli unread --limit 10         # 最多显示 10 个
      wechat-cli unread --format text      # 纯文本输出
    """
    app = ctx.obj

    # The session database is decrypted on demand through the shared cache.
    path = app.cache.get(os.path.join("session", "session.db"))
    if not path:
        click.echo("错误: 无法解密 session.db", err=True)
        ctx.exit(3)

    names = get_contact_names(app.cache, app.decrypted_dir)
    with closing(sqlite3.connect(path)) as conn:
        rows = conn.execute("""
            SELECT username, unread_count, summary, last_timestamp,
                   last_msg_type, last_msg_sender, last_sender_display_name
            FROM SessionTable
            WHERE unread_count > 0
            ORDER BY last_timestamp DESC
            LIMIT ?
        """, (limit,)).fetchall()

    results = []
    # `unread_count` (not `unread`) so the local does not shadow this command.
    for username, unread_count, summary, ts, msg_type, sender, sender_name in rows:
        # A NULL username would make the substring test below raise TypeError.
        username = username or ''
        display = names.get(username, username)
        is_group = '@chatroom' in username

        if isinstance(summary, bytes):
            summary = decompress_content(summary, 4) or '(压缩内容)'
        # Drop the "sender:\n" prefix when the summary carries one.
        if isinstance(summary, str) and ':\n' in summary:
            summary = summary.split(':\n', 1)[1]

        sender_display = ''
        if is_group and sender:
            sender_display = names.get(sender, sender_name or sender)

        # A NULL or out-of-range timestamp must not abort the whole listing;
        # such rows are reported with an empty human-readable time.
        try:
            time_str = datetime.fromtimestamp(ts).strftime('%m-%d %H:%M')
        except (TypeError, ValueError, OverflowError, OSError):
            time_str = ''

        results.append({
            'chat': display,
            'username': username,
            'is_group': is_group,
            'unread': unread_count or 0,
            'last_message': str(summary or ''),
            'msg_type': format_msg_type(msg_type),
            'sender': sender_display,
            'timestamp': ts,
            'time': time_str,
        })

    if fmt == 'json':
        output(results, 'json')
    else:
        if not results:
            output("没有未读消息", 'text')
            return
        lines = []
        for r in results:
            entry = f"[{r['time']}] {r['chat']}"
            if r['is_group']:
                entry += " [群]"
            entry += f" ({r['unread']}条未读)"
            entry += f"\n {r['msg_type']}: "
            if r['sender']:
                entry += f"{r['sender']}: "
            entry += r['last_message']
            lines.append(entry)
        output(f"未读会话({len(results)} 个):\n\n" + "\n\n".join(lines), 'text')
os.path.join(STATE_DIR, "all_keys.json") + + +def _choose_candidate(candidates): + if len(candidates) == 1: + return candidates[0] + if len(candidates) > 1: + if not sys.stdin.isatty(): + return candidates[0] + print("[!] 检测到多个微信数据目录:") + for i, c in enumerate(candidates, 1): + print(f" {i}. {c}") + print(" 0. 跳过") + try: + while True: + choice = input(f"请选择 [0-{len(candidates)}]: ").strip() + if choice == "0": + return None + if choice.isdigit() and 1 <= int(choice) <= len(candidates): + return candidates[int(choice) - 1] + print(" 无效输入") + except (EOFError, KeyboardInterrupt): + print() + return None + return None + + +def _auto_detect_db_dir_windows(): + appdata = os.environ.get("APPDATA", "") + config_dir = os.path.join(appdata, "Tencent", "xwechat", "config") + if not os.path.isdir(config_dir): + return None + data_roots = [] + for ini_file in glob_mod.glob(os.path.join(config_dir, "*.ini")): + try: + content = None + for enc in ("utf-8", "gbk"): + try: + with open(ini_file, "r", encoding=enc) as f: + content = f.read(1024).strip() + break + except UnicodeDecodeError: + continue + if not content or any(c in content for c in "\n\r\x00"): + continue + if os.path.isdir(content): + data_roots.append(content) + except OSError: + continue + seen = set() + candidates = [] + for root in data_roots: + pattern = os.path.join(root, "xwechat_files", "*", "db_storage") + for match in glob_mod.glob(pattern): + normalized = os.path.normcase(os.path.normpath(match)) + if os.path.isdir(match) and normalized not in seen: + seen.add(normalized) + candidates.append(match) + return _choose_candidate(candidates) + + +def _auto_detect_db_dir_linux(): + seen = set() + candidates = [] + search_roots = [os.path.expanduser("~/Documents/xwechat_files")] + sudo_user = os.environ.get("SUDO_USER") + if sudo_user: + import pwd + try: + sudo_home = pwd.getpwnam(sudo_user).pw_dir + except KeyError: + sudo_home = None + if sudo_home: + fallback = os.path.join(sudo_home, "Documents", 
"xwechat_files") + if fallback not in search_roots: + search_roots.append(fallback) + for root in search_roots: + if not os.path.isdir(root): + continue + pattern = os.path.join(root, "*", "db_storage") + for match in glob_mod.glob(pattern): + normalized = os.path.normcase(os.path.normpath(match)) + if os.path.isdir(match) and normalized not in seen: + seen.add(normalized) + candidates.append(match) + old_path = os.path.expanduser("~/.local/share/weixin/data/db_storage") + if os.path.isdir(old_path): + normalized = os.path.normcase(os.path.normpath(old_path)) + if normalized not in seen: + candidates.append(old_path) + + def _mtime(path): + msg_dir = os.path.join(path, "message") + target = msg_dir if os.path.isdir(msg_dir) else path + try: + return os.path.getmtime(target) + except OSError: + return 0 + candidates.sort(key=_mtime, reverse=True) + return _choose_candidate(candidates) + + +def _auto_detect_db_dir_macos(): + base = os.path.expanduser("~/Library/Containers/com.tencent.xinWeChat/Data/Documents/xwechat_files") + if not os.path.isdir(base): + return None + seen = set() + candidates = [] + pattern = os.path.join(base, "*", "db_storage") + for match in glob_mod.glob(pattern): + normalized = os.path.normcase(os.path.normpath(match)) + if os.path.isdir(match) and normalized not in seen: + seen.add(normalized) + candidates.append(match) + return _choose_candidate(candidates) + + +def auto_detect_db_dir(): + if _SYSTEM == "windows": + return _auto_detect_db_dir_windows() + if _SYSTEM == "linux": + return _auto_detect_db_dir_linux() + if _SYSTEM == "darwin": + return _auto_detect_db_dir_macos() + return None + + +def load_config(config_path=None): + """加载配置。默认从 ~/.wechat-cli/config.json 读取。""" + if config_path is None: + config_path = CONFIG_FILE + + cfg = {} + if os.path.exists(config_path): + try: + with open(config_path, encoding="utf-8") as f: + cfg = json.load(f) + except json.JSONDecodeError: + cfg = {} + + # db_dir 缺失时,自动检测 + db_dir = cfg.get("db_dir", 
"") + if not db_dir: + detected = auto_detect_db_dir() + if detected: + cfg["db_dir"] = detected + else: + raise FileNotFoundError( + "未找到微信数据目录。\n" + "请运行: wechat-cli init" + ) + + # 设置默认值 + state_dir = os.path.dirname(os.path.abspath(config_path)) + cfg.setdefault("keys_file", os.path.join(state_dir, "all_keys.json")) + cfg.setdefault("decrypted_dir", os.path.join(state_dir, "decrypted")) + cfg.setdefault("decoded_image_dir", os.path.join(state_dir, "decoded_images")) + cfg.setdefault("wechat_process", _DEFAULT_PROCESS) + + # 所有路径确保为绝对路径 + for key in ("db_dir", "keys_file", "decrypted_dir", "decoded_image_dir"): + if key in cfg and not os.path.isabs(cfg[key]): + cfg[key] = os.path.join(state_dir, cfg[key]) + + # 推导微信数据根目录 + db_dir = cfg.get("db_dir", "") + if db_dir and os.path.basename(db_dir) == "db_storage": + cfg["wechat_base_dir"] = os.path.dirname(db_dir) + else: + cfg["wechat_base_dir"] = db_dir + + return cfg diff --git a/wechat_cli/core/contacts.py b/wechat_cli/core/contacts.py new file mode 100644 index 0000000..ed1cd51 --- /dev/null +++ b/wechat_cli/core/contacts.py @@ -0,0 +1,201 @@ +"""联系人管理 — 加载、缓存、模糊匹配""" + +import os +import re +import sqlite3 + + +_contact_names = None # {username: display_name} +_contact_full = None # [{username, nick_name, remark}] +_self_username = None + + +def _load_contacts_from(db_path): + names = {} + full = [] + conn = sqlite3.connect(db_path) + try: + for r in conn.execute("SELECT username, nick_name, remark FROM contact").fetchall(): + uname, nick, remark = r + display = remark if remark else nick if nick else uname + names[uname] = display + full.append({'username': uname, 'nick_name': nick or '', 'remark': remark or ''}) + finally: + conn.close() + return names, full + + +def get_contact_names(cache, decrypted_dir): + global _contact_names, _contact_full + if _contact_names is not None: + return _contact_names + + pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db") + if os.path.exists(pre_decrypted): 
def get_self_username(db_dir, cache, decrypted_dir):
    """Derive the current account's username from the data directory name.

    The parent directory of ``db_dir`` is treated as the account directory.
    Its name, and that name with a trailing ``_<4+ hex chars>`` suffix
    removed, are looked up in the contact map; the first one found there
    is remembered in the module-level ``_self_username`` and returned.

    Args:
        db_dir: path of the database directory (may be empty).
        cache: decrypted-database cache, forwarded to ``get_contact_names``.
        decrypted_dir: pre-decrypted directory, forwarded likewise.

    Returns:
        The matching username, or ``''`` when ``db_dir`` is empty or no
        candidate is a known contact.
    """
    global _self_username
    if _self_username:
        return _self_username
    if not db_dir:
        return ''
    names = get_contact_names(cache, decrypted_dir)
    # Normalise first: with a trailing separator ("…/acct/db_storage/")
    # dirname() only strips the separator and basename() would then yield
    # "db_storage" instead of the account directory.
    account_dir = os.path.basename(os.path.dirname(os.path.normpath(db_dir)))
    candidates = [account_dir]
    m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir)
    if m:
        # Prefer the name without the hex suffix.
        candidates.insert(0, m.group(1))
    for candidate in candidates:
        if candidate and candidate in names:
            _self_username = candidate
            return _self_username
    return ''
def get_contact_detail(username, cache, decrypted_dir):
    """Fetch a single contact row and return it as a plain dict.

    The pre-decrypted ``contact/contact.db`` under ``decrypted_dir`` is used
    when it exists; otherwise the path is requested from ``cache``.

    Args:
        username: exact contact username to look up.
        cache: object whose ``get(rel_path)`` returns a database path or a
            falsy value.
        decrypted_dir: directory that may hold pre-decrypted databases.

    Returns:
        dict with the contact's fields, or ``None`` when no database is
        available or the username is not present.
    """
    rel_db = os.path.join("contact", "contact.db")
    db_path = os.path.join(decrypted_dir, rel_db)
    if not os.path.exists(db_path):
        db_path = cache.get(rel_db)
    if not db_path:
        return None

    query = (
        "SELECT username, nick_name, remark, alias, description, "
        "small_head_url, big_head_url, verify_flag, local_type "
        "FROM contact WHERE username = ?"
    )
    conn = sqlite3.connect(db_path)
    try:
        record = conn.execute(query, (username,)).fetchone()
    finally:
        conn.close()
    if record is None:
        return None

    (uname, nick, remark, alias, desc,
     small_url, big_url, verify, ltype) = record
    detail = {'username': uname}
    # Text columns: NULL is reported as an empty string.
    detail['nick_name'] = nick or ''
    detail['remark'] = remark or ''
    detail['alias'] = alias or ''
    detail['description'] = desc or ''
    # The small avatar wins; the big one is the fallback.
    detail['avatar'] = small_url or big_url or ''
    detail['verify_flag'] = verify or 0
    detail['local_type'] = ltype
    detail['is_group'] = '@chatroom' in uname
    detail['is_subscription'] = uname.startswith('gh_')
    return detail
PAGE_SZ = 4096
KEY_SZ = 32
SALT_SZ = 16
RESERVE_SZ = 80  # IV(16) + HMAC-SHA512(64)
SQLITE_HDR = b'SQLite format 3\x00'
WAL_HEADER_SZ = 32
WAL_FRAME_HEADER_SZ = 24


def decrypt_page(enc_key, page_data, pgno):
    """Decrypt one PAGE_SZ-byte page with AES-CBC.

    The IV is the first 16 bytes of the page's trailing reserved area.
    On page 1 the leading SALT_SZ bytes are skipped and replaced by the
    plain SQLite header. In both cases the reserved area is returned
    zero-filled, so the result is again PAGE_SZ bytes long.

    Args:
        enc_key: raw AES key bytes.
        page_data: one encrypted page.
        pgno: 1-based page number.

    Returns:
        bytes: the decrypted page.
    """
    iv = page_data[PAGE_SZ - RESERVE_SZ: PAGE_SZ - RESERVE_SZ + 16]
    if pgno == 1:
        encrypted = page_data[SALT_SZ: PAGE_SZ - RESERVE_SZ]
        cipher = AES.new(enc_key, AES.MODE_CBC, iv)
        decrypted = cipher.decrypt(encrypted)
        return bytes(bytearray(SQLITE_HDR + decrypted + b'\x00' * RESERVE_SZ))
    else:
        encrypted = page_data[:PAGE_SZ - RESERVE_SZ]
        cipher = AES.new(enc_key, AES.MODE_CBC, iv)
        decrypted = cipher.decrypt(encrypted)
        return decrypted + b'\x00' * RESERVE_SZ


def full_decrypt(db_path, out_path, enc_key):
    """Decrypt every whole page of ``db_path`` into ``out_path``.

    Args:
        db_path: encrypted database file.
        out_path: destination file; its parent directory is created when
            the path has one.
        enc_key: raw AES key bytes.

    Returns:
        int: number of whole pages in the input (file size // PAGE_SZ).
        Bytes after the last whole page are not processed.
    """
    file_size = os.path.getsize(db_path)
    total_pages = file_size // PAGE_SZ
    # A bare file name has an empty dirname, and os.makedirs('') raises
    # FileNotFoundError, so only create the directory when there is one.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(db_path, 'rb') as fin, open(out_path, 'wb') as fout:
        for pgno in range(1, total_pages + 1):
            page = fin.read(PAGE_SZ)
            if len(page) < PAGE_SZ:
                # Short read (file shrank after it was sized): pad, or stop.
                if len(page) > 0:
                    page = page + b'\x00' * (PAGE_SZ - len(page))
                else:
                    break
            fout.write(decrypt_page(enc_key, page, pgno))
    return total_pages
class DBCache:
    """Cache of decrypted databases, reused across runs via an mtime index.

    Decrypted copies are written to CACHE_DIR. MTIME_FILE records, for each
    relative key, the source database mtime, the WAL mtime and the path of
    the decrypted copy, so a later run can reuse a copy whose source files
    have not changed.
    """

    # NOTE(review): decrypted databases are left in the system temp
    # directory and cleanup() does not delete them — confirm this is
    # acceptable on multi-user machines.
    CACHE_DIR = os.path.join(tempfile.gettempdir(), "wechat_cli_cache")
    MTIME_FILE = os.path.join(tempfile.gettempdir(), "wechat_cli_cache", "_mtimes.json")

    def __init__(self, all_keys, db_dir):
        """Create the cache directory and load the persisted index.

        Args:
            all_keys: mapping of relative database path to key info.
            db_dir: directory holding the encrypted databases.
        """
        self._all_keys = all_keys
        self._db_dir = db_dir
        self._cache = {}  # rel_key -> (db_mtime, wal_mtime, tmp_path)
        os.makedirs(self.CACHE_DIR, exist_ok=True)
        self._load_persistent_cache()

    def _cache_path(self, rel_key):
        """Return the decrypted-copy path for ``rel_key`` (12 hex chars of its MD5)."""
        h = hashlib.md5(rel_key.encode()).hexdigest()[:12]
        return os.path.join(self.CACHE_DIR, f"{h}.db")

    def _load_persistent_cache(self):
        """Load still-valid entries from MTIME_FILE into the in-memory cache.

        A missing, unreadable or malformed index is ignored, and malformed
        individual entries are skipped: the index is only an optimisation
        and must never prevent start-up.
        """
        if not os.path.exists(self.MTIME_FILE):
            return
        try:
            with open(self.MTIME_FILE, encoding="utf-8") as f:
                saved = json.load(f)
        except (ValueError, OSError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            return
        if not isinstance(saved, dict):
            return
        for rel_key, info in saved.items():
            try:
                tmp_path = info["path"]
                saved_db_mt = info["db_mt"]
                saved_wal_mt = info["wal_mt"]
            except (KeyError, TypeError):
                continue
            if not isinstance(tmp_path, str) or not os.path.exists(tmp_path):
                continue
            # Same separator normalisation as get().
            rel_path = rel_key.replace('\\', '/').replace('/', os.sep)
            db_path = os.path.join(self._db_dir, rel_path)
            wal_path = db_path + "-wal"
            try:
                db_mtime = os.path.getmtime(db_path)
                wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0
            except OSError:
                continue
            if db_mtime == saved_db_mt and wal_mtime == saved_wal_mt:
                self._cache[rel_key] = (db_mtime, wal_mtime, tmp_path)

    def _save_persistent_cache(self):
        """Write the in-memory index to MTIME_FILE; write errors are ignored."""
        data = {}
        for rel_key, (db_mt, wal_mt, path) in self._cache.items():
            data[rel_key] = {"db_mt": db_mt, "wal_mt": wal_mt, "path": path}
        try:
            with open(self.MTIME_FILE, 'w', encoding="utf-8") as f:
                json.dump(data, f)
        except OSError:
            pass

    def get(self, rel_key):
        """Return the path of an up-to-date decrypted copy of ``rel_key``.

        The database (and its WAL, when present) is decrypted again whenever
        either mtime differs from the cached one or the copy is missing.

        Returns:
            str path of the decrypted copy, or ``None`` when no key is known
            for ``rel_key`` or the source database cannot be found or read.
        """
        key_info = get_key_info(self._all_keys, rel_key)
        if not key_info:
            return None
        rel_path = rel_key.replace('\\', '/').replace('/', os.sep)
        db_path = os.path.join(self._db_dir, rel_path)
        wal_path = db_path + "-wal"
        if not os.path.exists(db_path):
            return None

        try:
            db_mtime = os.path.getmtime(db_path)
            wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0
        except OSError:
            return None

        if rel_key in self._cache:
            c_db_mt, c_wal_mt, c_path = self._cache[rel_key]
            if c_db_mt == db_mtime and c_wal_mt == wal_mtime and os.path.exists(c_path):
                return c_path

        tmp_path = self._cache_path(rel_key)
        enc_key = bytes.fromhex(key_info["enc_key"])
        full_decrypt(db_path, tmp_path, enc_key)
        if os.path.exists(wal_path):
            # Patch WAL frames over the freshly decrypted copy.
            decrypt_wal(wal_path, tmp_path, enc_key)
        self._cache[rel_key] = (db_mtime, wal_mtime, tmp_path)
        self._save_persistent_cache()
        return tmp_path

    def cleanup(self):
        """Persist the index; decrypted copies are kept for later runs."""
        self._save_persistent_cache()
def key_path_variants(rel_path):
    """List the distinct spellings of ``rel_path`` under each separator style.

    The result contains, in this order and without duplicates: the path as
    given, the forward-slash form, the backslash form, and the form using
    the running platform's ``os.sep``.

    Args:
        rel_path: relative path using ``/`` and/or ``\\`` separators.

    Returns:
        list[str]: unique variants, first occurrence order preserved.
    """
    forward = rel_path.replace("\\", "/")
    spellings = (
        rel_path,
        forward,
        forward.replace("/", "\\"),
        forward.replace("/", os.sep),
    )
    # dict keys keep insertion order, which gives an ordered de-duplication.
    return list(dict.fromkeys(spellings))
path = cache.get(rel_key) + if not path: + continue + conn = sqlite3.connect(path) + try: + exists = conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", + (table_name,) + ).fetchone() + if not exists: + continue + max_ct = conn.execute(f"SELECT MAX(create_time) FROM [{table_name}]").fetchone()[0] or 0 + matches.append({'db_path': path, 'table_name': table_name, 'max_create_time': max_ct}) + except Exception: + pass + finally: + conn.close() + matches.sort(key=lambda x: x['max_create_time'], reverse=True) + return matches + + +# ---- 消息类型 ---- + +def _split_msg_type(t): + try: + t = int(t) + except (TypeError, ValueError): + return 0, 0 + if t > 0xFFFFFFFF: + return t & 0xFFFFFFFF, t >> 32 + return t, 0 + + +def format_msg_type(t): + base_type, _ = _split_msg_type(t) + return { + 1: '文本', 3: '图片', 34: '语音', 42: '名片', + 43: '视频', 47: '表情', 48: '位置', 49: '链接/文件', + 50: '通话', 10000: '系统', 10002: '撤回', + }.get(base_type, f'type={t}') + + +# ---- 内容解压 ---- + +def decompress_content(content, ct): + if ct and ct == 4 and isinstance(content, bytes): + try: + return _zstd_dctx.decompress(content).decode('utf-8', errors='replace') + except Exception: + return None + if isinstance(content, bytes): + try: + return content.decode('utf-8', errors='replace') + except Exception: + return None + return content + + +# ---- 内容解析 ---- + +def _parse_message_content(content, local_type, is_group): + if content is None: + return '', '' + if isinstance(content, bytes): + return '', '(二进制内容)' + sender = '' + text = content + if is_group and ':\n' in content: + sender, text = content.split(':\n', 1) + return sender, text + + +def _collapse_text(text): + if not text: + return '' + return re.sub(r'\s+', ' ', text).strip() + + +def _parse_xml_root(content): + if not content or len(content) > _XML_PARSE_MAX_LEN or _XML_UNSAFE_RE.search(content): + return None + try: + return ET.fromstring(content) + except ET.ParseError: + return None + + +def _parse_int(value, 
fallback=0): + try: + return int(value) + except (TypeError, ValueError): + return fallback + + +def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names, _display_name_fn): + if not content or ' 160: + ref_content = ref_content[:160] + "..." + quote_text = title or "[引用消息]" + if ref_content: + prefix = f"回复 {ref_display_name}: " if ref_display_name else "回复: " + quote_text += f"\n ↳ {prefix}{ref_content}" + return quote_text + if app_type == 6: + return f"[文件] {title}" if title else "[文件]" + if app_type == 5: + return f"[链接] {title}" if title else "[链接]" + if app_type in (33, 36, 44): + return f"[小程序] {title}" if title else "[小程序]" + if title: + return f"[链接/文件] {title}" + return "[链接/文件]" + + +def _format_voip_message_text(content): + if not content or '= ?') + params.append(start_ts) + if end_ts is not None: + clauses.append('create_time <= ?') + params.append(end_ts) + if keyword: + clauses.append('message_content LIKE ?') + params.append(f'%{keyword}%') + if msg_type_filter is not None: + base_type = msg_type_filter[0] + clauses.append('(local_type & 0xFFFFFFFF) = ?') + params.append(base_type) + if len(msg_type_filter) > 1: + clauses.append('((local_type >> 32) & 0xFFFFFFFF) = ?') + params.append(msg_type_filter[1]) + return clauses, params + + +def _query_messages(conn, table_name, start_ts=None, end_ts=None, keyword='', limit=20, offset=0, msg_type_filter=None): + if not _is_safe_msg_table_name(table_name): + raise ValueError(f'非法消息表名: {table_name}') + clauses, params = _build_message_filters(start_ts, end_ts, keyword, msg_type_filter) + where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else '' + sql = f""" + SELECT local_id, local_type, create_time, real_sender_id, message_content, + WCDB_CT_message_content + FROM [{table_name}] + {where_sql} + ORDER BY create_time DESC + """ + if limit is None: + return conn.execute(sql, params).fetchall() + sql += "\n LIMIT ? OFFSET ?" 
def parse_time_value(value, field_name, is_end=False):
    """Convert a user-supplied time string into an integer Unix timestamp.

    Accepted layouts are ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD HH:MM`` and
    ``YYYY-MM-DD``, interpreted as naive local time.

    Args:
        value: the text to parse; ``None`` or blank means "not given".
        field_name: name used in the error message.
        is_end: when true, a date-only value is moved to 23:59:59 of that
            day so the whole day is included.

    Returns:
        int timestamp, or ``None`` when ``value`` is empty.

    Raises:
        ValueError: if ``value`` matches none of the accepted layouts.
    """
    text = (value or '').strip()
    if not text:
        return None
    date_only_layout = '%Y-%m-%d'
    for layout in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M', date_only_layout):
        try:
            moment = datetime.strptime(text, layout)
            if is_end and layout == date_only_layout:
                moment = moment.replace(hour=23, minute=59, second=59)
            return int(moment.timestamp())
        except ValueError:
            pass
    raise ValueError(f"{field_name} 格式无效: {text}。支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS")


def parse_time_range(start_time='', end_time=''):
    """Parse both bounds of a time range.

    Returns:
        tuple ``(start_ts, end_ts)``; each element is ``None`` when the
        corresponding bound was not given.

    Raises:
        ValueError: if a bound is malformed or the start is after the end.
    """
    lower = parse_time_value(start_time, 'start_time', is_end=False)
    upper = parse_time_value(end_time, 'end_time', is_end=True)
    both_given = lower is not None and upper is not None
    if both_given and lower > upper:
        raise ValueError('start_time 不能晚于 end_time')
    return lower, upper
'db_path': primary['db_path'], 'table_name': primary['table_name'], + 'message_tables': message_tables, 'is_group': '@chatroom' in username, + } + + +def _iter_table_contexts(ctx): + tables = ctx.get('message_tables') or [] + if not tables and ctx.get('db_path') and ctx.get('table_name'): + tables = [{'db_path': ctx['db_path'], 'table_name': ctx['table_name']}] + for table in tables: + yield { + 'query': ctx['query'], 'username': ctx['username'], 'display_name': ctx['display_name'], + 'db_path': table['db_path'], 'table_name': table['table_name'], + 'is_group': ctx['is_group'], + } + + +def _candidate_page_size(limit, offset): + return limit + offset + + +def _page_ranked_entries(entries, limit, offset): + ordered = sorted(entries, key=lambda item: item[0], reverse=True) + paged = ordered[offset:offset + limit] + paged.sort(key=lambda item: item[0]) + return paged + + +# ---- 构建行 ---- + +def _build_history_line(row, ctx, names, id_to_username, display_name_fn): + local_id, local_type, create_time, real_sender_id, content, ct = row + time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M') + content = decompress_content(content, ct) + if content is None: + content = '(无法解压)' + sender, text = _format_message_text( + local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names, display_name_fn + ) + sender_label = _resolve_sender_label( + real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username, display_name_fn + ) + if sender_label: + return create_time, f'[{time_str}] {sender_label}: {text}' + return create_time, f'[{time_str}] {text}' + + +def _build_search_entry(row, ctx, names, id_to_username, display_name_fn): + local_id, local_type, create_time, real_sender_id, content, ct = row + content = decompress_content(content, ct) + if content is None: + return None + sender, text = _format_message_text( + local_id, local_type, content, ctx['is_group'], ctx['username'], 
ctx['display_name'], names, display_name_fn + ) + if text and len(text) > 300: + text = text[:300] + '...' + sender_label = _resolve_sender_label( + real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username, display_name_fn + ) + time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M') + entry = f"[{time_str}] [{ctx['display_name']}]" + if sender_label: + entry += f" {sender_label}:" + entry += f" {text}" + return create_time, entry + + +# ---- 聊天记录查询 ---- + +def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None): + collected = [] + failures = [] + candidate_limit = _candidate_page_size(limit, offset) + batch_size = min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE) + + for table_ctx in _iter_table_contexts(ctx): + try: + with closing(sqlite3.connect(table_ctx['db_path'])) as conn: + id_to_username = _load_name2id_maps(conn) + fetch_offset = 0 + before = len(collected) + while len(collected) - before < candidate_limit: + rows = _query_messages(conn, table_ctx['table_name'], start_ts=start_ts, end_ts=end_ts, limit=batch_size, offset=fetch_offset, msg_type_filter=msg_type_filter) + if not rows: + break + fetch_offset += len(rows) + for row in rows: + try: + collected.append(_build_history_line(row, table_ctx, names, id_to_username, display_name_fn)) + except Exception as e: + failures.append(f"local_id={row[0]}: {e}") + if len(collected) - before >= candidate_limit: + break + if len(rows) < batch_size: + break + except Exception as e: + failures.append(f"{table_ctx['db_path']}: {e}") + + paged = _page_ranked_entries(collected, limit, offset) + return [line for _, line in paged], failures + + +# ---- 搜索查询 ---- + +def _collect_search_entries(conn, contexts, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None): + collected = [] + failures = [] + id_to_username = _load_name2id_maps(conn) + 
# ---- Search queries ----

def _collect_search_entries(conn, contexts, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
    """Search every table context of one open database for ``keyword``.

    At most ``candidate_limit`` formatted hits are gathered per table. A
    failure on one table is recorded under that chat's display name and the
    remaining tables are still searched.

    Returns:
        ``(entries, failures)`` where entries are ``(create_time, text)``.
    """
    hits = []
    errors = []
    id_to_username = _load_name2id_maps(conn)
    page = candidate_limit

    for ctx in contexts:
        try:
            consumed = 0
            baseline = len(hits)
            while len(hits) - baseline < candidate_limit:
                rows = _query_messages(
                    conn, ctx['table_name'],
                    start_ts=start_ts, end_ts=end_ts, keyword=keyword,
                    limit=page, offset=consumed,
                    msg_type_filter=msg_type_filter,
                )
                if not rows:
                    break
                consumed += len(rows)
                for row in rows:
                    entry = _build_search_entry(row, ctx, names, id_to_username, display_name_fn)
                    if not entry:
                        continue
                    hits.append(entry)
                    if len(hits) - baseline >= candidate_limit:
                        break
                # A short batch means the table has no further matches.
                if len(rows) < page:
                    break
        except Exception as exc:
            errors.append(f"{ctx['display_name']}: {exc}")
    return hits, errors


def collect_chat_search(ctx, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
    """Search one chat for ``keyword`` across all of its message tables.

    Table contexts are grouped by database file so each file is opened once.
    If a database cannot be opened or searched, one failure per affected
    table context is recorded.

    Returns:
        ``(entries, failures)`` as produced by ``_collect_search_entries``.
    """
    hits = []
    errors = []
    grouped = {}
    for table_ctx in _iter_table_contexts(ctx):
        bucket = grouped.setdefault(table_ctx['db_path'], [])
        bucket.append(table_ctx)

    for db_path, db_contexts in grouped.items():
        try:
            with closing(sqlite3.connect(db_path)) as conn:
                found, problems = _collect_search_entries(
                    conn, db_contexts, names, keyword, display_name_fn,
                    start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
                    msg_type_filter=msg_type_filter,
                )
                hits.extend(found)
                errors.extend(problems)
        except Exception as exc:
            errors.extend([f"{tc['display_name']}: {exc}" for tc in db_contexts])
    return hits, errors


def search_all_messages(msg_db_keys, cache, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
    """Search every message database for ``keyword``.

    ``cache.get(rel_key)`` supplies the path of each database; keys for which
    it returns a falsy value are skipped. A database that fails as a whole is
    recorded under its key.

    Returns:
        ``(entries, failures)`` as produced by ``_collect_search_entries``.
    """
    hits = []
    errors = []
    for rel_key in msg_db_keys:
        db_path = cache.get(rel_key)
        if not db_path:
            continue
        try:
            with closing(sqlite3.connect(db_path)) as conn:
                db_contexts = _load_search_contexts_from_db(conn, db_path, names)
                found, problems = _collect_search_entries(
                    conn, db_contexts, names, keyword, display_name_fn,
                    start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
                    msg_type_filter=msg_type_filter,
                )
                hits.extend(found)
                errors.extend(problems)
        except Exception as exc:
            errors.append(f"{rel_key}: {exc}")
    return hits, errors
def _load_search_contexts_from_db(conn, db_path, names):
    """Build one search context per ``Msg_*`` table of an open database.

    Table names are mapped back to usernames through ``Name2Id``: a chat's
    table is ``'Msg_' + md5(user_name)``. Tables without a matching username
    keep an empty username and use the table name as display name.

    Args:
        conn: Open ``sqlite3`` connection to a message database.
        db_path: Path stored in every returned context.
        names: Mapping of username to display name.

    Returns:
        List of context dicts with keys ``query``, ``username``,
        ``display_name``, ``db_path``, ``table_name`` and ``is_group``.
    """
    # GLOB is case-sensitive and treats "_" literally. The previous
    # LIKE 'Msg_%' used "_" as a one-character wildcard, so unrelated tables
    # such as "MsgIndex" were picked up as message tables.
    tables = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'Msg_*'"
    ).fetchall()

    table_to_username = {}
    try:
        for (user_name,) in conn.execute("SELECT user_name FROM Name2Id").fetchall():
            if not user_name:
                continue
            table_hash = hashlib.md5(user_name.encode()).hexdigest()
            table_to_username[f"Msg_{table_hash}"] = user_name
    except sqlite3.Error:
        # Best effort: without a readable Name2Id table the contexts are
        # still returned, just without usernames.
        pass

    contexts = []
    for (table_name,) in tables:
        username = table_to_username.get(table_name, '')
        display_name = names.get(username, username) if username else table_name
        contexts.append({
            'query': display_name, 'username': username, 'display_name': display_name,
            'db_path': db_path, 'table_name': table_name, 'is_group': '@chatroom' in username,
        })
    return contexts


# ---- Multi-chat context resolution ----

def resolve_chat_contexts(chat_names, msg_db_keys, cache, decrypted_dir):
    """Resolve several chat names into chat contexts.

    Returns:
        ``(resolved, unresolved, missing_tables)``: contexts that have message
        tables (de-duplicated by username, first occurrence wins), the names
        that could not be resolved (blank names are reported as ``'(空)'``),
        and the display names of chats that resolved but have no tables.
    """
    resolved = []
    unresolved = []
    missing_tables = []
    seen = set()
    for chat_name in chat_names:
        name = (chat_name or '').strip()
        if not name:
            unresolved.append('(空)')
            continue
        ctx = resolve_chat_context(name, msg_db_keys, cache, decrypted_dir)
        if not ctx:
            unresolved.append(name)
            continue
        if not ctx['message_tables']:
            missing_tables.append(ctx['display_name'])
            continue
        if ctx['username'] in seen:
            continue
        seen.add(ctx['username'])
        resolved.append(ctx)
    return resolved, unresolved, missing_tables
# ---- Chat statistics ----

def collect_chat_stats(ctx, names, display_name_fn, start_ts=None, end_ts=None):
    """Aggregate message statistics for one chat across all of its tables.

    Args:
        ctx: Resolved chat context (see ``_iter_table_contexts``).
        names: Mapping of username to display name, passed to
            ``display_name_fn``.
        display_name_fn: Callable ``(username, names) -> str``.
        start_ts: Optional inclusive lower bound on ``create_time``.
        end_ts: Optional inclusive upper bound on ``create_time``.

    Returns:
        dict with
        ``total`` (int),
        ``type_breakdown`` ({type label: count}, most frequent first),
        ``top_senders`` (up to 10 ``{'name', 'count'}`` dicts),
        ``hourly`` ({0..23: count}) and
        ``failures`` (descriptions of tables that could not be read; their
        rows are missing from the counts).
    """
    type_map = {
        1: '文本', 3: '图片', 34: '语音', 42: '名片',
        43: '视频', 47: '表情', 48: '位置', 49: '链接/文件',
        50: '通话', 10000: '系统', 10002: '撤回',
    }

    total = 0
    type_counts = {}
    sender_counts = {}
    hourly_counts = {}
    failures = []

    # The time filter is identical for every table, so build it once.
    where_parts = []
    params = []
    if start_ts is not None:
        where_parts.append('create_time >= ?')
        params.append(start_ts)
    if end_ts is not None:
        where_parts.append('create_time <= ?')
        params.append(end_ts)
    where_sql = f"WHERE {' AND '.join(where_parts)}" if where_parts else ''

    for table_ctx in _iter_table_contexts(ctx):
        try:
            with closing(sqlite3.connect(table_ctx['db_path'])) as conn:
                id_to_username = _load_name2id_maps(conn)
                tbl = table_ctx['table_name']
                # The table name is interpolated into SQL below.
                if not _is_safe_msg_table_name(tbl):
                    continue

                # Total and per-type distribution.
                for bt, cnt in conn.execute(
                    f"SELECT (local_type & 0xFFFFFFFF), COUNT(*) FROM [{tbl}] {where_sql} GROUP BY (local_type & 0xFFFFFFFF)",
                    params
                ).fetchall():
                    label = type_map.get(bt, f'type={bt}')
                    type_counts[label] = type_counts.get(label, 0) + cnt
                    total += cnt

                # Per-sender counts. No per-table LIMIT: truncating each table
                # before merging would under-count senders of chats that are
                # spread over several tables.
                for sid, cnt in conn.execute(
                    f"SELECT real_sender_id, COUNT(*) FROM [{tbl}] {where_sql} GROUP BY real_sender_id",
                    params
                ).fetchall():
                    uname = id_to_username.get(sid, str(sid))
                    if uname:
                        sender_counts[uname] = sender_counts.get(uname, 0) + cnt

                # Distribution over the 24 hours of the local day.
                for h, cnt in conn.execute(
                    f"SELECT cast(strftime('%H', create_time, 'unixepoch', 'localtime') as integer), COUNT(*) FROM [{tbl}] {where_sql} GROUP BY cast(strftime('%H', create_time, 'unixepoch', 'localtime') as integer)",
                    params
                ).fetchall():
                    if h is not None:
                        hourly_counts[h] = hourly_counts.get(h, 0) + cnt
        except Exception as e:
            # Stay best-effort, but report the table instead of silently
            # returning incomplete numbers.
            failures.append(f"{table_ctx['db_path']}: {e}")

    top_senders = sorted(sender_counts.items(), key=lambda x: x[1], reverse=True)[:10]
    top_senders = [{'name': display_name_fn(u, names), 'count': c} for u, c in top_senders]

    hourly = {h: hourly_counts.get(h, 0) for h in range(24)}

    return {
        'total': total,
        'type_breakdown': dict(sorted(type_counts.items(), key=lambda x: x[1], reverse=True)),
        'top_senders': top_senders,
        'hourly': hourly,
        'failures': failures,
    }
def extract_keys(db_dir, output_path, pid=None):
    """Extract the WeChat database keys with the scanner for this platform.

    Args:
        db_dir: WeChat database directory (``db_storage``).
        output_path: Path of the ``all_keys.json`` file to write.
        pid: Optional WeChat process id; by default the scanner detects it.

    Returns:
        dict: mapping of salt (hex) to encryption key (hex).

    Raises:
        RuntimeError: The platform is unsupported or the extraction failed.
    """
    system_name = platform.system()
    platform_key = system_name.lower()

    # Scanners are imported lazily so that only the current platform's
    # module is loaded.
    if platform_key == "darwin":
        from .scanner_macos import extract_keys as platform_extract
    elif platform_key == "windows":
        from .scanner_windows import extract_keys as platform_extract
    elif platform_key == "linux":
        from .scanner_linux import extract_keys as platform_extract
    else:
        raise RuntimeError(f"不支持的平台: {system_name}")

    return platform_extract(db_dir, output_path, pid=pid)
def cross_verify_keys(db_files, salt_to_dbs, key_map, print_fn):
    """Try the keys already found against salts that are still unmatched.

    For every salt without a key, each known key is verified against page 1
    of the databases using that salt. The first key that verifies is stored
    in ``key_map`` (mutated in place) and the search for that salt stops.

    Args:
        db_files: Iterable of ``(rel, path, size, salt_hex, page1)`` tuples.
        salt_to_dbs: Mapping of salt (hex) to the databases using it.
        key_map: Mapping of salt (hex) to key (hex); updated in place.
        print_fn: Logging callable taking one string.
    """
    missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys())
    if not missing_salts or not key_map:
        return
    print_fn(f"\n还有 {len(missing_salts)} 个 salt 未匹配,尝试交叉验证...")

    # Snapshot the known keys: key_map gains entries below and must not be
    # iterated while it changes. Decoding the hex once also avoids repeating
    # it per database.
    known_keys = [(salt, key_hex, bytes.fromhex(key_hex)) for salt, key_hex in key_map.items()]

    for salt_hex in list(missing_salts):
        match = None
        for rel, path, sz, s, page1 in db_files:
            if s != salt_hex:
                continue
            match = next(
                ((known_salt, key_hex) for known_salt, key_hex, key in known_keys
                 if verify_enc_key(key, page1)),
                None,
            )
            if match is not None:
                # One verified database is enough; checking the others would
                # only repeat the work and the log line.
                break
        if match is None:
            continue
        known_salt, known_key_hex = match
        key_map[salt_hex] = known_key_hex
        print_fn(f" [CROSS] salt={salt_hex} 可用 key from salt={known_salt}")


def save_results(db_files, salt_to_dbs, key_map, output_path, print_fn):
    """Write the extracted keys to a JSON file and log a summary.

    The file maps each database's relative path to its ``enc_key``, ``salt``
    and ``size_mb``.

    Args:
        db_files: ``db_files`` as returned by ``collect_db_files``.
        salt_to_dbs: ``salt_to_dbs`` as returned by ``collect_db_files``.
        key_map: Mapping of salt (hex) to key (hex).
        output_path: Path of the JSON file to write.
        print_fn: Logging callable taking one string.

    Returns:
        dict: ``key_map`` itself (salt hex to key hex).

    Raises:
        RuntimeError: No database has a key; nothing is written.
    """
    print_fn(f"\n{'=' * 60}")
    print_fn(f"结果: {len(key_map)}/{len(salt_to_dbs)} salts 找到密钥")

    result = {}
    missing = []
    for rel, path, sz, salt_hex, page1 in db_files:
        if salt_hex in key_map:
            result[rel] = {
                "enc_key": key_map[salt_hex],
                "salt": salt_hex,
                "size_mb": round(sz / 1024 / 1024, 1)
            }
            print_fn(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)")
        else:
            missing.append(rel)
            print_fn(f" MISSING: {rel} (salt={salt_hex})")

    if not result:
        print_fn("\n[!] 未提取到任何密钥")
        raise RuntimeError("未能从任何微信进程中提取到密钥")

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)
    print_fn(f"\n密钥保存到: {output_path}")

    if missing:
        print_fn("\n未找到密钥的数据库:")
        for rel in missing:
            print_fn(f" {rel}")

    return key_map
def _safe_readlink(path):
    """Return the resolved target of symlink ``path``, or ``""`` on OSError."""
    try:
        return os.path.realpath(os.readlink(path))
    except OSError:
        return ""


_KNOWN_COMMS = {"wechat", "wechatappex", "weixin"}
_INTERPRETER_PREFIXES = ("python", "bash", "sh", "zsh", "node", "perl", "ruby")


def _is_wechat_process(pid):
    """Return True when ``pid`` looks like a WeChat process.

    The current process is never a match. A process matches when its
    ``comm`` is one of the known names, or when its executable's file name
    contains "wechat"/"weixin" and does not start with a known interpreter
    name. Processes that vanished or cannot be inspected do not match.
    """
    if pid == os.getpid():
        return False
    try:
        with open(f"/proc/{pid}/comm") as f:
            comm = f.read().strip()
        if comm.lower() in _KNOWN_COMMS:
            return True
        exe_name = os.path.basename(_safe_readlink(f"/proc/{pid}/exe")).lower()
        # Scripts such as "python wechat_tool.py" run under an interpreter.
        if exe_name.startswith(_INTERPRETER_PREFIXES):
            return False
        return "wechat" in exe_name or "weixin" in exe_name
    except (PermissionError, FileNotFoundError, ProcessLookupError):
        return False


def _page_size_kb():
    """Return the kernel page size in KiB (4 when it cannot be queried)."""
    try:
        return max(os.sysconf("SC_PAGE_SIZE") // 1024, 1)
    except (AttributeError, ValueError, OSError):
        return 4


def _get_pids():
    """Return ``(pid, rss_kb)`` for every suspected WeChat process.

    The list is sorted by resident memory, largest first, and each entry is
    printed.

    Raises:
        RuntimeError: No WeChat process was found.
    """
    # /proc/<pid>/statm counts pages; the page size is not always 4 KiB
    # (for example 16 KiB or 64 KiB on some arm64 kernels).
    page_kb = _page_size_kb()
    pids = []
    for pid_str in os.listdir("/proc"):
        if not pid_str.isdigit():
            continue
        pid = int(pid_str)
        try:
            if not _is_wechat_process(pid):
                continue
            with open(f"/proc/{pid}/statm") as f:
                rss_pages = int(f.read().split()[1])
        except (PermissionError, FileNotFoundError, ProcessLookupError,
                IndexError, ValueError):
            # The process exited or cannot be inspected; IndexError and
            # ValueError cover a short or empty statm read.
            continue
        pids.append((pid, rss_pages * page_kb))

    if not pids:
        raise RuntimeError("未检测到 Linux 微信进程")

    pids.sort(key=lambda item: item[1], reverse=True)
    for pid, rss_kb in pids:
        exe_path = _safe_readlink(f"/proc/{pid}/exe")
        print(f"[+] WeChat PID={pid} ({rss_kb // 1024}MB) {exe_path}")
    return pids
_SKIP_MAPPINGS = {"[vdso]", "[vsyscall]", "[vvar]"}
_SKIP_PATH_PREFIXES = ("/usr/lib/", "/lib/", "/usr/share/")


def _get_readable_regions(pid):
    """Parse ``/proc/<pid>/maps`` into a list of readable ``(start, size)``.

    Skipped are: mappings without the ``r`` permission, the kernel
    pseudo-mappings in ``_SKIP_MAPPINGS``, system-library paths whose name
    contains none of "wcdb"/"wechat"/"weixin", and regions that are empty or
    500 MiB and larger.
    """
    size_cap = 500 * 1024 * 1024
    wanted_markers = ("wcdb", "wechat", "weixin")
    found = []
    with open(f"/proc/{pid}/maps") as maps_file:
        for entry in maps_file:
            fields = entry.split()
            if len(fields) < 2 or "r" not in fields[1]:
                continue
            if len(fields) >= 6:
                mapping_name = fields[5]
                if mapping_name in _SKIP_MAPPINGS:
                    continue
                if mapping_name.startswith(_SKIP_PATH_PREFIXES):
                    lowered = mapping_name.lower()
                    if not any(marker in lowered for marker in wanted_markers):
                        continue
            low_hex, high_hex = fields[0].split("-")
            low = int(low_hex, 16)
            span = int(high_hex, 16) - low
            if span <= 0 or span >= size_cap:
                continue
            found.append((low, span))
    return found


def _check_permissions():
    """Ensure this process may read other processes' memory.

    Returns silently for root, or when bit 19 (CAP_SYS_PTRACE) is set in the
    ``CapEff`` line of ``/proc/self/status``.

    Raises:
        RuntimeError: Neither condition holds.
    """
    if os.geteuid() == 0:
        return
    cap_sys_ptrace = 1 << 19
    try:
        with open("/proc/self/status") as status_file:
            cap_line = next((ln for ln in status_file if ln.startswith("CapEff:")), None)
        if cap_line is not None:
            effective = int(cap_line.split(":")[1].strip(), 16)
            if effective & cap_sys_ptrace:
                return
    except (OSError, ValueError):
        pass
    raise RuntimeError(
        "需要 root 权限或 CAP_SYS_PTRACE 才能读取进程内存\n"
        "请使用: sudo wechat-cli init\n"
        "或授予 capability: sudo setcap cap_sys_ptrace=ep $(which python3)"
    )
def extract_keys(db_dir, output_path, pid=None):
    """Extract the WeChat database keys on Linux by scanning process memory.

    Args:
        db_dir: WeChat database directory.
        output_path: Path of the ``all_keys.json`` file to write.
        pid: Optional process id to scan; by default all suspected WeChat
            processes are detected and scanned.

    Returns:
        dict: mapping of salt (hex) to encryption key (hex), as returned by
        ``save_results``.

    Raises:
        RuntimeError: Raised directly when ``db_dir`` yields no database
            files; the helpers called here raise it as well.
    """
    _check_permissions()

    print("=" * 60)
    print(" 提取 Linux 微信数据库密钥(内存扫描)")
    print("=" * 60)

    db_files, salt_to_dbs = collect_db_files(db_dir)
    if not db_files:
        raise RuntimeError(f"在 {db_dir} 未找到可解密的 .db 文件")

    print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的 salt")
    # List salts shared by the most databases first.
    for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
        print(f" salt {salt_hex}: {', '.join(dbs)}")

    # An explicit pid is scanned as-is; its memory size is unknown, hence 0.
    pids = _get_pids() if pid is None else [(pid, 0)]

    # Matches x'<hex>' literals of 64 to 192 hex digits.
    hex_re = re.compile(rb"x'([0-9a-fA-F]{64,192})'")
    key_map = {}
    remaining_salts = set(salt_to_dbs.keys())
    all_hex_matches = 0
    t0 = time.time()

    for pid_val, rss_kb in pids:
        try:
            regions = _get_readable_regions(pid_val)
        except PermissionError:
            print(f"[WARN] 无法读取 /proc/{pid_val}/maps,权限不足,跳过")
            continue
        except (FileNotFoundError, ProcessLookupError):
            print(f"[WARN] PID {pid_val} 已退出,跳过")
            continue

        total_bytes = sum(s for _, s in regions)
        total_mb = total_bytes / 1024 / 1024
        print(f"\n[*] 扫描 PID={pid_val} ({total_mb:.0f}MB, {len(regions)} 区域)")

        scanned_bytes = 0
        try:
            mem = open(f"/proc/{pid_val}/mem", "rb")
        except PermissionError:
            print(f"[WARN] 无法打开 /proc/{pid_val}/mem,权限不足,跳过")
            continue
        except (FileNotFoundError, ProcessLookupError):
            print(f"[WARN] PID {pid_val} 已退出,跳过")
            continue

        # Re-check after opening mem: the pid may no longer belong to WeChat.
        if not _is_wechat_process(pid_val):
            print(f"[WARN] PID {pid_val} 已不是微信进程,跳过")
            mem.close()
            continue

        try:
            for reg_idx, (base, size) in enumerate(regions):
                try:
                    mem.seek(base)
                    data = mem.read(size)
                except (OSError, ValueError):
                    # Region could not be read; skip it.
                    continue
                scanned_bytes += len(data)

                all_hex_matches += scan_memory_for_keys(
                    data, hex_re, db_files, salt_to_dbs,
                    key_map, remaining_salts, base, pid_val, print,
                )

                # Progress report every 200 regions.
                if (reg_idx + 1) % 200 == 0:
                    elapsed = time.time() - t0
                    progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
                    print(
                        f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
                        f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
                    )
        finally:
            mem.close()

        # Every salt has a key; the remaining processes need no scan.
        if not remaining_salts:
            print(f"\n[+] 所有密钥已找到,跳过剩余进程")
            break

    elapsed = time.time() - t0
    print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex 模式")

    cross_verify_keys(db_files, salt_to_dbs, key_map, print)
    return save_results(db_files, salt_to_dbs, key_map, output_path, print)
def _find_binary():
    """Return the path of the bundled key-extraction binary for this CPU.

    The binary is looked up in the ``bin`` directory one level above this
    module's directory.

    Raises:
        RuntimeError: The architecture is neither arm64 nor x86_64, or the
            binary file is missing.
    """
    binaries = {
        "arm64": "find_all_keys_macos.arm64",
        "x86_64": "find_all_keys_macos.x86_64",
    }
    machine = platform.machine()
    if machine not in binaries:
        raise RuntimeError(f"不支持的 macOS 架构: {machine}")

    # After a pip install the binaries live inside the package, in bin/.
    here = os.path.abspath(__file__)
    package_root = os.path.dirname(os.path.dirname(here))
    candidate = os.path.join(package_root, "bin", binaries[machine])
    if not os.path.isfile(candidate):
        raise RuntimeError(
            f"找不到密钥提取二进制: {candidate}\n"
            "请确认安装包完整"
        )
    return candidate
def extract_keys(db_dir, output_path, pid=None):
    """Extract the macOS WeChat database keys with the bundled C binary.

    The binary is run with the parent directory of ``db_dir`` as working
    directory and is expected to write ``all_keys.json`` there. That file is
    copied to ``output_path`` and then removed, unless both are the same
    file.

    Args:
        db_dir: WeChat ``db_storage`` directory.
        output_path: Path of the ``all_keys.json`` file to write.
        pid: Unused; the binary is run without arguments.

    Returns:
        dict: mapping of salt (hex) to encryption key (hex).

    Raises:
        RuntimeError: The working directory is missing, the binary cannot be
            run or times out, or it produced no key file.
    """
    import json

    binary = _find_binary()

    # abspath drops a trailing slash and resolves relative paths. A plain
    # dirname("/x/db_storage/") returns db_storage itself and
    # dirname("db_storage") returns "", both the wrong working directory.
    work_dir = os.path.dirname(os.path.abspath(db_dir))
    if not os.path.isdir(work_dir):
        raise RuntimeError(f"微信数据目录不存在: {work_dir}")

    print(f"[+] 使用 C 二进制提取密钥: {binary}")
    print(f"[+] 工作目录: {work_dir}")

    try:
        result = subprocess.run(
            [binary],
            cwd=work_dir,
            capture_output=True,
            text=True,
            timeout=120,
        )
    except subprocess.TimeoutExpired as err:
        raise RuntimeError("密钥提取超时(120s)") from err
    except PermissionError as err:
        raise RuntimeError(
            f"无法执行 {binary}\n"
            "请确保文件有执行权限: chmod +x " + binary
        ) from err

    # Relay the binary's own output.
    if result.stdout:
        print(result.stdout)
    if result.stderr:
        print(result.stderr, file=sys.stderr)

    c_output = os.path.join(work_dir, "all_keys.json")
    if not os.path.exists(c_output):
        # "task_for_pid" in the output is treated as a missing-privilege hint.
        if "task_for_pid" in (result.stdout or "") + (result.stderr or ""):
            raise RuntimeError(
                "需要 root 权限才能读取微信进程内存。\n"
                "请使用: sudo wechat-cli init"
            )
        raise RuntimeError(
            "C 二进制未能生成密钥文件。\n"
            f"stdout: {result.stdout}\nstderr: {result.stderr}"
        )

    # Copy the result to output_path.
    with open(c_output, encoding="utf-8") as f:
        keys_data = json.load(f)

    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(keys_data, f, indent=2, ensure_ascii=False)

    # Remove the binary's temporary output, unless it is the requested file.
    if os.path.abspath(c_output) != os.path.abspath(output_path):
        os.remove(c_output)

    # Build the salt -> key mapping from the well-formed entries.
    key_map = {
        info["salt"]: info["enc_key"]
        for info in keys_data.values()
        if isinstance(info, dict) and "enc_key" in info and "salt" in info
    }

    print(f"\n[+] 提取到 {len(key_map)} 个密钥,保存到: {output_path}")
    return key_map
("BaseAddress", ctypes.c_uint64), ("AllocationBase", ctypes.c_uint64), + ("AllocationProtect", wt.DWORD), ("_pad1", wt.DWORD), + ("RegionSize", ctypes.c_uint64), ("State", wt.DWORD), + ("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD), + ] + + +def _get_pids(): + """返回所有 Weixin.exe 进程的 (pid, mem_kb) 列表,按内存降序""" + r = subprocess.run(["tasklist", "/FI", "IMAGENAME eq Weixin.exe", "/FO", "CSV", "/NH"], + capture_output=True, text=True) + pids = [] + for line in r.stdout.strip().split('\n'): + if not line.strip(): + continue + p = line.strip('"').split('","') + if len(p) >= 5: + pid = int(p[1]) + mem = int(p[4].replace(',', '').replace(' K', '').strip() or '0') + pids.append((pid, mem)) + if not pids: + raise RuntimeError("Weixin.exe 未运行") + pids.sort(key=lambda x: x[1], reverse=True) + for pid, mem in pids: + print(f"[+] Weixin.exe PID={pid} ({mem // 1024}MB)") + return pids + + +def _read_mem(h, addr, sz): + buf = ctypes.create_string_buffer(sz) + n = ctypes.c_size_t(0) + if kernel32.ReadProcessMemory(h, ctypes.c_uint64(addr), buf, sz, ctypes.byref(n)): + return buf.raw[:n.value] + return None + + +def _enum_regions(h): + regs = [] + addr = 0 + mbi = MBI() + while addr < 0x7FFFFFFFFFFF: + if kernel32.VirtualQueryEx(h, ctypes.c_uint64(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)) == 0: + break + if mbi.State == MEM_COMMIT and mbi.Protect in READABLE and 0 < mbi.RegionSize < 500 * 1024 * 1024: + regs.append((mbi.BaseAddress, mbi.RegionSize)) + nxt = mbi.BaseAddress + mbi.RegionSize + if nxt <= addr: + break + addr = nxt + return regs + + +def extract_keys(db_dir, output_path, pid=None): + """提取 Windows 微信数据库密钥。 + + Args: + db_dir: 微信数据库目录 + output_path: all_keys.json 输出路径 + pid: 可选,指定 PID(默认自动检测所有 Weixin.exe) + + Returns: + dict: salt_hex -> enc_key_hex 映射 + """ + print("=" * 60) + print(" 提取所有微信数据库密钥") + print("=" * 60) + + db_files, salt_to_dbs = collect_db_files(db_dir) + + print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的salt") + for salt_hex, 
def extract_keys(db_dir, output_path, pid=None):
    """Extract the WeChat database keys on Windows by scanning process memory.

    Args:
        db_dir: WeChat database directory.
        output_path: Path of the ``all_keys.json`` file to write.
        pid: Optional process id to scan; by default every Weixin.exe
            process is detected and scanned.

    Returns:
        dict: mapping of salt (hex) to encryption key (hex), as returned by
        ``save_results``.

    Raises:
        RuntimeError: ``db_dir`` yields no database files, or a helper
            called here fails.
    """
    # OpenProcess access rights.
    process_vm_read = 0x0010
    process_query_information = 0x0400

    print("=" * 60)
    print(" 提取所有微信数据库密钥")
    print("=" * 60)

    db_files, salt_to_dbs = collect_db_files(db_dir)
    # Same guard as the Linux scanner: without databases there is nothing to
    # verify keys against, so fail before scanning any memory.
    if not db_files:
        raise RuntimeError(f"在 {db_dir} 未找到可解密的 .db 文件")

    print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的salt")
    # List salts shared by the most databases first.
    for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
        print(f" salt {salt_hex}: {', '.join(dbs)}")

    # An explicit pid is scanned as-is; its memory size is unknown, hence 0.
    pids = _get_pids() if pid is None else [(pid, 0)]

    # Matches x'<hex>' literals of 64 to 192 hex digits.
    hex_re = re.compile(rb"x'([0-9a-fA-F]{64,192})'")
    key_map = {}
    remaining_salts = set(salt_to_dbs.keys())
    all_hex_matches = 0
    t0 = time.time()

    for pid_val, mem_kb in pids:
        h = kernel32.OpenProcess(process_vm_read | process_query_information, False, pid_val)
        if not h:
            print(f"[WARN] 无法打开进程 PID={pid_val},跳过")
            continue

        try:
            regions = _enum_regions(h)
            total_bytes = sum(s for _, s in regions)
            total_mb = total_bytes / 1024 / 1024
            print(f"\n[*] 扫描 PID={pid_val} ({total_mb:.0f}MB, {len(regions)} 区域)")

            scanned_bytes = 0
            for reg_idx, (base, size) in enumerate(regions):
                data = _read_mem(h, base, size)
                scanned_bytes += size
                if not data:
                    continue

                all_hex_matches += scan_memory_for_keys(
                    data, hex_re, db_files, salt_to_dbs,
                    key_map, remaining_salts, base, pid_val, print,
                )

                # Progress report every 200 regions.
                if (reg_idx + 1) % 200 == 0:
                    elapsed = time.time() - t0
                    progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
                    print(
                        f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
                        f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
                    )
        finally:
            kernel32.CloseHandle(h)

        # Every salt has a key; the remaining processes need no scan.
        if not remaining_salts:
            print("\n[+] 所有密钥已找到,跳过剩余进程")
            break

    elapsed = time.time() - t0
    print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex模式")

    cross_verify_keys(db_files, salt_to_dbs, key_map, print)
    return save_results(db_files, salt_to_dbs, key_map, output_path, print)
# Root command group. --config may also be supplied through the
# WECHAT_CLI_CONFIG environment variable. The docstring below is the help
# text passed to click and is therefore kept verbatim.
@click.group()
@click.option("--config", "config_path", default=None, envvar="WECHAT_CLI_CONFIG",
              help="config.json 路径(默认自动查找)")
@click.pass_context
def cli(ctx, config_path):
    """WeChat CLI — 查询微信消息、联系人等数据

    \b
    使用示例:
      wechat-cli init  # 首次使用:提取密钥
      wechat-cli sessions  # 最近会话列表
      wechat-cli sessions --limit 10  # 最近 10 个会话
      wechat-cli history "张三" --limit 20  # 查看张三的最近 20 条消息
      wechat-cli history "AI交流群" --start-time "2026-04-01"  # 指定时间范围
      wechat-cli search "Claude" --chat "AI交流群"  # 在指定群里搜索关键词
      wechat-cli search "你好" --limit 50  # 全局搜索
      wechat-cli contacts --query "李"  # 搜索联系人
      wechat-cli new-messages  # 获取增量新消息
    """
    # The init command does not need an AppContext.
    if ctx.invoked_subcommand == "init":
        return

    # Every other sub-command receives the AppContext through ctx.obj. A
    # failure to build it is printed to stderr and exits with status 1.
    try:
        ctx.obj = AppContext(config_path)
    except FileNotFoundError as e:
        click.echo(str(e), err=True)
        sys.exit(1)
    except Exception as e:
        click.echo(f"初始化失败: {e}", err=True)
        sys.exit(1)


# Register the sub-commands.
from .commands.init import init
from .commands.sessions import sessions
from .commands.history import history
from .commands.search import search
from .commands.contacts import contacts
from .commands.new_messages import new_messages
from .commands.members import members
from .commands.export import export
from .commands.stats import stats
from .commands.unread import unread
from .commands.favorites import favorites

cli.add_command(init)
cli.add_command(sessions)
cli.add_command(history)
cli.add_command(search)
cli.add_command(contacts)
cli.add_command(new_messages)
cli.add_command(members)
cli.add_command(export)
cli.add_command(stats)
cli.add_command(unread)
cli.add_command(favorites)


if __name__ == "__main__":
    cli()
def output_json(data, file=None):
    """Write ``data`` as indented, non-ASCII-escaped JSON plus a newline.

    ``file`` defaults to ``sys.stdout`` when it is ``None`` (or falsy).
    """
    target = file if file else sys.stdout
    json.dump(data, target, ensure_ascii=False, indent=2)
    target.write('\n')


def output_text(text, file=None):
    """Write ``text``, making sure the output ends with a newline.

    ``file`` defaults to ``sys.stdout`` when it is ``None`` (or falsy).
    """
    target = file if file else sys.stdout
    target.write(text)
    if text.endswith('\n'):
        return
    target.write('\n')


def output(data, fmt='json', file=None):
    """Write ``data`` in the requested format.

    With ``fmt='json'`` the data is always written as JSON. With any other
    format a string is written as text, a dict containing ``'text'`` has that
    value written as text, and everything else falls back to JSON.
    """
    if fmt != 'json':
        if isinstance(data, str):
            output_text(data, file)
            return
        if isinstance(data, dict) and 'text' in data:
            output_text(data['text'], file)
            return
    output_json(data, file)