Initial release: wechat-cli v0.2.0

A CLI tool to query local WeChat data with 11 commands:
sessions, history, search, contacts, members, stats, export,
favorites, unread, new-messages, and init.

Features:
- Self-contained init with key extraction (no external deps)
- On-the-fly SQLCipher decryption with caching
- JSON output by default for LLM/AI tool integration
- Message type filtering and chat statistics
- Markdown/txt export for conversations
- Cross-platform: macOS, Windows, Linux
Author: canghe
Date: 2026-04-04 11:10:10 +08:00
Commit: e64006bafe
36 changed files with 4238 additions and 0 deletions

.gitignore

@@ -0,0 +1,37 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
*.egg
dist/
build/
# Virtual environments
.venv/
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Project-specific
.claude/
*.db
*.db-wal
*.db-shm
# Sensitive data — NEVER commit
*.json
!pyproject.toml
# Temp files
*.tmp
*.bak

LICENSE

@@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

README.md

@@ -0,0 +1,234 @@
# WeChat CLI
[中文文档](README_CN.md)
A command-line tool to query your local WeChat data — chat history, contacts, sessions, favorites, and more. Designed for LLM integration with JSON output by default.
## Features
- **Self-contained** — `pip install` + `wechat-cli init`, no external dependencies
- **11 commands** — sessions, history, search, contacts, members, stats, export, favorites, unread, new-messages, init
- **JSON by default** — structured output for programmatic access
- **Cross-platform** — macOS, Windows, Linux
- **On-the-fly decryption** — SQLCipher databases decrypted transparently with caching
- **Message type filtering** — filter by text, image, link, file, video, etc.
- **Chat statistics** — top senders, type breakdown, hourly activity distribution
- **Markdown export** — export conversations as markdown or plain text
## Quick Start
### Install
```bash
pip install wechat-cli
```
Or install from source:
```bash
git clone https://github.com/canghe/wechat-cli.git
cd wechat-cli
pip install -e .
```
### Initialize
Make sure WeChat is running, then:
```bash
# macOS/Linux: may need sudo for memory scanning
sudo wechat-cli init
# Windows: run in a terminal with sufficient privileges
wechat-cli init
```
This will:
1. Auto-detect your WeChat data directory
2. Extract encryption keys from WeChat process memory
3. Save config and keys to `~/.wechat-cli/`
That's it — you're ready to go.
## Commands
### sessions — Recent Chats
```bash
wechat-cli sessions # Last 20 sessions (JSON)
wechat-cli sessions --limit 10 # Last 10
wechat-cli sessions --format text # Human-readable
```
### history — Chat Messages
```bash
wechat-cli history "Alice" # Last 50 messages
wechat-cli history "Alice" --limit 100 --offset 50
wechat-cli history "Team" --start-time "2026-04-01" --end-time "2026-04-03"
wechat-cli history "Alice" --type link # Only links/files
wechat-cli history "Alice" --format text
```
**Options:** `--limit`, `--offset`, `--start-time`, `--end-time`, `--type`, `--format`
### search — Search Messages
```bash
wechat-cli search "hello" # Global search
wechat-cli search "hello" --chat "Alice" # In specific chat
wechat-cli search "meeting" --chat "TeamA" --chat "TeamB" # Multiple chats
wechat-cli search "report" --type file # Only files
wechat-cli search "hello" --start-time "2026-04-01" --limit 50
```
**Options:** `--chat` (repeatable), `--start-time`, `--end-time`, `--limit`, `--offset`, `--type`, `--format`
### contacts — Contact Search & Details
```bash
wechat-cli contacts --query "Li" # Search contacts
wechat-cli contacts --detail "Alice" # View contact details
wechat-cli contacts --detail "wxid_xxx" # By WeChat ID
```
Details include: nickname, remark, WeChat ID (alias), bio, avatar URL, account type.
### members — Group Members
```bash
wechat-cli members "Team Group" # List all members (JSON)
wechat-cli members "Team Group" --format text
```
Shows member list with display names and group owner.
### stats — Chat Statistics
```bash
wechat-cli stats "Team Group"
wechat-cli stats "Alice" --start-time "2026-04-01" --end-time "2026-04-03"
wechat-cli stats "Team Group" --format text
```
Returns: total messages, type breakdown, top 10 senders, 24-hour activity distribution.
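The 24-hour activity distribution is easy to reproduce from raw message timestamps. A minimal sketch of the idea (not the tool's internal code; it assumes messages expose Unix-epoch timestamps and buckets by UTC hour, whereas the CLI presumably uses local time):

```python
from collections import Counter
from datetime import datetime, timezone

def hourly_distribution(timestamps):
    """Bucket Unix timestamps into a 24-slot hour-of-day histogram."""
    hours = Counter(datetime.fromtimestamp(ts, tz=timezone.utc).hour for ts in timestamps)
    # Dense 24-slot list so quiet hours still show as 0
    return [hours.get(h, 0) for h in range(24)]
```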
### export — Export Conversations
```bash
wechat-cli export "Alice" --format markdown # To stdout
wechat-cli export "Alice" --format txt --output chat.txt # To file
wechat-cli export "Team" --start-time "2026-04-01" --limit 1000
```
**Options:** `--format markdown|txt`, `--output`, `--start-time`, `--end-time`, `--limit`
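The markdown layout is simple to emulate. A hypothetical sketch of the idea behind `--format markdown` (the field names `sender`, `time`, and `text` are illustrative assumptions, not the tool's actual schema):

```python
def to_markdown(chat_name, messages):
    """Render message dicts as a markdown transcript.
    Field names here are assumptions for illustration only."""
    lines = [f"# {chat_name}", ""]
    for m in messages:
        lines.append(f"**{m['sender']}** ({m['time']})")
        lines.append("")
        lines.append(m["text"])
        lines.append("")
    return "\n".join(lines)
```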
### favorites — WeChat Bookmarks
```bash
wechat-cli favorites # Recent bookmarks
wechat-cli favorites --type article # Articles only
wechat-cli favorites --query "machine learning" # Search
```
**Types:** text, image, article, card, video
### unread — Unread Sessions
```bash
wechat-cli unread # All unread sessions
wechat-cli unread --limit 10 --format text
```
### new-messages — Incremental New Messages
```bash
wechat-cli new-messages # First call: return unread + save state
wechat-cli new-messages # Subsequent: only new since last call
```
State persists at `~/.wechat-cli/last_check.json`. Delete it to reset.
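The incremental pattern is worth spelling out. A sketch of the idea only; the actual contents of `last_check.json` are an implementation detail of the tool and are assumed here:

```python
import json
from pathlib import Path

# Stand-in for ~/.wechat-cli/last_check.json; the real file format is not documented.
STATE = Path("last_check.json")

def new_since_last_call(all_messages):
    """Return messages newer than the high-water mark saved by the previous call."""
    last = json.loads(STATE.read_text())["last_ts"] if STATE.exists() else 0
    fresh = [m for m in all_messages if m["ts"] > last]
    # Persist the newest timestamp seen, so the next call reports only newer messages
    newest = max((m["ts"] for m in all_messages), default=last)
    STATE.write_text(json.dumps({"last_ts": newest}))
    return fresh
```

Deleting the state file resets the baseline, which matches the CLI's documented reset behavior.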
## Message Type Filter
The `--type` option (available on `history` and `search`) accepts:
| Value | Description |
|-------|-------------|
| `text` | Text messages |
| `image` | Images |
| `voice` | Voice messages |
| `video` | Videos |
| `sticker` | Stickers/emojis |
| `location` | Location shares |
| `link` | Links and app messages |
| `file` | File attachments |
| `call` | Voice/video calls |
| `system` | System messages |
## Use Cases
### LLM / AI Tool Integration
```bash
# For Claude Code, Cursor, or any AI tool that can run shell commands
wechat-cli sessions --limit 5
wechat-cli history "Alice" --limit 20 --format text
wechat-cli search "deadline" --chat "Team" --type text
```
All commands output JSON by default, making them ideal for AI agent tool calls.
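From the agent side, consuming that output is one subprocess call plus `json.loads`. A generic sketch (the exact shape of each command's JSON is not specified here):

```python
import json
import subprocess

def run_json(cmd):
    """Run a command and decode its JSON stdout."""
    proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return json.loads(proc.stdout)

# e.g.: sessions = run_json(["wechat-cli", "sessions", "--limit", "5"])
```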
### Chat Analysis
```bash
# Who talks the most in a group?
wechat-cli stats "Team Group" --format text
# Find all shared links in a conversation
wechat-cli history "Alice" --type link --limit 50
# Search for a specific file
wechat-cli search "report.xlsx" --type file
```
### Data Backup
```bash
# Export important conversations
wechat-cli export "Team Group" --format markdown --output team_chat.md
wechat-cli export "Alice" --start-time "2026-01-01" --format txt --output alice_2026.txt
```
### Notification Monitoring
```bash
# Cron job to check for new messages every 5 minutes
*/5 * * * * wechat-cli new-messages --format text
```
## Platform Support
| Platform | Status | Notes |
|----------|--------|-------|
| macOS (Apple Silicon) | Supported | Bundled arm64 binary for key extraction |
| macOS (Intel) | Supported | Requires an x86_64 build of the key-extraction binary |
| Windows | Supported | Reads Weixin.exe process memory |
| Linux | Supported | Reads /proc/pid/mem, requires root |
## How It Works
WeChat stores chat data in SQLCipher-encrypted SQLite databases on your local machine. WeChat CLI:
1. **Extracts keys** — scans WeChat process memory to find encryption keys (`wechat-cli init`)
2. **Decrypts on-the-fly** — transparently decrypts databases with page-level AES-256-CBC + caching
3. **Queries locally** — all data stays on your machine, no network access required
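One detail that makes key-to-database matching possible: an encrypted SQLCipher file replaces the plaintext `SQLite format 3` header with a random 16-byte salt, so the salt read from disk can be compared against the salt portion of keys found in memory. The bundled C scanner performs this check; the same idea in Python:

```python
def read_db_salt(path):
    """Return the 16-byte SQLCipher salt as hex, or None for a plain SQLite file."""
    with open(path, "rb") as f:
        header = f.read(16)
    if len(header) < 16:
        return None  # too short to be a database
    if header.startswith(b"SQLite format 3"):
        return None  # unencrypted SQLite: no salt to match
    return header.hex()
```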
## Requirements
- Python >= 3.10
- WeChat running locally (for `init` key extraction)
## License
[Apache License 2.0](LICENSE)

README_CN.md

@@ -0,0 +1,232 @@
# WeChat CLI
[English](README.md)
命令行工具,查询本地微信数据——聊天记录、联系人、会话、收藏等。默认 JSON 输出,专为 LLM 集成设计。
## 功能亮点
- **开箱即用** — `pip install` + `wechat-cli init`,无外部依赖
- **11 个命令** — sessions、history、search、contacts、members、stats、export、favorites、unread、new-messages、init
- **默认 JSON** — 结构化输出,方便程序解析
- **跨平台** — macOS、Windows、Linux
- **即时解密** — SQLCipher 数据库透明解密,带缓存
- **消息类型过滤** — 按文本、图片、链接、文件、视频等过滤
- **聊天统计** — 发言排行、类型分布、24 小时活跃分布
- **Markdown 导出** — 将聊天记录导出为 markdown 或纯文本
## 快速开始
### 安装
```bash
pip install wechat-cli
```
或从源码安装:
```bash
git clone https://github.com/canghe/wechat-cli.git
cd wechat-cli
pip install -e .
```
### 初始化
确保微信正在运行,然后:
```bash
# macOS/Linux: 可能需要 sudo 权限
sudo wechat-cli init
# Windows: 在有足够权限的终端中运行
wechat-cli init
```
这一步会:
1. 自动检测微信数据目录
2. 从微信进程内存中提取加密密钥
3. 将配置和密钥保存到 `~/.wechat-cli/`
完成后即可使用所有命令。
## 命令一览
### sessions — 最近会话
```bash
wechat-cli sessions # 最近 20 个会话 (JSON)
wechat-cli sessions --limit 10 # 最近 10 个
wechat-cli sessions --format text # 纯文本输出
```
### history — 聊天记录
```bash
wechat-cli history "张三" # 最近 50 条消息
wechat-cli history "张三" --limit 100 --offset 50
wechat-cli history "交流群" --start-time "2026-04-01" --end-time "2026-04-03"
wechat-cli history "张三" --type link # 只看链接/文件
wechat-cli history "张三" --format text
```
**选项:** `--limit`、`--offset`、`--start-time`、`--end-time`、`--type`、`--format`
### search — 搜索消息
```bash
wechat-cli search "Claude" # 全局搜索
wechat-cli search "Claude" --chat "交流群" # 指定聊天搜索
wechat-cli search "开会" --chat "群A" --chat "群B" # 多个聊天
wechat-cli search "报告" --type file # 只搜文件
```
**选项:** `--chat`(可多次指定)、`--start-time`、`--end-time`、`--limit`、`--offset`、`--type`、`--format`
### contacts — 联系人搜索与详情
```bash
wechat-cli contacts --query "李" # 搜索联系人
wechat-cli contacts --detail "张三" # 查看详情
wechat-cli contacts --detail "wxid_xxx" # 通过 wxid 查看
```
详情包括:昵称、备注、微信号、个性签名、头像 URL、账号类型。
### members — 群成员列表
```bash
wechat-cli members "AI交流群" # 成员列表 (JSON)
wechat-cli members "AI交流群" --format text
```
显示成员昵称、wxid、备注和群主。
### stats — 聊天统计
```bash
wechat-cli stats "AI交流群"
wechat-cli stats "张三" --start-time "2026-04-01" --end-time "2026-04-03"
wechat-cli stats "AI交流群" --format text
```
返回:消息总数、类型分布、发言 Top 10、24 小时活跃分布(含柱状图)。
### export — 导出聊天记录
```bash
wechat-cli export "张三" --format markdown # 输出到 stdout
wechat-cli export "张三" --format txt --output chat.txt # 输出到文件
wechat-cli export "群聊" --start-time "2026-04-01" --limit 1000
```
**选项:** `--format markdown|txt`、`--output`、`--start-time`、`--end-time`、`--limit`
### favorites — 微信收藏
```bash
wechat-cli favorites # 最近收藏
wechat-cli favorites --type article # 只看文章
wechat-cli favorites --query "计算机网络" # 搜索收藏
```
**类型:** text、image、article、card、video
### unread — 未读会话
```bash
wechat-cli unread # 所有未读会话
wechat-cli unread --limit 10 --format text
```
### new-messages — 增量新消息
```bash
wechat-cli new-messages # 首次: 返回未读消息 + 保存状态
wechat-cli new-messages # 后续: 仅返回上次以来的新消息
```
状态保存在 `~/.wechat-cli/last_check.json`,删除此文件可重置。
## 消息类型过滤
`--type` 选项(适用于 `history` 和 `search`)接受以下值:
| 值 | 说明 |
|---|------|
| `text` | 文本消息 |
| `image` | 图片 |
| `voice` | 语音 |
| `video` | 视频 |
| `sticker` | 表情 |
| `location` | 位置 |
| `link` | 链接/应用消息 |
| `file` | 文件 |
| `call` | 音视频通话 |
| `system` | 系统消息 |
## 使用场景
### AI 工具集成
```bash
# 供 Claude Code、Cursor 等 AI 工具调用
wechat-cli sessions --limit 5
wechat-cli history "张三" --limit 20 --format text
wechat-cli search "截止日期" --chat "项目组" --type text
```
所有命令默认输出 JSON,适合 AI Agent 工具调用。
### 聊天分析
```bash
# 群里谁最活跃?
wechat-cli stats "项目组" --format text
# 查看所有分享的链接
wechat-cli history "张三" --type link --limit 50
# 搜索特定文件
wechat-cli search "报告.xlsx" --type file
```
### 数据备份
```bash
wechat-cli export "项目组" --format markdown --output project.md
wechat-cli export "张三" --start-time "2026-01-01" --format txt --output chat.txt
```
### 消息监控
```bash
# 定时检查新消息
*/5 * * * * wechat-cli new-messages --format text
```
## 平台支持
| 平台 | 状态 | 说明 |
|------|------|------|
| macOS (Apple Silicon) | 支持 | 内置 arm64 二进制用于密钥提取 |
| macOS (Intel) | 支持 | 需要 x86_64 二进制 |
| Windows | 支持 | 读取 Weixin.exe 进程内存 |
| Linux | 支持 | 读取 /proc/pid/mem,需要 root |
## 工作原理
微信将聊天数据存储在本地的 SQLCipher 加密 SQLite 数据库中。WeChat CLI:
1. **提取密钥** — 扫描微信进程内存获取加密密钥(`wechat-cli init`)
2. **即时解密** — 透明解密数据库,使用页级 AES-256-CBC + 缓存
3. **本地查询** — 所有数据留在本机,无需网络访问
## 环境要求
- Python >= 3.10
- 微信在本地运行(用于 `init` 密钥提取)
## 开源协议
[Apache License 2.0](LICENSE)

pyproject.toml

@@ -0,0 +1,23 @@
[build-system]
requires = ["setuptools>=68.0", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "wechat-cli"
version = "0.2.0"
description = "WeChat data query CLI for LLMs"
requires-python = ">=3.10"
dependencies = [
    "click>=8.1,<9",
    "pycryptodome>=3.19,<4",
    "zstandard>=0.22,<1",
]

[project.scripts]
wechat-cli = "wechat_cli.main:cli"

[tool.setuptools.packages.find]
where = ["."]

[tool.setuptools.package-data]
wechat_cli = ["bin/*"]

wechat_cli/__init__.py (empty)

Binary file not shown.

find_all_keys_macos.c

@@ -0,0 +1,319 @@
/*
* find_all_keys_macos.c - macOS WeChat memory key scanner
*
* Scans WeChat process memory for SQLCipher encryption keys in the
* x'<key_hex><salt_hex>' format used by WeChat 4.x on macOS.
*
* Prerequisites:
* - WeChat must be ad-hoc signed (or SIP disabled)
* - Must run as root (sudo)
*
* Build:
* cc -O2 -o find_all_keys_macos find_all_keys_macos.c -framework Foundation
*
* Usage:
* sudo ./find_all_keys_macos [pid]
* If pid is omitted, automatically finds WeChat PID.
*
* Output: JSON file at ./all_keys.json (compatible with decrypt_db.py)
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <dirent.h>
#include <ftw.h>
#include <pwd.h>
#include <sys/stat.h>
#include <mach/mach.h>
#include <mach/mach_vm.h>
#define MAX_KEYS 256
#define KEY_SIZE 32
#define SALT_SIZE 16
#define HEX_PATTERN_LEN 96 /* 64 hex (key) + 32 hex (salt) */
#define CHUNK_SIZE (2 * 1024 * 1024)
typedef struct {
char key_hex[65];
char salt_hex[33];
char full_pragma[100];
} key_entry_t;
/* Forward declaration */
static int read_db_salt(const char *path, char *salt_hex_out);
/* nftw callback state for collecting DB files */
#define MAX_DBS 256
static char g_db_salts[MAX_DBS][33];
static char g_db_names[MAX_DBS][256];
static int g_db_count = 0;
static int nftw_collect_db(const char *fpath, const struct stat *sb,
int typeflag, struct FTW *ftwbuf) {
(void)sb; (void)ftwbuf;
if (typeflag != FTW_F) return 0;
size_t len = strlen(fpath);
if (len < 3 || strcmp(fpath + len - 3, ".db") != 0) return 0;
if (g_db_count >= MAX_DBS) return 0;
char salt[33];
if (read_db_salt(fpath, salt) != 0) return 0;
strcpy(g_db_salts[g_db_count], salt);
/* Extract relative path from db_storage/ */
const char *rel = strstr(fpath, "db_storage/");
if (rel) rel += strlen("db_storage/");
else {
rel = strrchr(fpath, '/');
rel = rel ? rel + 1 : fpath;
}
strncpy(g_db_names[g_db_count], rel, 255);
g_db_names[g_db_count][255] = '\0';
printf(" %s: salt=%s\n", g_db_names[g_db_count], salt);
g_db_count++;
return 0;
}
static int is_hex_char(unsigned char c) {
return (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F');
}
static pid_t find_wechat_pid(void) {
FILE *fp = popen("pgrep -x WeChat", "r");
if (!fp) return -1;
char buf[64];
pid_t pid = -1;
if (fgets(buf, sizeof(buf), fp))
pid = atoi(buf);
pclose(fp);
return pid;
}
/* Read DB salt (first 16 bytes) and return hex string */
static int read_db_salt(const char *path, char *salt_hex_out) {
FILE *f = fopen(path, "rb");
if (!f) return -1;
unsigned char header[16];
if (fread(header, 1, 16, f) != 16) { fclose(f); return -1; }
fclose(f);
/* Check if unencrypted */
if (memcmp(header, "SQLite format 3", 15) == 0) return -1;
for (int i = 0; i < 16; i++)
sprintf(salt_hex_out + i * 2, "%02x", header[i]);
salt_hex_out[32] = '\0';
return 0;
}
int main(int argc, char *argv[]) {
pid_t pid;
if (argc >= 2)
pid = atoi(argv[1]);
else
pid = find_wechat_pid();
if (pid <= 0) {
fprintf(stderr, "WeChat not running or invalid PID\n");
return 1;
}
printf("============================================================\n");
printf(" macOS WeChat Memory Key Scanner (C version)\n");
printf("============================================================\n");
printf("WeChat PID: %d\n", pid);
/* Get task port */
mach_port_t task;
kern_return_t kr = task_for_pid(mach_task_self(), pid, &task);
if (kr != KERN_SUCCESS) {
fprintf(stderr, "task_for_pid failed: %d\n", kr);
fprintf(stderr, "Make sure: (1) running as root, (2) WeChat is ad-hoc signed\n");
return 1;
}
printf("Got task port: %u\n", task);
/* Resolve real user's HOME (sudo may change HOME to /var/root) */
const char *home = getenv("HOME");
const char *sudo_user = getenv("SUDO_USER");
if (sudo_user) {
struct passwd *pw = getpwnam(sudo_user);
if (pw && pw->pw_dir)
home = pw->pw_dir;
}
if (!home) home = "/root";
printf("User home: %s\n", home);
/* Collect DB salts by recursively walking db_storage directories.
* Note: POSIX glob() does not support ** recursive matching on macOS,
* so we use nftw() to walk the directory tree instead. */
printf("\nScanning for DB files...\n");
char db_base_dir[512];
snprintf(db_base_dir, sizeof(db_base_dir),
"%s/Library/Containers/com.tencent.xinWeChat/Data/Documents/xwechat_files",
home);
/* Walk each account's db_storage directory */
DIR *xdir = opendir(db_base_dir);
if (xdir) {
struct dirent *ent;
while ((ent = readdir(xdir)) != NULL) {
if (ent->d_name[0] == '.') continue;
char storage_path[768];
snprintf(storage_path, sizeof(storage_path),
"%s/%s/db_storage", db_base_dir, ent->d_name);
struct stat st;
if (stat(storage_path, &st) == 0 && S_ISDIR(st.st_mode)) {
nftw(storage_path, nftw_collect_db, 20, FTW_PHYS);
}
}
closedir(xdir);
}
printf("Found %d encrypted DBs\n", g_db_count);
/* Scan memory for x' patterns */
printf("\nScanning memory for keys...\n");
key_entry_t keys[MAX_KEYS];
int key_count = 0;
size_t total_scanned = 0;
int region_count = 0;
mach_vm_address_t addr = 0;
while (1) {
mach_vm_size_t size = 0;
vm_region_basic_info_data_64_t info;
mach_msg_type_number_t info_count = VM_REGION_BASIC_INFO_COUNT_64;
mach_port_t obj_name;
kr = mach_vm_region(task, &addr, &size, VM_REGION_BASIC_INFO_64,
(vm_region_info_t)&info, &info_count, &obj_name);
if (kr != KERN_SUCCESS) break;
if (size == 0) { addr++; continue; } /* guard against infinite loop */
if ((info.protection & (VM_PROT_READ | VM_PROT_WRITE)) ==
(VM_PROT_READ | VM_PROT_WRITE)) {
region_count++;
mach_vm_address_t ca = addr;
while (ca < addr + size) {
mach_vm_size_t cs = addr + size - ca;
if (cs > CHUNK_SIZE) cs = CHUNK_SIZE;
vm_offset_t data;
mach_msg_type_number_t dc;
kr = mach_vm_read(task, ca, cs, &data, &dc);
if (kr == KERN_SUCCESS) {
unsigned char *buf = (unsigned char *)data;
total_scanned += dc;
for (size_t i = 0; i + HEX_PATTERN_LEN + 3 < dc; i++) {
if (buf[i] == 'x' && buf[i + 1] == '\'') {
/* Check if followed by 96 hex chars and closing ' */
int valid = 1;
for (int j = 0; j < HEX_PATTERN_LEN; j++) {
if (!is_hex_char(buf[i + 2 + j])) { valid = 0; break; }
}
if (!valid) continue;
if (buf[i + 2 + HEX_PATTERN_LEN] != '\'') continue;
/* Extract key and salt hex */
char key_hex[65], salt_hex[33];
memcpy(key_hex, buf + i + 2, 64);
key_hex[64] = '\0';
memcpy(salt_hex, buf + i + 2 + 64, 32);
salt_hex[32] = '\0';
/* Convert to lowercase for comparison */
for (int j = 0; key_hex[j]; j++)
if (key_hex[j] >= 'A' && key_hex[j] <= 'F')
key_hex[j] += 32;
for (int j = 0; salt_hex[j]; j++)
if (salt_hex[j] >= 'A' && salt_hex[j] <= 'F')
salt_hex[j] += 32;
/* Deduplicate */
int dup = 0;
for (int k = 0; k < key_count; k++) {
if (strcmp(keys[k].key_hex, key_hex) == 0 &&
strcmp(keys[k].salt_hex, salt_hex) == 0) {
dup = 1; break;
}
}
if (dup) continue;
if (key_count < MAX_KEYS) {
strcpy(keys[key_count].key_hex, key_hex);
strcpy(keys[key_count].salt_hex, salt_hex);
snprintf(keys[key_count].full_pragma, sizeof(keys[key_count].full_pragma),
"x'%s%s'", key_hex, salt_hex);
key_count++;
}
}
}
mach_vm_deallocate(mach_task_self(), data, dc);
}
/* Advance with overlap to catch patterns spanning chunk boundaries.
* Pattern is x'<96 hex chars>' = 99 bytes total. */
if (cs > HEX_PATTERN_LEN + 3)
ca += cs - (HEX_PATTERN_LEN + 3);
else
ca += cs;
}
}
addr += size;
}
printf("\nScan complete: %zuMB scanned, %d regions, %d unique keys\n",
total_scanned / 1024 / 1024, region_count, key_count);
/* Match keys to DBs */
printf("\n%-25s %-66s %s\n", "Database", "Key", "Salt");
printf("%-25s %-66s %s\n",
"-------------------------",
"------------------------------------------------------------------",
"--------------------------------");
int matched = 0;
for (int i = 0; i < key_count; i++) {
const char *db = NULL;
for (int j = 0; j < g_db_count; j++) {
if (strcmp(keys[i].salt_hex, g_db_salts[j]) == 0) {
db = g_db_names[j];
matched++;
break;
}
}
printf("%-25s %-66s %s\n",
db ? db : "(unknown)",
keys[i].key_hex,
keys[i].salt_hex);
}
printf("\nMatched %d/%d keys to known DBs\n", matched, key_count);
/* Save JSON: { "rel/path.db": { "enc_key": "hex" }, ... }
* Uses forward slashes (native macOS paths, valid JSON without escaping).
*/
const char *out_path = "all_keys.json";
FILE *fp = fopen(out_path, "w");
if (fp) {
fprintf(fp, "{\n");
int first = 1;
for (int i = 0; i < key_count; i++) {
const char *db = NULL;
for (int j = 0; j < g_db_count; j++) {
if (strcmp(keys[i].salt_hex, g_db_salts[j]) == 0) {
db = g_db_names[j];
break;
}
}
if (!db) continue;
fprintf(fp, "%s \"%s\": {\"enc_key\": \"%s\"}",
first ? "" : ",\n", db, keys[i].key_hex);
first = 0;
}
fprintf(fp, "\n}\n");
fclose(fp);
printf("Saved to %s\n", out_path);
}
return 0;
}

@@ -0,0 +1,91 @@
"""contacts 命令 — 搜索或查看联系人"""
import click
from ..core.contacts import get_contact_full, get_contact_names, resolve_username, get_contact_detail
from ..output.formatter import output
@click.command("contacts")
@click.option("--query", default="", help="搜索关键词匹配昵称、备注、wxid")
@click.option("--detail", default=None, help="查看联系人详情(传入昵称/备注/wxid")
@click.option("--limit", default=50, help="返回数量")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def contacts(ctx, query, detail, limit, fmt):
"""搜索或列出联系人
\b
示例:
wechat-cli contacts --query "" # 搜索联系人
wechat-cli contacts --detail "张三" # 查看联系人详情
wechat-cli contacts --detail "wxid_xxx" # 通过 wxid 查看
"""
app = ctx.obj
if detail:
_show_detail(app, detail, fmt)
return
names = get_contact_names(app.cache, app.decrypted_dir)
full = get_contact_full(app.cache, app.decrypted_dir)
if query:
q_lower = query.lower()
matched = [c for c in full if q_lower in c.get('nick_name', '').lower()
or q_lower in c.get('remark', '').lower()
or q_lower in c.get('username', '').lower()]
else:
matched = full
matched = matched[:limit]
if fmt == 'json':
output(matched, 'json')
else:
header = f"找到 {len(matched)} 个联系人:"
lines = []
for c in matched:
            display = c['remark'] or c['nick_name'] or c['username']
            line = f"{display} ({c['username']})"
            if c['remark'] and c['nick_name'] and c['nick_name'] != display:
                line += f" 昵称: {c['nick_name']}"
lines.append(line)
output(header + "\n\n" + "\n".join(lines), 'text')
def _show_detail(app, name_or_id, fmt):
"""显示联系人详情。"""
names = get_contact_names(app.cache, app.decrypted_dir)
# 尝试解析为 username
username = resolve_username(name_or_id, app.cache, app.decrypted_dir)
if not username:
# 直接用原始输入试试
username = name_or_id
info = get_contact_detail(username, app.cache, app.decrypted_dir)
if not info:
click.echo(f"找不到联系人: {name_or_id}", err=True)
return
if fmt == 'json':
output(info, 'json')
else:
lines = [f"联系人详情: {info['nick_name']}"]
if info['remark']:
lines.append(f"备注: {info['remark']}")
if info['alias']:
lines.append(f"微信号: {info['alias']}")
lines.append(f"wxid: {info['username']}")
if info['description']:
lines.append(f"个性签名: {info['description']}")
if info['is_group']:
lines.append("类型: 群聊")
elif info['is_subscription']:
lines.append("类型: 公众号")
elif info['verify_flag'] and info['verify_flag'] >= 8:
lines.append("类型: 企业认证")
if info['avatar']:
lines.append(f"头像: {info['avatar']}")
output("\n".join(lines), 'text')

View File

@@ -0,0 +1,101 @@
"""export 命令 — 导出聊天记录为 markdown 或 txt"""
import click
from datetime import datetime
from ..core.contacts import get_contact_names
from ..core.messages import (
collect_chat_history,
parse_time_range,
resolve_chat_context,
validate_pagination,
)
from ..output.formatter import output
@click.command("export")
@click.argument("chat_name")
@click.option("--format", "fmt", default="markdown", type=click.Choice(["markdown", "txt"]), help="导出格式")
@click.option("--output", "output_path", default=None, help="输出文件路径(默认输出到 stdout")
@click.option("--start-time", default="", help="起始时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--limit", default=500, help="导出消息数量")
@click.pass_context
def export(ctx, chat_name, fmt, output_path, start_time, end_time, limit):
"""导出聊天记录为 markdown 或纯文本
\b
示例:
wechat-cli export "张三" --format markdown
wechat-cli export "AI交流群" --format txt --output chat.txt
wechat-cli export "张三" --start-time "2026-04-01" --limit 1000
"""
app = ctx.obj
try:
validate_pagination(limit, 0, limit_max=None)
start_ts, end_ts = parse_time_range(start_time, end_time)
except ValueError as e:
click.echo(f"错误: {e}", err=True)
ctx.exit(2)
chat_ctx = resolve_chat_context(chat_name, app.msg_db_keys, app.cache, app.decrypted_dir)
if not chat_ctx:
click.echo(f"找不到聊天对象: {chat_name}", err=True)
ctx.exit(1)
if not chat_ctx['db_path']:
click.echo(f"找不到 {chat_ctx['display_name']} 的消息记录", err=True)
ctx.exit(1)
names = get_contact_names(app.cache, app.decrypted_dir)
lines, failures = collect_chat_history(
chat_ctx, names, app.display_name_fn,
start_ts=start_ts, end_ts=end_ts, limit=limit, offset=0,
)
if not lines:
click.echo(f"{chat_ctx['display_name']} 无消息记录", err=True)
ctx.exit(0)
now = datetime.now().strftime('%Y-%m-%d %H:%M')
chat_type = "群聊" if chat_ctx['is_group'] else "私聊"
time_range = f"{start_time or '最早'} ~ {end_time or '最新'}"
if fmt == 'markdown':
content = _format_markdown(chat_ctx['display_name'], chat_type, time_range, now, lines)
else:
content = _format_txt(chat_ctx['display_name'], chat_type, time_range, now, lines)
if output_path:
with open(output_path, 'w', encoding='utf-8') as f:
f.write(content)
if not content.endswith('\n'):
f.write('\n')
click.echo(f"已导出到: {output_path}{len(lines)} 条消息)", err=True)
else:
output(content, 'text')
def _format_markdown(display_name, chat_type, time_range, export_time, lines):
header = (
f"# 聊天记录: {display_name}\n\n"
f"**时间范围:** {time_range}\n\n"
f"**导出时间:** {export_time}\n\n"
f"**消息数量:** {len(lines)}\n\n"
f"**类型:** {chat_type}\n\n---\n"
)
body = "\n".join(f"- {line}" for line in lines)
return header + body
def _format_txt(display_name, chat_type, time_range, export_time, lines):
header = (
f"聊天记录: {display_name}\n"
f"类型: {chat_type}\n"
f"时间范围: {time_range}\n"
f"导出时间: {export_time}\n"
f"消息数量: {len(lines)}\n"
f"{'=' * 60}"
)
body = "\n".join(lines)
return header + "\n" + body

View File

@@ -0,0 +1,142 @@
"""favorites 命令 — 查看微信收藏"""
import os
import sqlite3
import xml.etree.ElementTree as ET
from contextlib import closing
from datetime import datetime
import click
from ..core.contacts import get_contact_names
from ..output.formatter import output
_FAV_TYPE_MAP = {
1: '文本', 2: '图片', 5: '文章', 19: '名片', 20: '视频号',
}
_FAV_TYPE_FILTERS = {
'text': 1, 'image': 2, 'article': 5, 'card': 19, 'video': 20,
}
def _parse_fav_content(content, fav_type):
"""从 XML content 提取摘要信息。"""
if not content:
return ''
try:
root = ET.fromstring(content)
except ET.ParseError:
return ''
item = root if root.tag == 'favitem' else root.find('.//favitem')
if item is None:
return ''
if fav_type == 1:
return (item.findtext('desc') or '').strip()
if fav_type == 2:
return '[图片收藏]'
if fav_type == 5:
title = (item.findtext('.//pagetitle') or '').strip()
desc = (item.findtext('.//pagedesc') or '').strip()
return f"{title} - {desc}" if desc else title
if fav_type == 19:
return (item.findtext('desc') or '').strip()
if fav_type == 20:
nickname = (item.findtext('.//nickname') or '').strip()
desc = (item.findtext('.//desc') or '').strip()
parts = [p for p in [nickname, desc] if p]
return ' '.join(parts) if parts else '[视频号]'
desc = (item.findtext('desc') or '').strip()
return desc if desc else '[收藏]'
@click.command("favorites")
@click.option("--limit", default=20, help="返回数量")
@click.option("--type", "fav_type", default=None,
type=click.Choice(list(_FAV_TYPE_FILTERS.keys())),
help="按类型过滤: text/image/article/card/video")
@click.option("--query", default=None, help="关键词搜索")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def favorites(ctx, limit, fav_type, query, fmt):
"""查看微信收藏
\b
示例:
wechat-cli favorites # 最近收藏
wechat-cli favorites --type article # 只看文章
wechat-cli favorites --query "计算机网络" # 搜索收藏
wechat-cli favorites --limit 5 --format text
"""
app = ctx.obj
# 查找 favorite.db
fav_path = None
pre_decrypted = os.path.join(app.decrypted_dir, "favorite", "favorite.db")
if os.path.exists(pre_decrypted):
fav_path = pre_decrypted
else:
fav_path = app.cache.get(os.path.join("favorite", "favorite.db"))
if not fav_path:
click.echo("错误: 无法访问 favorite.db", err=True)
ctx.exit(3)
names = get_contact_names(app.cache, app.decrypted_dir)
with closing(sqlite3.connect(fav_path)) as conn:
where_parts = []
params = []
if fav_type:
where_parts.append('type = ?')
params.append(_FAV_TYPE_FILTERS[fav_type])
if query:
where_parts.append('content LIKE ?')
params.append(f'%{query}%')
where_sql = f"WHERE {' AND '.join(where_parts)}" if where_parts else ''
rows = conn.execute(f"""
SELECT local_id, type, update_time, content, fromusr, realchatname
FROM fav_db_item
{where_sql}
ORDER BY update_time DESC
LIMIT ?
""", (*params, limit)).fetchall()
results = []
for local_id, typ, ts, content, fromusr, realchat in rows:
from_display = names.get(fromusr, fromusr) if fromusr else ''
chat_display = names.get(realchat, realchat) if realchat else ''
summary = _parse_fav_content(content, typ)
results.append({
'id': local_id,
'type': _FAV_TYPE_MAP.get(typ, f'type={typ}'),
'time': datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M'),
'summary': summary,
'from': from_display,
'source_chat': chat_display,
})
if fmt == 'json':
output({
'count': len(results),
'favorites': results,
}, 'json')
else:
if not results:
output("没有找到收藏", 'text')
return
lines = []
for r in results:
entry = f"[{r['time']}] [{r['type']}] {r['summary']}"
if r['from']:
entry += f"\n 来自: {r['from']}"
if r['source_chat']:
entry += f" 聊天: {r['source_chat']}"
lines.append(entry)
output(f"收藏列表({len(results)} 条):\n\n" + "\n\n".join(lines), 'text')

View File

@@ -0,0 +1,86 @@
"""get-chat-history 命令"""
import click
from ..core.contacts import get_contact_names
from ..core.messages import (
MSG_TYPE_FILTERS,
MSG_TYPE_NAMES,
collect_chat_history,
parse_time_range,
resolve_chat_context,
validate_pagination,
)
from ..output.formatter import output
@click.command("history")
@click.argument("chat_name")
@click.option("--limit", default=50, help="返回的消息数量")
@click.option("--offset", default=0, help="分页偏移量")
@click.option("--start-time", default="", help="起始时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.option("--type", "msg_type", default=None, type=click.Choice(MSG_TYPE_NAMES), help="消息类型过滤")
@click.pass_context
def history(ctx, chat_name, limit, offset, start_time, end_time, fmt, msg_type):
"""获取指定聊天的消息记录
\b
示例:
wechat-cli history "张三" # 最近 50 条消息
wechat-cli history "张三" --limit 100 --offset 50 # 分页查询
wechat-cli history "AI交流群" --start-time "2026-04-01" --end-time "2026-04-02"
wechat-cli history "张三" --format text # 纯文本输出
"""
app = ctx.obj
try:
validate_pagination(limit, offset, limit_max=None)
start_ts, end_ts = parse_time_range(start_time, end_time)
except ValueError as e:
click.echo(f"错误: {e}", err=True)
ctx.exit(2)
chat_ctx = resolve_chat_context(chat_name, app.msg_db_keys, app.cache, app.decrypted_dir)
if not chat_ctx:
click.echo(f"找不到聊天对象: {chat_name}", err=True)
ctx.exit(1)
if not chat_ctx['db_path']:
click.echo(f"找不到 {chat_ctx['display_name']} 的消息记录", err=True)
ctx.exit(1)
names = get_contact_names(app.cache, app.decrypted_dir)
type_filter = MSG_TYPE_FILTERS[msg_type] if msg_type else None
lines, failures = collect_chat_history(
chat_ctx, names, app.display_name_fn,
start_ts=start_ts, end_ts=end_ts, limit=limit, offset=offset,
msg_type_filter=type_filter,
)
if fmt == 'json':
output({
'chat': chat_ctx['display_name'],
'username': chat_ctx['username'],
'is_group': chat_ctx['is_group'],
'count': len(lines),
'offset': offset,
'limit': limit,
'start_time': start_time or None,
'end_time': end_time or None,
'type': msg_type or None,
'messages': lines,
'failures': failures if failures else None,
}, 'json')
else:
header = f"{chat_ctx['display_name']} 的消息记录(返回 {len(lines)}offset={offset}, limit={limit}"
if chat_ctx['is_group']:
header += " [群聊]"
if start_time or end_time:
header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}"
if failures:
header += "\n查询失败: " + "".join(failures)
if lines:
output(header + ":\n\n" + "\n".join(lines), 'text')
else:
output(f"{chat_ctx['display_name']} 无消息记录", 'text')

View File

@@ -0,0 +1,72 @@
"""init 命令 — 交互式初始化,提取密钥并生成配置"""
import json
import os
import sys
import click
from ..core.config import STATE_DIR, CONFIG_FILE, KEYS_FILE, auto_detect_db_dir
@click.command()
@click.option("--db-dir", default=None, help="微信数据目录路径(默认自动检测)")
@click.option("--force", is_flag=True, help="强制重新提取密钥")
def init(db_dir, force):
"""初始化 wechat-cli提取密钥并生成配置"""
click.echo("WeChat CLI 初始化")
click.echo("=" * 40)
# 1. 检查是否已初始化
if os.path.exists(CONFIG_FILE) and os.path.exists(KEYS_FILE) and not force:
click.echo(f"已初始化(配置: {CONFIG_FILE}")
click.echo("使用 --force 重新提取密钥")
return
# 2. 创建状态目录
os.makedirs(STATE_DIR, exist_ok=True)
# 3. 确定 db_dir
if db_dir is None:
db_dir = auto_detect_db_dir()
if db_dir is None:
click.echo("[!] 未能自动检测到微信数据目录", err=True)
click.echo("请通过 --db-dir 参数指定,例如:", err=True)
click.echo(" wechat-cli init --db-dir ~/path/to/db_storage", err=True)
sys.exit(1)
click.echo(f"[+] 检测到微信数据目录: {db_dir}")
else:
db_dir = os.path.abspath(db_dir)
if not os.path.isdir(db_dir):
click.echo(f"[!] 目录不存在: {db_dir}", err=True)
sys.exit(1)
click.echo(f"[+] 使用指定数据目录: {db_dir}")
# 4. 提取密钥
click.echo("\n开始提取密钥...")
try:
from ..keys import extract_keys
key_map = extract_keys(db_dir, KEYS_FILE)
except RuntimeError as e:
click.echo(f"\n[!] 密钥提取失败: {e}", err=True)
if "sudo" not in str(e).lower():
click.echo("提示: macOS/Linux 可能需要 sudo 权限", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"\n[!] 密钥提取出错: {e}", err=True)
sys.exit(1)
# 5. 写入配置
cfg = {
"db_dir": db_dir,
}
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
click.echo(f"\n[+] 初始化完成!")
click.echo(f" 配置: {CONFIG_FILE}")
click.echo(f" 密钥: {KEYS_FILE}")
click.echo(f" 提取到 {len(key_map)} 个数据库密钥")
click.echo("\n现在可以使用:")
click.echo(" wechat-cli sessions")
click.echo(" wechat-cli history \"联系人\"")

View File

@@ -0,0 +1,52 @@
"""members 命令 — 查询群聊成员列表"""
import click
from ..core.contacts import get_contact_names, resolve_username, get_group_members
from ..output.formatter import output
@click.command("members")
@click.argument("group_name")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def members(ctx, group_name, fmt):
"""查询群聊成员列表
\b
示例:
wechat-cli members "AI交流群"
wechat-cli members "群名" --format text
"""
app = ctx.obj
username = resolve_username(group_name, app.cache, app.decrypted_dir)
if not username:
click.echo(f"找不到: {group_name}", err=True)
ctx.exit(1)
if '@chatroom' not in username:
click.echo(f"{group_name} 不是一个群聊", err=True)
ctx.exit(1)
names = get_contact_names(app.cache, app.decrypted_dir)
display_name = names.get(username, username)
result = get_group_members(username, app.cache, app.decrypted_dir)
if fmt == 'json':
output({
'group': display_name,
'username': username,
'member_count': len(result['members']),
'owner': result['owner'],
'members': result['members'],
}, 'json')
    else:
        lines = []
        for m in result['members']:
            line = f"{m['display_name']} ({m['username']})"
            if m['remark']:
                line += f" 备注: {m['remark']}"
            lines.append(line)
header = f"{display_name} 的群成员(共 {len(result['members'])} 人)"
if result['owner']:
header += f",群主: {result['owner']}"
output(header + ":\n\n" + "\n".join(lines), 'text')

View File

@@ -0,0 +1,163 @@
"""get-new-messages 命令 — 增量消息查询,状态持久化到磁盘"""
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime
import click
from ..core.config import STATE_DIR
from ..core.contacts import get_contact_names
from ..core.messages import decompress_content, format_msg_type
from ..output.formatter import output
STATE_FILE = os.path.join(STATE_DIR, "last_check.json")
def _load_last_state():
if not os.path.exists(STATE_FILE):
return {}
try:
with open(STATE_FILE, encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return {}
def _save_last_state(state):
os.makedirs(STATE_DIR, exist_ok=True)
with open(STATE_FILE, 'w', encoding="utf-8") as f:
json.dump(state, f)
@click.command("new-messages")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def new_messages(ctx, fmt):
"""获取自上次调用以来的新消息
\b
示例:
wechat-cli new-messages # 首次: 返回未读消息并记录状态
wechat-cli new-messages # 再次: 仅返回新增消息
wechat-cli new-messages --format text # 纯文本输出
\b
状态文件: ~/.wechat-cli/last_check.json (删除此文件可重置)
"""
app = ctx.obj
path = app.cache.get(os.path.join("session", "session.db"))
if not path:
click.echo("错误: 无法解密 session.db", err=True)
ctx.exit(3)
names = get_contact_names(app.cache, app.decrypted_dir)
with closing(sqlite3.connect(path)) as conn:
rows = conn.execute("""
SELECT username, unread_count, summary, last_timestamp,
last_msg_type, last_msg_sender, last_sender_display_name
FROM SessionTable
WHERE last_timestamp > 0
ORDER BY last_timestamp DESC
""").fetchall()
curr_state = {}
for r in rows:
username, unread, summary, ts, msg_type, sender, sender_name = r
curr_state[username] = {
'unread': unread, 'summary': summary, 'timestamp': ts,
'msg_type': msg_type, 'sender': sender or '', 'sender_name': sender_name or '',
}
last_state = _load_last_state()
if not last_state:
# 首次调用:保存状态,返回未读
_save_last_state({u: s['timestamp'] for u, s in curr_state.items()})
unread_msgs = []
for username, s in curr_state.items():
if s['unread'] and s['unread'] > 0:
display = names.get(username, username)
is_group = '@chatroom' in username
summary = s['summary']
if isinstance(summary, bytes):
summary = decompress_content(summary, 4) or '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
time_str = datetime.fromtimestamp(s['timestamp']).strftime('%H:%M')
unread_msgs.append({
'chat': display,
'username': username,
'is_group': is_group,
'unread': s['unread'],
'last_message': str(summary or ''),
'msg_type': format_msg_type(s['msg_type']),
'time': time_str,
'timestamp': s['timestamp'],
})
if fmt == 'json':
output({'first_call': True, 'unread_count': len(unread_msgs), 'messages': unread_msgs}, 'json')
else:
if unread_msgs:
lines = []
for m in unread_msgs:
tag = " [群]" if m['is_group'] else ""
lines.append(f"[{m['time']}] {m['chat']}{tag} ({m['unread']}条未读): {m['last_message']}")
output(f"当前 {len(unread_msgs)} 个未读会话:\n\n" + "\n".join(lines), 'text')
else:
output("当前无未读消息(已记录状态,下次调用将返回新消息)", 'text')
return
# 后续调用:对比差异
new_msgs = []
for username, s in curr_state.items():
prev_ts = last_state.get(username, 0)
if s['timestamp'] > prev_ts:
display = names.get(username, username)
is_group = '@chatroom' in username
summary = s['summary']
if isinstance(summary, bytes):
summary = decompress_content(summary, 4) or '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
sender_display = ''
if is_group and s['sender']:
sender_display = names.get(s['sender'], s['sender_name'] or s['sender'])
new_msgs.append({
'chat': display,
'username': username,
'is_group': is_group,
'last_message': str(summary or ''),
'msg_type': format_msg_type(s['msg_type']),
'sender': sender_display,
'time': datetime.fromtimestamp(s['timestamp']).strftime('%H:%M:%S'),
'timestamp': s['timestamp'],
})
_save_last_state({u: s['timestamp'] for u, s in curr_state.items()})
new_msgs.sort(key=lambda m: m['timestamp'])
if fmt == 'json':
output({'first_call': False, 'new_count': len(new_msgs), 'messages': new_msgs}, 'json')
else:
if not new_msgs:
output("无新消息", 'text')
else:
lines = []
for m in new_msgs:
entry = f"[{m['time']}] {m['chat']}"
if m['is_group']:
entry += " [群]"
entry += f": {m['msg_type']}"
if m['sender']:
entry += f" ({m['sender']})"
entry += f" - {m['last_message']}"
lines.append(entry)
output(f"{len(new_msgs)} 条新消息:\n\n" + "\n".join(lines), 'text')

View File

@@ -0,0 +1,124 @@
"""search-messages 命令"""
import click
from ..core.contacts import get_contact_names
from ..core.messages import (
MSG_TYPE_FILTERS,
MSG_TYPE_NAMES,
collect_chat_search,
parse_time_range,
resolve_chat_context,
resolve_chat_contexts,
search_all_messages,
validate_pagination,
_candidate_page_size,
_page_ranked_entries,
)
from ..output.formatter import output
@click.command("search")
@click.argument("keyword")
@click.option("--chat", multiple=True, help="限定聊天对象(可多次指定)")
@click.option("--start-time", default="", help="起始时间")
@click.option("--end-time", default="", help="结束时间")
@click.option("--limit", default=20, help="返回数量最大500")
@click.option("--offset", default=0, help="分页偏移量")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.option("--type", "msg_type", default=None, type=click.Choice(MSG_TYPE_NAMES), help="消息类型过滤")
@click.pass_context
def search(ctx, keyword, chat, start_time, end_time, limit, offset, fmt, msg_type):
"""搜索消息内容
\b
示例:
wechat-cli search "Claude" # 全局搜索
wechat-cli search "Claude" --chat "AI交流群" # 在指定群搜索
wechat-cli search "开会" --chat "群A" --chat "群B" # 同时搜多个群
wechat-cli search "你好" --start-time "2026-04-01" --limit 50
"""
app = ctx.obj
try:
validate_pagination(limit, offset)
start_ts, end_ts = parse_time_range(start_time, end_time)
except ValueError as e:
click.echo(f"错误: {e}", err=True)
ctx.exit(2)
names = get_contact_names(app.cache, app.decrypted_dir)
candidate_limit = _candidate_page_size(limit, offset)
chat_names = list(chat)
type_filter = MSG_TYPE_FILTERS[msg_type] if msg_type else None
if len(chat_names) == 1:
# 单聊搜索
chat_ctx = resolve_chat_context(chat_names[0], app.msg_db_keys, app.cache, app.decrypted_dir)
if not chat_ctx:
click.echo(f"找不到聊天对象: {chat_names[0]}", err=True)
ctx.exit(1)
if not chat_ctx['db_path']:
click.echo(f"找不到 {chat_ctx['display_name']} 的消息记录", err=True)
ctx.exit(1)
entries, failures = collect_chat_search(
chat_ctx, names, keyword, app.display_name_fn,
start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
msg_type_filter=type_filter,
)
scope = chat_ctx['display_name']
elif len(chat_names) > 1:
# 多聊搜索
resolved, unresolved, missing = resolve_chat_contexts(chat_names, app.msg_db_keys, app.cache, app.decrypted_dir)
if not resolved:
click.echo("错误: 没有可查询的聊天对象", err=True)
ctx.exit(1)
entries = []
failures = []
for rc in resolved:
e, f = collect_chat_search(
rc, names, keyword, app.display_name_fn,
start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
msg_type_filter=type_filter,
)
entries.extend(e)
failures.extend(f)
if unresolved:
            failures.append("未找到: " + "、".join(unresolved))
scope = f"{len(resolved)} 个聊天对象"
else:
# 全局搜索
entries, failures = search_all_messages(
app.msg_db_keys, app.cache, names, keyword, app.display_name_fn,
start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
msg_type_filter=type_filter,
)
scope = "全部消息"
paged = _page_ranked_entries(entries, limit, offset)
if fmt == 'json':
output({
'scope': scope,
'keyword': keyword,
'count': len(paged),
'offset': offset,
'limit': limit,
'start_time': start_time or None,
'end_time': end_time or None,
'type': msg_type or None,
'results': [item[1] for item in paged],
'failures': failures if failures else None,
}, 'json')
else:
if not paged:
output(f"{scope} 中未找到包含 \"{keyword}\" 的消息", 'text')
return
header = f"{scope} 中搜索 \"{keyword}\" 找到 {len(paged)} 条结果offset={offset}, limit={limit}"
if start_time or end_time:
header += f"\n时间范围: {start_time or '最早'} ~ {end_time or '最新'}"
if failures:
header += "\n查询失败: " + "".join(failures)
output(header + ":\n\n" + "\n\n".join(item[1] for item in paged), 'text')

View File

@@ -0,0 +1,88 @@
"""get-recent-sessions 命令"""
import os
import sqlite3
from contextlib import closing
from datetime import datetime
import click
from ..core.contacts import get_contact_names
from ..core.messages import decompress_content, format_msg_type
from ..output.formatter import output
@click.command("sessions")
@click.option("--limit", default=20, help="返回的会话数量")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def sessions(ctx, limit, fmt):
"""获取最近会话列表
\b
示例:
wechat-cli sessions # 默认返回最近 20 个会话 (JSON)
wechat-cli sessions --limit 10 # 最近 10 个会话
wechat-cli sessions --format text # 纯文本输出
"""
app = ctx.obj
path = app.cache.get(os.path.join("session", "session.db"))
if not path:
click.echo("错误: 无法解密 session.db", err=True)
ctx.exit(3)
names = get_contact_names(app.cache, app.decrypted_dir)
with closing(sqlite3.connect(path)) as conn:
rows = conn.execute("""
SELECT username, unread_count, summary, last_timestamp,
last_msg_type, last_msg_sender, last_sender_display_name
FROM SessionTable
WHERE last_timestamp > 0
ORDER BY last_timestamp DESC
LIMIT ?
""", (limit,)).fetchall()
results = []
for r in rows:
username, unread, summary, ts, msg_type, sender, sender_name = r
display = names.get(username, username)
is_group = '@chatroom' in username
if isinstance(summary, bytes):
summary = decompress_content(summary, 4) or '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
sender_display = ''
if is_group and sender:
sender_display = names.get(sender, sender_name or sender)
results.append({
'chat': display,
'username': username,
'is_group': is_group,
'unread': unread or 0,
'last_message': str(summary or ''),
'msg_type': format_msg_type(msg_type),
'sender': sender_display,
'timestamp': ts,
'time': datetime.fromtimestamp(ts).strftime('%m-%d %H:%M'),
})
if fmt == 'json':
output(results, 'json')
else:
lines = []
for r in results:
entry = f"[{r['time']}] {r['chat']}"
if r['is_group']:
entry += " [群]"
if r['unread'] > 0:
entry += f" ({r['unread']}条未读)"
entry += f"\n {r['msg_type']}: "
if r['sender']:
entry += f"{r['sender']}: "
entry += r['last_message']
lines.append(entry)
output(f"最近 {len(results)} 个会话:\n\n" + "\n\n".join(lines), 'text')

View File

@@ -0,0 +1,88 @@
"""stats 命令 — 聊天统计分析"""
import click
from ..core.contacts import get_contact_names
from ..core.messages import (
collect_chat_stats,
parse_time_range,
resolve_chat_context,
)
from ..output.formatter import output
@click.command("stats")
@click.argument("chat_name")
@click.option("--start-time", default="", help="起始时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--end-time", default="", help="结束时间 YYYY-MM-DD [HH:MM[:SS]]")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def stats(ctx, chat_name, start_time, end_time, fmt):
"""聊天统计分析
\b
示例:
wechat-cli stats "AI交流群"
wechat-cli stats "张三" --start-time "2026-04-01" --end-time "2026-04-03"
wechat-cli stats "群名" --format text
"""
app = ctx.obj
try:
start_ts, end_ts = parse_time_range(start_time, end_time)
except ValueError as e:
click.echo(f"错误: {e}", err=True)
ctx.exit(2)
chat_ctx = resolve_chat_context(chat_name, app.msg_db_keys, app.cache, app.decrypted_dir)
if not chat_ctx:
click.echo(f"找不到聊天对象: {chat_name}", err=True)
ctx.exit(1)
if not chat_ctx['db_path']:
click.echo(f"找不到 {chat_ctx['display_name']} 的消息记录", err=True)
ctx.exit(1)
names = get_contact_names(app.cache, app.decrypted_dir)
result = collect_chat_stats(
chat_ctx, names, app.display_name_fn,
start_ts=start_ts, end_ts=end_ts,
)
if fmt == 'json':
output({
'chat': chat_ctx['display_name'],
'username': chat_ctx['username'],
'is_group': chat_ctx['is_group'],
**result,
}, 'json')
else:
lines = [f"{chat_ctx['display_name']} 聊天统计"]
if chat_ctx['is_group']:
lines[0] += " [群聊]"
lines.append(f"消息总数: {result['total']}")
if start_time or end_time:
lines.append(f"时间范围: {start_time or '最早'} ~ {end_time or '最新'}")
# 类型分布
lines.append("\n消息类型分布:")
for t, cnt in result['type_breakdown'].items():
pct = cnt / result['total'] * 100 if result['total'] > 0 else 0
lines.append(f" {t}: {cnt} ({pct:.1f}%)")
# 发送者排名
if result['top_senders']:
lines.append("\n发言排行 Top 10:")
for s in result['top_senders']:
lines.append(f" {s['name']}: {s['count']}")
# 24小时分布
lines.append("\n24小时活跃分布:")
max_count = max(result['hourly'].values()) if result['hourly'] else 0
bar_max = 30
for h in range(24):
count = result['hourly'].get(h, 0)
bar_len = int(count / max_count * bar_max) if max_count > 0 else 0
            bar = '█' * bar_len
lines.append(f" {h:02d}时 |{bar} {count}")
output("\n".join(lines), 'text')

View File

@@ -0,0 +1,90 @@
"""unread 命令 — 查看未读会话"""
import os
import sqlite3
from contextlib import closing
from datetime import datetime
import click
from ..core.contacts import get_contact_names
from ..core.messages import decompress_content, format_msg_type
from ..output.formatter import output
@click.command("unread")
@click.option("--limit", default=50, help="返回的会话数量")
@click.option("--format", "fmt", default="json", type=click.Choice(["json", "text"]), help="输出格式")
@click.pass_context
def unread(ctx, limit, fmt):
"""查看未读会话
\b
示例:
wechat-cli unread # 查看所有未读会话
wechat-cli unread --limit 10 # 最多显示 10 个
wechat-cli unread --format text # 纯文本输出
"""
app = ctx.obj
path = app.cache.get(os.path.join("session", "session.db"))
if not path:
click.echo("错误: 无法解密 session.db", err=True)
ctx.exit(3)
names = get_contact_names(app.cache, app.decrypted_dir)
with closing(sqlite3.connect(path)) as conn:
rows = conn.execute("""
SELECT username, unread_count, summary, last_timestamp,
last_msg_type, last_msg_sender, last_sender_display_name
FROM SessionTable
WHERE unread_count > 0
ORDER BY last_timestamp DESC
LIMIT ?
""", (limit,)).fetchall()
results = []
for r in rows:
username, unread, summary, ts, msg_type, sender, sender_name = r
display = names.get(username, username)
is_group = '@chatroom' in username
if isinstance(summary, bytes):
summary = decompress_content(summary, 4) or '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
sender_display = ''
if is_group and sender:
sender_display = names.get(sender, sender_name or sender)
results.append({
'chat': display,
'username': username,
'is_group': is_group,
'unread': unread or 0,
'last_message': str(summary or ''),
'msg_type': format_msg_type(msg_type),
'sender': sender_display,
'timestamp': ts,
'time': datetime.fromtimestamp(ts).strftime('%m-%d %H:%M'),
})
if fmt == 'json':
output(results, 'json')
else:
if not results:
output("没有未读消息", 'text')
return
lines = []
for r in results:
entry = f"[{r['time']}] {r['chat']}"
if r['is_group']:
entry += " [群]"
entry += f" ({r['unread']}条未读)"
entry += f"\n {r['msg_type']}: "
if r['sender']:
entry += f"{r['sender']}: "
entry += r['last_message']
lines.append(entry)
output(f"未读会话({len(results)} 个):\n\n" + "\n\n".join(lines), 'text')

View File

192
wechat_cli/core/config.py Normal file
View File

@@ -0,0 +1,192 @@
"""配置加载 — 从 ~/.wechat-cli/ 读取自包含配置"""
import glob as glob_mod
import json
import os
import platform
import sys
_SYSTEM = platform.system().lower()
if _SYSTEM == "linux":
_DEFAULT_PROCESS = "wechat"
elif _SYSTEM == "darwin":
_DEFAULT_PROCESS = "WeChat"
else:
_DEFAULT_PROCESS = "Weixin.exe"
# CLI 状态目录
STATE_DIR = os.path.expanduser("~/.wechat-cli")
CONFIG_FILE = os.path.join(STATE_DIR, "config.json")
KEYS_FILE = os.path.join(STATE_DIR, "all_keys.json")
def _choose_candidate(candidates):
if len(candidates) == 1:
return candidates[0]
if len(candidates) > 1:
if not sys.stdin.isatty():
return candidates[0]
print("[!] 检测到多个微信数据目录:")
for i, c in enumerate(candidates, 1):
print(f" {i}. {c}")
print(" 0. 跳过")
try:
while True:
choice = input(f"请选择 [0-{len(candidates)}]: ").strip()
if choice == "0":
return None
if choice.isdigit() and 1 <= int(choice) <= len(candidates):
return candidates[int(choice) - 1]
print(" 无效输入")
except (EOFError, KeyboardInterrupt):
print()
return None
return None
def _auto_detect_db_dir_windows():
appdata = os.environ.get("APPDATA", "")
config_dir = os.path.join(appdata, "Tencent", "xwechat", "config")
if not os.path.isdir(config_dir):
return None
data_roots = []
for ini_file in glob_mod.glob(os.path.join(config_dir, "*.ini")):
try:
content = None
for enc in ("utf-8", "gbk"):
try:
with open(ini_file, "r", encoding=enc) as f:
content = f.read(1024).strip()
break
except UnicodeDecodeError:
continue
if not content or any(c in content for c in "\n\r\x00"):
continue
if os.path.isdir(content):
data_roots.append(content)
except OSError:
continue
seen = set()
candidates = []
for root in data_roots:
pattern = os.path.join(root, "xwechat_files", "*", "db_storage")
for match in glob_mod.glob(pattern):
normalized = os.path.normcase(os.path.normpath(match))
if os.path.isdir(match) and normalized not in seen:
seen.add(normalized)
candidates.append(match)
return _choose_candidate(candidates)
def _auto_detect_db_dir_linux():
seen = set()
candidates = []
search_roots = [os.path.expanduser("~/Documents/xwechat_files")]
sudo_user = os.environ.get("SUDO_USER")
if sudo_user:
import pwd
try:
sudo_home = pwd.getpwnam(sudo_user).pw_dir
except KeyError:
sudo_home = None
if sudo_home:
fallback = os.path.join(sudo_home, "Documents", "xwechat_files")
if fallback not in search_roots:
search_roots.append(fallback)
for root in search_roots:
if not os.path.isdir(root):
continue
pattern = os.path.join(root, "*", "db_storage")
for match in glob_mod.glob(pattern):
normalized = os.path.normcase(os.path.normpath(match))
if os.path.isdir(match) and normalized not in seen:
seen.add(normalized)
candidates.append(match)
old_path = os.path.expanduser("~/.local/share/weixin/data/db_storage")
if os.path.isdir(old_path):
normalized = os.path.normcase(os.path.normpath(old_path))
if normalized not in seen:
candidates.append(old_path)
def _mtime(path):
msg_dir = os.path.join(path, "message")
target = msg_dir if os.path.isdir(msg_dir) else path
try:
return os.path.getmtime(target)
except OSError:
return 0
candidates.sort(key=_mtime, reverse=True)
return _choose_candidate(candidates)
def _auto_detect_db_dir_macos():
base = os.path.expanduser("~/Library/Containers/com.tencent.xinWeChat/Data/Documents/xwechat_files")
if not os.path.isdir(base):
return None
seen = set()
candidates = []
pattern = os.path.join(base, "*", "db_storage")
for match in glob_mod.glob(pattern):
normalized = os.path.normcase(os.path.normpath(match))
if os.path.isdir(match) and normalized not in seen:
seen.add(normalized)
candidates.append(match)
return _choose_candidate(candidates)
def auto_detect_db_dir():
if _SYSTEM == "windows":
return _auto_detect_db_dir_windows()
if _SYSTEM == "linux":
return _auto_detect_db_dir_linux()
if _SYSTEM == "darwin":
return _auto_detect_db_dir_macos()
return None
def load_config(config_path=None):
"""加载配置。默认从 ~/.wechat-cli/config.json 读取。"""
if config_path is None:
config_path = CONFIG_FILE
cfg = {}
if os.path.exists(config_path):
try:
with open(config_path, encoding="utf-8") as f:
cfg = json.load(f)
except json.JSONDecodeError:
cfg = {}
# Auto-detect db_dir when it is missing from the config
db_dir = cfg.get("db_dir", "")
if not db_dir:
detected = auto_detect_db_dir()
if detected:
cfg["db_dir"] = detected
else:
raise FileNotFoundError(
"未找到微信数据目录。\n"
"请运行: wechat-cli init"
)
# Fill in defaults
state_dir = os.path.dirname(os.path.abspath(config_path))
cfg.setdefault("keys_file", os.path.join(state_dir, "all_keys.json"))
cfg.setdefault("decrypted_dir", os.path.join(state_dir, "decrypted"))
cfg.setdefault("decoded_image_dir", os.path.join(state_dir, "decoded_images"))
cfg.setdefault("wechat_process", _DEFAULT_PROCESS)
# Ensure all paths are absolute
for key in ("db_dir", "keys_file", "decrypted_dir", "decoded_image_dir"):
if key in cfg and not os.path.isabs(cfg[key]):
cfg[key] = os.path.join(state_dir, cfg[key])
# Derive the WeChat data root directory
db_dir = cfg.get("db_dir", "")
if db_dir and os.path.basename(db_dir) == "db_storage":
cfg["wechat_base_dir"] = os.path.dirname(db_dir)
else:
cfg["wechat_base_dir"] = db_dir
return cfg
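The three platform detectors above share one core step: glob for `<root>/*/db_storage` and dedupe candidates by normalized path. A minimal standalone sketch of that step against throwaway directories (the directory names below are made up for the demonstration):

```python
import glob
import os
import tempfile

def find_db_storage_dirs(root):
    # Collect unique <root>/*/db_storage directories, deduped case/sep-insensitively.
    seen = set()
    candidates = []
    for match in glob.glob(os.path.join(root, "*", "db_storage")):
        normalized = os.path.normcase(os.path.normpath(match))
        if os.path.isdir(match) and normalized not in seen:
            seen.add(normalized)
            candidates.append(match)
    return candidates

with tempfile.TemporaryDirectory() as root:
    os.makedirs(os.path.join(root, "wxid_demo", "db_storage"))
    os.makedirs(os.path.join(root, "wxid_demo", "not_db"))  # should not match
    assert find_db_storage_dirs(root) == [os.path.join(root, "wxid_demo", "db_storage")]
```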

wechat_cli/core/contacts.py Normal file

@@ -0,0 +1,201 @@
"""联系人管理 — 加载、缓存、模糊匹配"""
import os
import re
import sqlite3
_contact_names = None # {username: display_name}
_contact_full = None # [{username, nick_name, remark}]
_self_username = None
def _load_contacts_from(db_path):
names = {}
full = []
conn = sqlite3.connect(db_path)
try:
for r in conn.execute("SELECT username, nick_name, remark FROM contact").fetchall():
uname, nick, remark = r
display = remark if remark else nick if nick else uname
names[uname] = display
full.append({'username': uname, 'nick_name': nick or '', 'remark': remark or ''})
finally:
conn.close()
return names, full
def get_contact_names(cache, decrypted_dir):
global _contact_names, _contact_full
if _contact_names is not None:
return _contact_names
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
if os.path.exists(pre_decrypted):
try:
_contact_names, _contact_full = _load_contacts_from(pre_decrypted)
return _contact_names
except Exception:
pass
path = cache.get(os.path.join("contact", "contact.db"))
if path:
try:
_contact_names, _contact_full = _load_contacts_from(path)
return _contact_names
except Exception:
pass
return {}
def get_contact_full(cache, decrypted_dir):
global _contact_full
if _contact_full is None:
get_contact_names(cache, decrypted_dir)
return _contact_full or []
def resolve_username(chat_name, cache, decrypted_dir):
names = get_contact_names(cache, decrypted_dir)
if chat_name in names or chat_name.startswith('wxid_') or '@chatroom' in chat_name:
return chat_name
chat_lower = chat_name.lower()
for uname, display in names.items():
if chat_lower == display.lower():
return uname
for uname, display in names.items():
if chat_lower in display.lower():
return uname
return None
def get_self_username(db_dir, cache, decrypted_dir):
global _self_username
if _self_username:
return _self_username
if not db_dir:
return ''
names = get_contact_names(cache, decrypted_dir)
account_dir = os.path.basename(os.path.dirname(db_dir))
candidates = [account_dir]
m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir)
if m:
candidates.insert(0, m.group(1))
for candidate in candidates:
if candidate and candidate in names:
_self_username = candidate
return _self_username
return ''
def get_group_members(chatroom_username, cache, decrypted_dir):
"""获取群聊成员列表。
通过 contact.db 的 chatroom_member 关联表查询。
Returns:
dict: {'members': [...], 'owner': str}
每个 member: {'username': ..., 'nick_name': ..., 'remark': ..., 'display_name': ...}
"""
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
if os.path.exists(pre_decrypted):
db_path = pre_decrypted
else:
db_path = cache.get(os.path.join("contact", "contact.db"))
if not db_path:
return {'members': [], 'owner': ''}
names = get_contact_names(cache, decrypted_dir)
conn = sqlite3.connect(db_path)
try:
# 1. Find the chatroom's contact.id
row = conn.execute("SELECT id FROM contact WHERE username = ?", (chatroom_username,)).fetchone()
if not row:
return {'members': [], 'owner': ''}
room_id = row[0]
# 2. Look up the group owner
owner = ''
owner_row = conn.execute("SELECT owner FROM chat_room WHERE id = ?", (room_id,)).fetchone()
if owner_row and owner_row[0]:
owner = names.get(owner_row[0], owner_row[0])
# 3. Fetch the member ID list
member_ids = [r[0] for r in conn.execute(
"SELECT member_id FROM chatroom_member WHERE room_id = ?", (room_id,)
).fetchall()]
if not member_ids:
return {'members': [], 'owner': owner}
# 4. Batch-fetch member details
placeholders = ','.join('?' * len(member_ids))
members = []
for uid, username, nick, remark in conn.execute(
f"SELECT id, username, nick_name, remark FROM contact WHERE id IN ({placeholders})",
member_ids
):
display = remark if remark else nick if nick else username
members.append({
'username': username,
'nick_name': nick or '',
'remark': remark or '',
'display_name': display,
})
# Sort by display_name, with the group owner first
owner_username = owner_row[0] if owner_row else ''
members.sort(key=lambda m: (0 if m['username'] == owner_username else 1, m['display_name']))
return {'members': members, 'owner': owner}
finally:
conn.close()
def get_contact_detail(username, cache, decrypted_dir):
"""获取联系人详情。
Returns:
dict or None: 联系人详细信息
"""
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
if os.path.exists(pre_decrypted):
db_path = pre_decrypted
else:
db_path = cache.get(os.path.join("contact", "contact.db"))
if not db_path:
return None
conn = sqlite3.connect(db_path)
try:
row = conn.execute(
"SELECT username, nick_name, remark, alias, description, "
"small_head_url, big_head_url, verify_flag, local_type "
"FROM contact WHERE username = ?",
(username,)
).fetchone()
if not row:
return None
uname, nick, remark, alias, desc, small_url, big_url, verify, ltype = row
return {
'username': uname,
'nick_name': nick or '',
'remark': remark or '',
'alias': alias or '',
'description': desc or '',
'avatar': small_url or big_url or '',
'verify_flag': verify or 0,
'local_type': ltype,
'is_group': '@chatroom' in uname,
'is_subscription': uname.startswith('gh_'),
}
finally:
conn.close()
def display_name_for_username(username, names, db_dir, cache, decrypted_dir):
if not username:
return ''
if username == get_self_username(db_dir, cache, decrypted_dir):
return 'me'
return names.get(username, username)
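The remark > nick_name > username fallback this module applies everywhere can be exercised against a throwaway in-memory table (the usernames below are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE contact (username TEXT, nick_name TEXT, remark TEXT)")
conn.executemany(
    "INSERT INTO contact VALUES (?, ?, ?)",
    [
        ("wxid_a", "Alice", "Boss"),  # remark wins
        ("wxid_b", "Bob", None),      # falls back to nick_name
        ("wxid_c", None, None),       # falls back to username
    ],
)
names = {}
for uname, nick, remark in conn.execute("SELECT username, nick_name, remark FROM contact"):
    names[uname] = remark if remark else nick if nick else uname
conn.close()
assert names == {"wxid_a": "Boss", "wxid_b": "Bob", "wxid_c": "wxid_c"}
```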


@@ -0,0 +1,41 @@
"""应用上下文 — 单例持有配置、缓存、密钥等共享状态"""
import atexit
import json
import os
from .config import load_config, STATE_DIR
from .db_cache import DBCache
from .key_utils import strip_key_metadata
from .messages import find_msg_db_keys
class AppContext:
"""每次 CLI 调用初始化一次,被所有命令共享。"""
def __init__(self, config_path=None):
self.cfg = load_config(config_path)
self.db_dir = self.cfg["db_dir"]
self.decrypted_dir = self.cfg["decrypted_dir"]
self.keys_file = self.cfg["keys_file"]
if not os.path.exists(self.keys_file):
raise FileNotFoundError(
f"密钥文件不存在: {self.keys_file}\n"
"请运行: wechat-cli init"
)
with open(self.keys_file, encoding="utf-8") as f:
self.all_keys = strip_key_metadata(json.load(f))
self.cache = DBCache(self.all_keys, self.db_dir)
atexit.register(self.cache.cleanup)
self.msg_db_keys = find_msg_db_keys(self.all_keys)
# Make sure the state directory exists
os.makedirs(STATE_DIR, exist_ok=True)
def display_name_fn(self, username, names):
from .contacts import display_name_for_username
return display_name_for_username(username, names, self.db_dir, self.cache, self.decrypted_dir)

wechat_cli/core/crypto.py Normal file

@@ -0,0 +1,77 @@
"""数据库解密 — SQLCipher 4, AES-256-CBC"""
import os
import struct
from Crypto.Cipher import AES
PAGE_SZ = 4096
KEY_SZ = 32
SALT_SZ = 16
RESERVE_SZ = 80 # IV(16) + HMAC-SHA512(64)
SQLITE_HDR = b'SQLite format 3\x00'
WAL_HEADER_SZ = 32
WAL_FRAME_HEADER_SZ = 24
def decrypt_page(enc_key, page_data, pgno):
iv = page_data[PAGE_SZ - RESERVE_SZ: PAGE_SZ - RESERVE_SZ + 16]
if pgno == 1:
encrypted = page_data[SALT_SZ: PAGE_SZ - RESERVE_SZ]
cipher = AES.new(enc_key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(encrypted)
return bytes(bytearray(SQLITE_HDR + decrypted + b'\x00' * RESERVE_SZ))
else:
encrypted = page_data[:PAGE_SZ - RESERVE_SZ]
cipher = AES.new(enc_key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(encrypted)
return decrypted + b'\x00' * RESERVE_SZ
def full_decrypt(db_path, out_path, enc_key):
file_size = os.path.getsize(db_path)
total_pages = file_size // PAGE_SZ
os.makedirs(os.path.dirname(out_path), exist_ok=True)
with open(db_path, 'rb') as fin, open(out_path, 'wb') as fout:
for pgno in range(1, total_pages + 1):
page = fin.read(PAGE_SZ)
if len(page) < PAGE_SZ:
if len(page) > 0:
page = page + b'\x00' * (PAGE_SZ - len(page))
else:
break
fout.write(decrypt_page(enc_key, page, pgno))
return total_pages
def decrypt_wal(wal_path, out_path, enc_key):
if not os.path.exists(wal_path):
return 0
wal_size = os.path.getsize(wal_path)
if wal_size <= WAL_HEADER_SZ:
return 0
patched = 0
with open(wal_path, 'rb') as wf, open(out_path, 'r+b') as df:
wal_hdr = wf.read(WAL_HEADER_SZ)
wal_salt1 = struct.unpack('>I', wal_hdr[16:20])[0]
wal_salt2 = struct.unpack('>I', wal_hdr[20:24])[0]
frame_size = WAL_FRAME_HEADER_SZ + PAGE_SZ
while wf.tell() + frame_size <= wal_size:
fh = wf.read(WAL_FRAME_HEADER_SZ)
if len(fh) < WAL_FRAME_HEADER_SZ:
break
pgno = struct.unpack('>I', fh[0:4])[0]
frame_salt1 = struct.unpack('>I', fh[8:12])[0]
frame_salt2 = struct.unpack('>I', fh[12:16])[0]
ep = wf.read(PAGE_SZ)
if len(ep) < PAGE_SZ:
break
if pgno == 0 or pgno > 1000000:
continue
if frame_salt1 != wal_salt1 or frame_salt2 != wal_salt2:
continue
dec = decrypt_page(enc_key, ep, pgno)
df.seek((pgno - 1) * PAGE_SZ)
df.write(dec)
patched += 1
return patched
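The offsets in decrypt_page follow from the SQLCipher 4 defaults this module assumes (4096-byte pages with 80 reserved trailing bytes; page 1 starts with a 16-byte salt in place of the SQLite header). The arithmetic can be checked directly:

```python
# Layout arithmetic behind decrypt_page.
PAGE_SZ, SALT_SZ, RESERVE_SZ = 4096, 16, 80

iv_start = PAGE_SZ - RESERVE_SZ              # IV sits at the head of the reserve
ciphertext_len_page1 = (PAGE_SZ - RESERVE_SZ) - SALT_SZ
ciphertext_len_other = PAGE_SZ - RESERVE_SZ

assert iv_start == 4016
assert ciphertext_len_page1 == 4000          # the salt itself is not encrypted
assert ciphertext_len_other == 4016
assert ciphertext_len_page1 % 16 == 0        # AES-CBC needs 16-byte blocks
assert ciphertext_len_other % 16 == 0
```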


@@ -0,0 +1,91 @@
"""解密数据库缓存 — mtime 检测变化,跨会话复用"""
import hashlib
import json
import os
import tempfile
from .crypto import full_decrypt, decrypt_wal
from .key_utils import get_key_info
class DBCache:
CACHE_DIR = os.path.join(tempfile.gettempdir(), "wechat_cli_cache")
MTIME_FILE = os.path.join(tempfile.gettempdir(), "wechat_cli_cache", "_mtimes.json")
def __init__(self, all_keys, db_dir):
self._all_keys = all_keys
self._db_dir = db_dir
self._cache = {} # rel_key -> (db_mtime, wal_mtime, tmp_path)
os.makedirs(self.CACHE_DIR, exist_ok=True)
self._load_persistent_cache()
def _cache_path(self, rel_key):
h = hashlib.md5(rel_key.encode()).hexdigest()[:12]
return os.path.join(self.CACHE_DIR, f"{h}.db")
def _load_persistent_cache(self):
if not os.path.exists(self.MTIME_FILE):
return
try:
with open(self.MTIME_FILE, encoding="utf-8") as f:
saved = json.load(f)
except (json.JSONDecodeError, OSError):
return
for rel_key, info in saved.items():
tmp_path = info["path"]
if not os.path.exists(tmp_path):
continue
rel_path = rel_key.replace('\\', os.sep)
db_path = os.path.join(self._db_dir, rel_path)
wal_path = db_path + "-wal"
try:
db_mtime = os.path.getmtime(db_path)
wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0
except OSError:
continue
if db_mtime == info["db_mt"] and wal_mtime == info["wal_mt"]:
self._cache[rel_key] = (db_mtime, wal_mtime, tmp_path)
def _save_persistent_cache(self):
data = {}
for rel_key, (db_mt, wal_mt, path) in self._cache.items():
data[rel_key] = {"db_mt": db_mt, "wal_mt": wal_mt, "path": path}
try:
with open(self.MTIME_FILE, 'w', encoding="utf-8") as f:
json.dump(data, f)
except OSError:
pass
def get(self, rel_key):
key_info = get_key_info(self._all_keys, rel_key)
if not key_info:
return None
rel_path = rel_key.replace('\\', '/').replace('/', os.sep)
db_path = os.path.join(self._db_dir, rel_path)
wal_path = db_path + "-wal"
if not os.path.exists(db_path):
return None
try:
db_mtime = os.path.getmtime(db_path)
wal_mtime = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0
except OSError:
return None
if rel_key in self._cache:
c_db_mt, c_wal_mt, c_path = self._cache[rel_key]
if c_db_mt == db_mtime and c_wal_mt == wal_mtime and os.path.exists(c_path):
return c_path
tmp_path = self._cache_path(rel_key)
enc_key = bytes.fromhex(key_info["enc_key"])
full_decrypt(db_path, tmp_path, enc_key)
if os.path.exists(wal_path):
decrypt_wal(wal_path, tmp_path, enc_key)
self._cache[rel_key] = (db_mtime, wal_mtime, tmp_path)
self._save_persistent_cache()
return tmp_path
def cleanup(self):
self._save_persistent_cache()
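The invalidation rule `get` applies can be shown as a standalone sketch (file names here are illustrative): a cached decryption stays valid only while both the db file and its `-wal` sidecar keep the mtimes recorded at decryption time.

```python
import os
import tempfile

def is_stale(db_path, cached_db_mt, cached_wal_mt):
    # Same check DBCache.get performs before reusing a decrypted copy.
    wal_path = db_path + "-wal"
    db_mt = os.path.getmtime(db_path)
    wal_mt = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0
    return (db_mt, wal_mt) != (cached_db_mt, cached_wal_mt)

with tempfile.TemporaryDirectory() as d:
    db = os.path.join(d, "message_0.db")
    open(db, "wb").close()
    snap = (os.path.getmtime(db), 0)
    assert not is_stale(db, *snap)                # untouched: cache hit
    os.utime(db, (snap[0] + 10, snap[0] + 10))    # simulate WeChat writing
    assert is_stale(db, *snap)                    # touched: re-decrypt
```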


@@ -0,0 +1,36 @@
"""密钥工具 — 路径匹配、元数据剥离"""
import os
import posixpath
def strip_key_metadata(keys):
return {k: v for k, v in keys.items() if not k.startswith("_")}
def _is_safe_rel_path(path):
normalized = path.replace("\\", "/")
return ".." not in posixpath.normpath(normalized).split("/")
def key_path_variants(rel_path):
normalized = rel_path.replace("\\", "/")
variants = []
for candidate in (
rel_path,
normalized,
normalized.replace("/", "\\"),
normalized.replace("/", os.sep),
):
if candidate not in variants:
variants.append(candidate)
return variants
def get_key_info(keys, rel_path):
if not _is_safe_rel_path(rel_path):
return None
for candidate in key_path_variants(rel_path):
if candidate in keys and not candidate.startswith("_"):
return keys[candidate]
return None
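Key files written on Windows may record paths with backslashes, so `get_key_info` tries every separator spelling before giving up. A self-contained restatement of that lookup (the key value below is a dummy):

```python
import os
import posixpath

def variants(rel_path):
    # Same candidate order as key_path_variants above.
    normalized = rel_path.replace("\\", "/")
    out = []
    for cand in (rel_path, normalized, normalized.replace("/", "\\"),
                 normalized.replace("/", os.sep)):
        if cand not in out:
            out.append(cand)
    return out

keys = {"message\\message_0.db": {"enc_key": "ab" * 32}}  # Windows-style entry
hit = next((keys[v] for v in variants("message/message_0.db") if v in keys), None)
assert hit is not None and hit["enc_key"].startswith("ab")

# The traversal guard: any path normalizing to a ".." component is rejected.
assert ".." in posixpath.normpath("a/../../b").split("/")
```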

wechat_cli/core/messages.py Normal file

@@ -0,0 +1,665 @@
"""消息查询 — 分表查找、分页、格式化"""
import hashlib
import os
import re
import sqlite3
import xml.etree.ElementTree as ET
from contextlib import closing
from datetime import datetime
import zstandard as zstd
from .key_utils import key_path_variants
_zstd_dctx = zstd.ZstdDecompressor()
_XML_UNSAFE_RE = re.compile(r'<!DOCTYPE|<!ENTITY', re.IGNORECASE)
_XML_PARSE_MAX_LEN = 20000
_QUERY_LIMIT_MAX = 500
_HISTORY_QUERY_BATCH_SIZE = 500
# Message-type filter map: name -> (base_type,) or (base_type, sub_type)
MSG_TYPE_FILTERS = {
'text': (1,),
'image': (3,),
'voice': (34,),
'video': (43,),
'sticker': (47,),
'location': (48,),
'link': (49,),
'file': (49, 6),
'call': (50,),
'system': (10000,),
}
MSG_TYPE_NAMES = list(MSG_TYPE_FILTERS.keys())
# ---- Message DB discovery ----
def find_msg_db_keys(all_keys):
return sorted([
k for k in all_keys
if any(v.startswith("message/") for v in key_path_variants(k))
and any(re.search(r"message_\d+\.db$", v) for v in key_path_variants(k))
])
def _is_safe_msg_table_name(table_name):
return bool(re.fullmatch(r'Msg_[0-9a-f]{32}', table_name))
def _find_msg_tables_for_user(username, msg_db_keys, cache):
table_hash = hashlib.md5(username.encode()).hexdigest()
table_name = f"Msg_{table_hash}"
if not _is_safe_msg_table_name(table_name):
return []
matches = []
for rel_key in msg_db_keys:
path = cache.get(rel_key)
if not path:
continue
conn = sqlite3.connect(path)
try:
exists = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?",
(table_name,)
).fetchone()
if not exists:
continue
max_ct = conn.execute(f"SELECT MAX(create_time) FROM [{table_name}]").fetchone()[0] or 0
matches.append({'db_path': path, 'table_name': table_name, 'max_create_time': max_ct})
except Exception:
pass
finally:
conn.close()
matches.sort(key=lambda x: x['max_create_time'], reverse=True)
return matches
# ---- Message types ----
def _split_msg_type(t):
try:
t = int(t)
except (TypeError, ValueError):
return 0, 0
if t > 0xFFFFFFFF:
return t & 0xFFFFFFFF, t >> 32
return t, 0
def format_msg_type(t):
base_type, _ = _split_msg_type(t)
return {
1: '文本', 3: '图片', 34: '语音', 42: '名片',
43: '视频', 47: '表情', 48: '位置', 49: '链接/文件',
50: '通话', 10000: '系统', 10002: '撤回',
}.get(base_type, f'type={t}')
# ---- Content decompression ----
def decompress_content(content, ct):
if ct == 4 and isinstance(content, bytes):
try:
return _zstd_dctx.decompress(content).decode('utf-8', errors='replace')
except Exception:
return None
if isinstance(content, bytes):
try:
return content.decode('utf-8', errors='replace')
except Exception:
return None
return content
# ---- Content parsing ----
def _parse_message_content(content, local_type, is_group):
if content is None:
return '', ''
if isinstance(content, bytes):
return '', '(二进制内容)'
sender = ''
text = content
if is_group and ':\n' in content:
sender, text = content.split(':\n', 1)
return sender, text
def _collapse_text(text):
if not text:
return ''
return re.sub(r'\s+', ' ', text).strip()
def _parse_xml_root(content):
if not content or len(content) > _XML_PARSE_MAX_LEN or _XML_UNSAFE_RE.search(content):
return None
try:
return ET.fromstring(content)
except ET.ParseError:
return None
def _parse_int(value, fallback=0):
try:
return int(value)
except (TypeError, ValueError):
return fallback
def _format_app_message_text(content, local_type, is_group, chat_username, chat_display_name, names, _display_name_fn):
if not content or '<appmsg' not in content:
return None
_, sub_type = _split_msg_type(local_type)
root = _parse_xml_root(content)
if root is None:
return None
appmsg = root.find('.//appmsg')
if appmsg is None:
return None
title = _collapse_text(appmsg.findtext('title') or '')
app_type = _parse_int((appmsg.findtext('type') or '').strip(), _parse_int(sub_type, 0))
if app_type == 57:
ref = appmsg.find('.//refermsg')
ref_content = ''
ref_display_name = ''
if ref is not None:
ref_display_name = (ref.findtext('displayname') or '').strip()
ref_content = _collapse_text(ref.findtext('content') or '')
if len(ref_content) > 160:
ref_content = ref_content[:160] + "..."
quote_text = title or "[引用消息]"
if ref_content:
prefix = f"回复 {ref_display_name}: " if ref_display_name else "回复: "
quote_text += f"\n{prefix}{ref_content}"
return quote_text
if app_type == 6:
return f"[文件] {title}" if title else "[文件]"
if app_type == 5:
return f"[链接] {title}" if title else "[链接]"
if app_type in (33, 36, 44):
return f"[小程序] {title}" if title else "[小程序]"
if title:
return f"[链接/文件] {title}"
return "[链接/文件]"
def _format_voip_message_text(content):
if not content or '<voip' not in content:
return None
root = _parse_xml_root(content)
if root is None:
return "[通话]"
raw_text = _collapse_text(root.findtext('.//msg') or '')
if not raw_text:
return "[通话]"
status_map = {
'Canceled': '已取消', 'Line busy': '对方忙线',
'Call not answered': '未接听', "Call wasn't answered": '未接听',
}
if raw_text.startswith('Duration:'):
duration = raw_text.split(':', 1)[1].strip()
return f"[通话] 通话时长 {duration}" if duration else "[通话]"
return f"[通话] {status_map.get(raw_text, raw_text)}"
def _format_message_text(local_id, local_type, content, is_group, chat_username, chat_display_name, names, display_name_fn):
sender, text = _parse_message_content(content, local_type, is_group)
base_type, _ = _split_msg_type(local_type)
if base_type == 3:
text = f"[图片] (local_id={local_id})"
elif base_type == 47:
text = "[表情]"
elif base_type == 50:
text = _format_voip_message_text(text) or "[通话]"
elif base_type == 49:
text = _format_app_message_text(
text, local_type, is_group, chat_username, chat_display_name, names, display_name_fn
) or "[链接/文件]"
elif base_type != 1:
type_label = format_msg_type(local_type)
text = f"[{type_label}] {text}" if text else f"[{type_label}]"
return sender, text
# ---- Name2Id ----
def _load_name2id_maps(conn):
id_to_username = {}
try:
rows = conn.execute("SELECT rowid, user_name FROM Name2Id").fetchall()
except sqlite3.Error:
return id_to_username
for rowid, user_name in rows:
if not user_name:
continue
id_to_username[rowid] = user_name
return id_to_username
# ---- Sender resolution ----
def _resolve_sender_label(real_sender_id, sender_from_content, is_group, chat_username, chat_display_name, names, id_to_username, display_name_fn):
sender_username = id_to_username.get(real_sender_id, '')
if is_group:
if sender_username and sender_username != chat_username:
return display_name_fn(sender_username, names)
if sender_from_content:
return display_name_fn(sender_from_content, names)
return ''
if sender_username == chat_username:
return chat_display_name
if sender_username:
return display_name_fn(sender_username, names)
return ''
# ---- SQL queries ----
def _build_message_filters(start_ts=None, end_ts=None, keyword='', msg_type_filter=None):
clauses = []
params = []
if start_ts is not None:
clauses.append('create_time >= ?')
params.append(start_ts)
if end_ts is not None:
clauses.append('create_time <= ?')
params.append(end_ts)
if keyword:
clauses.append('message_content LIKE ?')
params.append(f'%{keyword}%')
if msg_type_filter is not None:
base_type = msg_type_filter[0]
clauses.append('(local_type & 0xFFFFFFFF) = ?')
params.append(base_type)
if len(msg_type_filter) > 1:
clauses.append('((local_type >> 32) & 0xFFFFFFFF) = ?')
params.append(msg_type_filter[1])
return clauses, params
def _query_messages(conn, table_name, start_ts=None, end_ts=None, keyword='', limit=20, offset=0, msg_type_filter=None):
if not _is_safe_msg_table_name(table_name):
raise ValueError(f'非法消息表名: {table_name}')
clauses, params = _build_message_filters(start_ts, end_ts, keyword, msg_type_filter)
where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else ''
sql = f"""
SELECT local_id, local_type, create_time, real_sender_id, message_content,
WCDB_CT_message_content
FROM [{table_name}]
{where_sql}
ORDER BY create_time DESC
"""
if limit is None:
return conn.execute(sql, params).fetchall()
sql += "\n LIMIT ? OFFSET ?"
return conn.execute(sql, (*params, limit, offset)).fetchall()
# ---- Time parsing ----
def parse_time_value(value, field_name, is_end=False):
value = (value or '').strip()
if not value:
return None
formats = [
('%Y-%m-%d %H:%M:%S', False),
('%Y-%m-%d %H:%M', False),
('%Y-%m-%d', True),
]
for fmt, date_only in formats:
try:
dt = datetime.strptime(value, fmt)
if date_only and is_end:
dt = dt.replace(hour=23, minute=59, second=59)
return int(dt.timestamp())
except ValueError:
continue
raise ValueError(f"{field_name} 格式无效: {value}。支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS")
def parse_time_range(start_time='', end_time=''):
start_ts = parse_time_value(start_time, 'start_time', is_end=False)
end_ts = parse_time_value(end_time, 'end_time', is_end=True)
if start_ts is not None and end_ts is not None and start_ts > end_ts:
raise ValueError('start_time 不能晚于 end_time')
return start_ts, end_ts
def validate_pagination(limit, offset=0, limit_max=_QUERY_LIMIT_MAX):
if limit <= 0:
raise ValueError("limit 必须大于 0")
if limit_max is not None and limit > limit_max:
raise ValueError(f"limit 不能大于 {limit_max}")
if offset < 0:
raise ValueError("offset 不能小于 0")
# ---- Chat context ----
def resolve_chat_context(chat_name, msg_db_keys, cache, decrypted_dir):
from .contacts import resolve_username, get_contact_names
username = resolve_username(chat_name, cache, decrypted_dir)
if not username:
return None
names = get_contact_names(cache, decrypted_dir)
display_name = names.get(username, username)
message_tables = _find_msg_tables_for_user(username, msg_db_keys, cache)
if not message_tables:
return {
'query': chat_name, 'username': username, 'display_name': display_name,
'db_path': None, 'table_name': None, 'message_tables': [],
'is_group': '@chatroom' in username,
}
primary = message_tables[0]
return {
'query': chat_name, 'username': username, 'display_name': display_name,
'db_path': primary['db_path'], 'table_name': primary['table_name'],
'message_tables': message_tables, 'is_group': '@chatroom' in username,
}
def _iter_table_contexts(ctx):
tables = ctx.get('message_tables') or []
if not tables and ctx.get('db_path') and ctx.get('table_name'):
tables = [{'db_path': ctx['db_path'], 'table_name': ctx['table_name']}]
for table in tables:
yield {
'query': ctx['query'], 'username': ctx['username'], 'display_name': ctx['display_name'],
'db_path': table['db_path'], 'table_name': table['table_name'],
'is_group': ctx['is_group'],
}
def _candidate_page_size(limit, offset):
return limit + offset
def _page_ranked_entries(entries, limit, offset):
ordered = sorted(entries, key=lambda item: item[0], reverse=True)
paged = ordered[offset:offset + limit]
paged.sort(key=lambda item: item[0])
return paged
# ---- Row construction ----
def _build_history_line(row, ctx, names, id_to_username, display_name_fn):
local_id, local_type, create_time, real_sender_id, content, ct = row
time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M')
content = decompress_content(content, ct)
if content is None:
content = '(无法解压)'
sender, text = _format_message_text(
local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names, display_name_fn
)
sender_label = _resolve_sender_label(
real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username, display_name_fn
)
if sender_label:
return create_time, f'[{time_str}] {sender_label}: {text}'
return create_time, f'[{time_str}] {text}'
def _build_search_entry(row, ctx, names, id_to_username, display_name_fn):
local_id, local_type, create_time, real_sender_id, content, ct = row
content = decompress_content(content, ct)
if content is None:
return None
sender, text = _format_message_text(
local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names, display_name_fn
)
if text and len(text) > 300:
text = text[:300] + '...'
sender_label = _resolve_sender_label(
real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username, display_name_fn
)
time_str = datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M')
entry = f"[{time_str}] [{ctx['display_name']}]"
if sender_label:
entry += f" {sender_label}:"
entry += f" {text}"
return create_time, entry
# ---- Chat history queries ----
def collect_chat_history(ctx, names, display_name_fn, start_ts=None, end_ts=None, limit=20, offset=0, msg_type_filter=None):
collected = []
failures = []
candidate_limit = _candidate_page_size(limit, offset)
batch_size = min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE)
for table_ctx in _iter_table_contexts(ctx):
try:
with closing(sqlite3.connect(table_ctx['db_path'])) as conn:
id_to_username = _load_name2id_maps(conn)
fetch_offset = 0
before = len(collected)
while len(collected) - before < candidate_limit:
rows = _query_messages(conn, table_ctx['table_name'], start_ts=start_ts, end_ts=end_ts, limit=batch_size, offset=fetch_offset, msg_type_filter=msg_type_filter)
if not rows:
break
fetch_offset += len(rows)
for row in rows:
try:
collected.append(_build_history_line(row, table_ctx, names, id_to_username, display_name_fn))
except Exception as e:
failures.append(f"local_id={row[0]}: {e}")
if len(collected) - before >= candidate_limit:
break
if len(rows) < batch_size:
break
except Exception as e:
failures.append(f"{table_ctx['db_path']}: {e}")
paged = _page_ranked_entries(collected, limit, offset)
return [line for _, line in paged], failures
# ---- Search queries ----
def _collect_search_entries(conn, contexts, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
collected = []
failures = []
id_to_username = _load_name2id_maps(conn)
batch_size = candidate_limit
for ctx in contexts:
try:
fetch_offset = 0
before = len(collected)
while len(collected) - before < candidate_limit:
rows = _query_messages(conn, ctx['table_name'], start_ts=start_ts, end_ts=end_ts, keyword=keyword, limit=batch_size, offset=fetch_offset, msg_type_filter=msg_type_filter)
if not rows:
break
fetch_offset += len(rows)
for row in rows:
formatted = _build_search_entry(row, ctx, names, id_to_username, display_name_fn)
if formatted:
collected.append(formatted)
if len(collected) - before >= candidate_limit:
break
if len(rows) < batch_size:
break
except Exception as e:
failures.append(f"{ctx['display_name']}: {e}")
return collected, failures
def collect_chat_search(ctx, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
collected = []
failures = []
contexts_by_db = {}
for table_ctx in _iter_table_contexts(ctx):
contexts_by_db.setdefault(table_ctx['db_path'], []).append(table_ctx)
for db_path, db_contexts in contexts_by_db.items():
try:
with closing(sqlite3.connect(db_path)) as conn:
db_entries, db_failures = _collect_search_entries(
conn, db_contexts, names, keyword, display_name_fn,
start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
msg_type_filter=msg_type_filter,
)
collected.extend(db_entries)
failures.extend(db_failures)
except Exception as e:
failures.extend(f"{tc['display_name']}: {e}" for tc in db_contexts)
return collected, failures
def search_all_messages(msg_db_keys, cache, names, keyword, display_name_fn, start_ts=None, end_ts=None, candidate_limit=20, msg_type_filter=None):
collected = []
failures = []
for rel_key in msg_db_keys:
path = cache.get(rel_key)
if not path:
continue
try:
with closing(sqlite3.connect(path)) as conn:
contexts = _load_search_contexts_from_db(conn, path, names)
db_entries, db_failures = _collect_search_entries(
conn, contexts, names, keyword, display_name_fn,
start_ts=start_ts, end_ts=end_ts, candidate_limit=candidate_limit,
msg_type_filter=msg_type_filter,
)
collected.extend(db_entries)
failures.extend(db_failures)
except Exception as e:
failures.append(f"{rel_key}: {e}")
return collected, failures
def _load_search_contexts_from_db(conn, db_path, names):
tables = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'"
).fetchall()
table_to_username = {}
try:
for (user_name,) in conn.execute("SELECT user_name FROM Name2Id").fetchall():
if not user_name:
continue
table_hash = hashlib.md5(user_name.encode()).hexdigest()
table_to_username[f"Msg_{table_hash}"] = user_name
except sqlite3.Error:
pass
contexts = []
for (table_name,) in tables:
username = table_to_username.get(table_name, '')
display_name = names.get(username, username) if username else table_name
contexts.append({
'query': display_name, 'username': username, 'display_name': display_name,
'db_path': db_path, 'table_name': table_name, 'is_group': '@chatroom' in username,
})
return contexts
# ---- Multi-chat context resolution ----
def resolve_chat_contexts(chat_names, msg_db_keys, cache, decrypted_dir):
resolved = []
unresolved = []
missing_tables = []
seen = set()
for chat_name in chat_names:
name = (chat_name or '').strip()
if not name:
unresolved.append('(空)')
continue
ctx = resolve_chat_context(name, msg_db_keys, cache, decrypted_dir)
if not ctx:
unresolved.append(name)
continue
if not ctx['message_tables']:
missing_tables.append(ctx['display_name'])
continue
if ctx['username'] in seen:
continue
seen.add(ctx['username'])
resolved.append(ctx)
return resolved, unresolved, missing_tables
# ---- Chat statistics ----
def collect_chat_stats(ctx, names, display_name_fn, start_ts=None, end_ts=None):
"""聚合统计指定聊天的消息数据。
返回: {
total, type_breakdown: {type_name: count},
top_senders: [{name, count}],
hourly: {0:N, ..., 23:N}
}
"""
type_map = {
1: '文本', 3: '图片', 34: '语音', 42: '名片',
43: '视频', 47: '表情', 48: '位置', 49: '链接/文件',
50: '通话', 10000: '系统', 10002: '撤回',
}
total = 0
type_counts = {}
sender_counts = {}
hourly_counts = {}
for table_ctx in _iter_table_contexts(ctx):
try:
with closing(sqlite3.connect(table_ctx['db_path'])) as conn:
id_to_username = _load_name2id_maps(conn)
tbl = table_ctx['table_name']
if not _is_safe_msg_table_name(tbl):
continue
where_parts = []
params = []
if start_ts is not None:
where_parts.append('create_time >= ?')
params.append(start_ts)
if end_ts is not None:
where_parts.append('create_time <= ?')
params.append(end_ts)
where_sql = f"WHERE {' AND '.join(where_parts)}" if where_parts else ''
# totals + per-type breakdown
for bt, cnt in conn.execute(
f"SELECT (local_type & 0xFFFFFFFF), COUNT(*) FROM [{tbl}] {where_sql} GROUP BY (local_type & 0xFFFFFFFF)",
params
).fetchall():
label = type_map.get(bt, f'type={bt}')
type_counts[label] = type_counts.get(label, 0) + cnt
total += cnt
# top senders
for sid, cnt in conn.execute(
f"SELECT real_sender_id, COUNT(*) FROM [{tbl}] {where_sql} GROUP BY real_sender_id ORDER BY COUNT(*) DESC LIMIT 20",
params
).fetchall():
uname = id_to_username.get(sid, str(sid))
if uname:
sender_counts[uname] = sender_counts.get(uname, 0) + cnt
# 24-hour distribution
for h, cnt in conn.execute(
f"SELECT cast(strftime('%H', create_time, 'unixepoch', 'localtime') as integer), COUNT(*) FROM [{tbl}] {where_sql} GROUP BY cast(strftime('%H', create_time, 'unixepoch', 'localtime') as integer)",
params
).fetchall():
if h is not None:
hourly_counts[h] = hourly_counts.get(h, 0) + cnt
except Exception:
pass
top_senders = sorted(sender_counts.items(), key=lambda x: x[1], reverse=True)[:10]
top_senders = [{'name': display_name_fn(u, names), 'count': c} for u, c in top_senders]
hourly = {h: hourly_counts.get(h, 0) for h in range(24)}
return {
'total': total,
'type_breakdown': dict(sorted(type_counts.items(), key=lambda x: x[1], reverse=True)),
'top_senders': top_senders,
'hourly': hourly,
}
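A small sketch of the `local_type & 0xFFFFFFFF` masking used in the queries above: the mask keeps only the low 32 bits, so a value carrying (hypothetical) flag bits above bit 31 still resolves to its base message type; unknown types fall back to a `type=N` label:

```python
type_map = {1: 'text', 49: 'link/file'}

raw_local_type = (1 << 32) | 49   # hypothetical upper-bit flag plus base type 49
base_type = raw_local_type & 0xFFFFFFFF
label = type_map.get(base_type, f"type={base_type}")
```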


@@ -0,0 +1,31 @@
"""Key extraction module — dispatches to the platform-specific scanner."""
import platform
def extract_keys(db_dir, output_path, pid=None):
"""Extract WeChat database keys and save them to output_path.
Args:
db_dir: WeChat database directory (db_storage)
output_path: output path for all_keys.json
pid: optional WeChat process PID (auto-detected by default)
Returns:
dict: mapping of salt_hex -> enc_key_hex
Raises:
RuntimeError: extraction failed
"""
system = platform.system().lower()
if system == "darwin":
from .scanner_macos import extract_keys as _extract
return _extract(db_dir, output_path, pid=pid)
elif system == "windows":
from .scanner_windows import extract_keys as _extract
return _extract(db_dir, output_path, pid=pid)
elif system == "linux":
from .scanner_linux import extract_keys as _extract
return _extract(db_dir, output_path, pid=pid)
else:
raise RuntimeError(f"Unsupported platform: {platform.system()}")

wechat_cli/keys/common.py

@@ -0,0 +1,184 @@
"""
Cross-platform shared memory-scan logic: HMAC verification, DB collection,
hex pattern matching, and result output.
Adapted from wechat-decrypt/key_scan_common.py; save_results was changed to return a dict.
"""
import hashlib
import hmac as hmac_mod
import json
import os
import re
import struct
PAGE_SZ = 4096
KEY_SZ = 32
SALT_SZ = 16
def verify_enc_key(enc_key, db_page1):
"""Verify enc_key by checking page 1 with HMAC-SHA512."""
salt = db_page1[:SALT_SZ]
mac_salt = bytes(b ^ 0x3A for b in salt)
mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ)
hmac_data = db_page1[SALT_SZ: PAGE_SZ - 80 + 16]
stored_hmac = db_page1[PAGE_SZ - 64: PAGE_SZ]
hm = hmac_mod.new(mac_key, hmac_data, hashlib.sha512)
hm.update(struct.pack("<I", 1))
return hm.digest() == stored_hmac
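The page-1 layout `verify_enc_key` assumes (a 16-byte salt up front, an 80-byte reserve at the end of the 4096-byte page whose last 64 bytes are the stored HMAC-SHA512, and a MAC key derived via PBKDF2-HMAC-SHA512 with 2 iterations over the salt XOR 0x3A) can be exercised by forging a page for a random key — a self-contained sketch on synthetic data, not real WeChat pages:

```python
import hashlib
import hmac
import os
import struct

PAGE_SZ, KEY_SZ, SALT_SZ = 4096, 32, 16

def verify_enc_key(enc_key, db_page1):
    # Same check as in common.py; note PAGE_SZ - 80 + 16 == PAGE_SZ - 64,
    # i.e. the HMAC covers everything after the salt up to the stored digest.
    salt = db_page1[:SALT_SZ]
    mac_salt = bytes(b ^ 0x3A for b in salt)
    mac_key = hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SZ)
    hm = hmac.new(mac_key, db_page1[SALT_SZ: PAGE_SZ - 64], hashlib.sha512)
    hm.update(struct.pack("<I", 1))  # page number 1, little-endian uint32
    return hm.digest() == db_page1[PAGE_SZ - 64:]

# Forge a page whose trailing HMAC matches a random key.
enc_key = os.urandom(KEY_SZ)
body = os.urandom(PAGE_SZ - 64)                       # salt + ciphertext + IV
salt = body[:SALT_SZ]
mac_key = hashlib.pbkdf2_hmac("sha512", enc_key,
                              bytes(b ^ 0x3A for b in salt), 2, dklen=KEY_SZ)
hm = hmac.new(mac_key, body[SALT_SZ:], hashlib.sha512)
hm.update(struct.pack("<I", 1))
page1 = body + hm.digest()
ok = verify_enc_key(enc_key, page1)
```

A wrong key produces a different PBKDF2-derived MAC key, so verification fails with overwhelming probability.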
def collect_db_files(db_dir):
"""Walk db_dir and collect every .db file together with its salt.
Returns (db_files, salt_to_dbs):
db_files: [(rel_path, abs_path, size, salt_hex, page1_bytes), ...]
salt_to_dbs: {salt_hex: [rel_path, ...]}
"""
db_files = []
salt_to_dbs = {}
for root, dirs, files in os.walk(db_dir):
for name in files:
if not name.endswith(".db"):  # -wal/-shm sidecars never end in ".db", so they're skipped too
continue
path = os.path.join(root, name)
size = os.path.getsize(path)
if size < PAGE_SZ:
continue
with open(path, "rb") as f:
page1 = f.read(PAGE_SZ)
rel = os.path.relpath(path, db_dir)
salt = page1[:SALT_SZ].hex()
db_files.append((rel, path, size, salt, page1))
salt_to_dbs.setdefault(salt, []).append(rel)
return db_files, salt_to_dbs
def scan_memory_for_keys(data, hex_re, db_files, salt_to_dbs, key_map,
remaining_salts, base_addr, pid, print_fn):
"""Scan a block of memory data, matching hex patterns and verifying candidate keys.
Returns the number of hex patterns matched during this scan.
"""
matches = 0
for m in hex_re.finditer(data):
hex_str = m.group(1).decode()
addr = base_addr + m.start()
matches += 1
hex_len = len(hex_str)
if hex_len == 96:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[64:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and verify_enc_key(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print_fn(f"\n [FOUND] salt={salt_hex}")
print_fn(f" enc_key={enc_key_hex}")
print_fn(f" PID={pid} address: 0x{addr:016X}")
print_fn(f" databases: {', '.join(dbs)}")
break
elif hex_len == 64:
if not remaining_salts:
continue
enc_key_hex = hex_str
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, salt_hex_db, page1 in db_files:
if salt_hex_db in remaining_salts and verify_enc_key(enc_key, page1):
key_map[salt_hex_db] = enc_key_hex
remaining_salts.discard(salt_hex_db)
dbs = salt_to_dbs[salt_hex_db]
print_fn(f"\n [FOUND] salt={salt_hex_db}")
print_fn(f" enc_key={enc_key_hex}")
print_fn(f" PID={pid} address: 0x{addr:016X}")
print_fn(f" databases: {', '.join(dbs)}")
break
elif hex_len > 96 and hex_len % 2 == 0:
enc_key_hex = hex_str[:64]
salt_hex = hex_str[-32:]
if salt_hex in remaining_salts:
enc_key = bytes.fromhex(enc_key_hex)
for rel, path, sz, s, page1 in db_files:
if s == salt_hex and verify_enc_key(enc_key, page1):
key_map[salt_hex] = enc_key_hex
remaining_salts.discard(salt_hex)
dbs = salt_to_dbs[salt_hex]
print_fn(f"\n [FOUND] salt={salt_hex} (long hex {hex_len})")
print_fn(f" enc_key={enc_key_hex}")
print_fn(f" PID={pid} address: 0x{addr:016X}")
print_fn(f" databases: {', '.join(dbs)}")
break
return matches
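The `x'…'` pattern the scanner matches looks like SQLCipher's raw-key literal syntax left behind in process memory; for the 96-hex-char case above, the match splits into a 64-char key followed by a 32-char salt. A sketch with a synthetic buffer:

```python
import re

hex_re = re.compile(rb"x'([0-9a-fA-F]{64,192})'")

# 64 hex chars of key followed by 32 hex chars of salt, embedded in junk
blob = b"garbage x'" + b"ab" * 32 + b"cd" * 16 + b"' more garbage"
m = hex_re.search(blob)
hex_str = m.group(1).decode()
enc_key_hex, salt_hex = hex_str[:64], hex_str[64:]
```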
def cross_verify_keys(db_files, salt_to_dbs, key_map, print_fn):
"""Cross-verify unmatched salts against keys that were already found."""
missing_salts = set(salt_to_dbs.keys()) - set(key_map.keys())
if not missing_salts or not key_map:
return
print_fn(f"\n{len(missing_salts)} salts still unmatched, trying cross-verification...")
for salt_hex in list(missing_salts):
for rel, path, sz, s, page1 in db_files:
if s == salt_hex:
for known_salt, known_key_hex in key_map.items():
enc_key = bytes.fromhex(known_key_hex)
if verify_enc_key(enc_key, page1):
key_map[salt_hex] = known_key_hex
print_fn(f" [CROSS] salt={salt_hex} matches key from salt={known_salt}")
missing_salts.discard(salt_hex)
break
def save_results(db_files, salt_to_dbs, key_map, output_path, print_fn):
"""Save the key results to a JSON file.
Args:
db_files: db_files as returned by collect_db_files
salt_to_dbs: salt_to_dbs as returned by collect_db_files
key_map: {salt_hex: enc_key_hex}
output_path: output JSON file path
print_fn: logging function
Returns:
dict: mapping of salt_hex -> enc_key_hex
Raises:
RuntimeError: no keys were extracted
"""
print_fn(f"\n{'=' * 60}")
print_fn(f"Result: keys found for {len(key_map)}/{len(salt_to_dbs)} salts")
result = {}
for rel, path, sz, salt_hex, page1 in db_files:
if salt_hex in key_map:
result[rel] = {
"enc_key": key_map[salt_hex],
"salt": salt_hex,
"size_mb": round(sz / 1024 / 1024, 1)
}
print_fn(f" OK: {rel} ({sz / 1024 / 1024:.1f}MB)")
else:
print_fn(f" MISSING: {rel} (salt={salt_hex})")
if not result:
print_fn("\n[!] No keys extracted")
raise RuntimeError("Failed to extract a key from any WeChat process")
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print_fn(f"\nKeys saved to: {output_path}")
missing = [rel for rel, path, sz, salt_hex, page1 in db_files if salt_hex not in key_map]
if missing:
print_fn("\nDatabases with no key found:")
for rel in missing:
print_fn(f" {rel}")
return key_map


@@ -0,0 +1,218 @@
"""Linux key extraction — reads WeChat process memory via /proc."""
import functools
import os
import re
import sys
import time
from .common import collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results
print = functools.partial(print, flush=True)
def _safe_readlink(path):
try:
return os.path.realpath(os.readlink(path))
except OSError:
return ""
_KNOWN_COMMS = {"wechat", "wechatappex", "weixin"}
_INTERPRETER_PREFIXES = ("python", "bash", "sh", "zsh", "node", "perl", "ruby")
def _is_wechat_process(pid):
"""Check whether pid looks like a WeChat process."""
if pid == os.getpid():
return False
try:
with open(f"/proc/{pid}/comm") as f:
comm = f.read().strip()
if comm.lower() in _KNOWN_COMMS:
return True
exe_path = _safe_readlink(f"/proc/{pid}/exe")
exe_name = os.path.basename(exe_path)
if any(exe_name.lower().startswith(p) for p in _INTERPRETER_PREFIXES):
return False
return "wechat" in exe_name.lower() or "weixin" in exe_name.lower()
except (PermissionError, FileNotFoundError, ProcessLookupError):
return False
def _get_pids():
"""Return (pid, rss_kb) for every likely WeChat main process, largest memory first."""
pids = []
for pid_str in os.listdir("/proc"):
if not pid_str.isdigit():
continue
pid = int(pid_str)
try:
if not _is_wechat_process(pid):
continue
with open(f"/proc/{pid}/statm") as f:
rss_pages = int(f.read().split()[1])
rss_kb = rss_pages * 4
pids.append((pid, rss_kb))
except (PermissionError, FileNotFoundError, ProcessLookupError):
continue
if not pids:
raise RuntimeError("No Linux WeChat process detected")
pids.sort(key=lambda item: item[1], reverse=True)
for pid, rss_kb in pids:
exe_path = _safe_readlink(f"/proc/{pid}/exe")
print(f"[+] WeChat PID={pid} ({rss_kb // 1024}MB) {exe_path}")
return pids
_SKIP_MAPPINGS = {"[vdso]", "[vsyscall]", "[vvar]"}
_SKIP_PATH_PREFIXES = ("/usr/lib/", "/lib/", "/usr/share/")
def _get_readable_regions(pid):
"""Parse /proc/<pid>/maps and return the list of readable memory regions."""
regions = []
with open(f"/proc/{pid}/maps") as f:
for line in f:
parts = line.split()
if len(parts) < 2:
continue
if "r" not in parts[1]:
continue
if len(parts) >= 6:
mapping_name = parts[5]
if mapping_name in _SKIP_MAPPINGS:
continue
mapping_lower = mapping_name.lower()
if (any(mapping_name.startswith(p) for p in _SKIP_PATH_PREFIXES)
and "wcdb" not in mapping_lower
and "wechat" not in mapping_lower
and "weixin" not in mapping_lower):
continue
start_s, end_s = parts[0].split("-")
start = int(start_s, 16)
size = int(end_s, 16) - start
if 0 < size < 500 * 1024 * 1024:
regions.append((start, size))
return regions
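Each `/proc/<pid>/maps` line parsed above has the shape shown in this sketch (addresses and path are hypothetical); the address fields are hex, so the region size is simply their difference:

```python
line = "7f2a00000000-7f2a00100000 r-xp 00000000 08:01 123456 /opt/wechat/libwcdb.so"
parts = line.split()
perms = parts[1]                       # e.g. "r-xp": readable, executable, private
start_s, end_s = parts[0].split("-")
start = int(start_s, 16)
size = int(end_s, 16) - start
readable = "r" in perms
```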
def _check_permissions():
"""Check that we have permission to read process memory."""
if os.geteuid() == 0:
return
try:
with open("/proc/self/status") as f:
for line in f:
if line.startswith("CapEff:"):
cap_eff = int(line.split(":")[1].strip(), 16)
CAP_SYS_PTRACE = 1 << 19
if cap_eff & CAP_SYS_PTRACE:
return
break
except (OSError, ValueError):
pass
raise RuntimeError(
"Reading process memory requires root or CAP_SYS_PTRACE\n"
"Run with: sudo wechat-cli init\n"
"Or grant the capability: sudo setcap cap_sys_ptrace=ep $(which python3)"
)
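The `CapEff` field in `/proc/<pid>/status` is a hex bitmask of effective capabilities, and `CAP_SYS_PTRACE` is bit 19, so the check above reduces to a single bit test:

```python
CAP_SYS_PTRACE = 1 << 19

def has_sys_ptrace(cap_eff_hex: str) -> bool:
    """True if a CapEff hex bitmask (as read from /proc/<pid>/status) includes CAP_SYS_PTRACE."""
    return bool(int(cap_eff_hex, 16) & CAP_SYS_PTRACE)

# A mask with only bit 19 set vs. an empty mask
a = has_sys_ptrace("0000000000080000")
b = has_sys_ptrace("0000000000000000")
```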
def extract_keys(db_dir, output_path, pid=None):
"""Extract Linux WeChat database keys.
Args:
db_dir: WeChat database directory
output_path: output path for all_keys.json
pid: optional PID (auto-detected by default)
Returns:
dict: mapping of salt_hex -> enc_key_hex
"""
_check_permissions()
print("=" * 60)
print(" Extracting Linux WeChat database keys (memory scan)")
print("=" * 60)
db_files, salt_to_dbs = collect_db_files(db_dir)
if not db_files:
raise RuntimeError(f"No decryptable .db files found in {db_dir}")
print(f"\nFound {len(db_files)} databases with {len(salt_to_dbs)} distinct salts")
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
print(f" salt {salt_hex}: {', '.join(dbs)}")
pids = _get_pids() if pid is None else [(pid, 0)]
hex_re = re.compile(rb"x'([0-9a-fA-F]{64,192})'")
key_map = {}
remaining_salts = set(salt_to_dbs.keys())
all_hex_matches = 0
t0 = time.time()
for pid_val, rss_kb in pids:
try:
regions = _get_readable_regions(pid_val)
except PermissionError:
print(f"[WARN] Cannot read /proc/{pid_val}/maps (permission denied), skipping")
continue
except (FileNotFoundError, ProcessLookupError):
print(f"[WARN] PID {pid_val} has exited, skipping")
continue
total_bytes = sum(s for _, s in regions)
total_mb = total_bytes / 1024 / 1024
print(f"\n[*] Scanning PID={pid_val} ({total_mb:.0f}MB, {len(regions)} regions)")
scanned_bytes = 0
try:
mem = open(f"/proc/{pid_val}/mem", "rb")
except PermissionError:
print(f"[WARN] Cannot open /proc/{pid_val}/mem (permission denied), skipping")
continue
except (FileNotFoundError, ProcessLookupError):
print(f"[WARN] PID {pid_val} has exited, skipping")
continue
if not _is_wechat_process(pid_val):
print(f"[WARN] PID {pid_val} is no longer a WeChat process, skipping")
mem.close()
continue
try:
for reg_idx, (base, size) in enumerate(regions):
try:
mem.seek(base)
data = mem.read(size)
except (OSError, ValueError):
continue
scanned_bytes += len(data)
all_hex_matches += scan_memory_for_keys(
data, hex_re, db_files, salt_to_dbs,
key_map, remaining_salts, base, pid_val, print,
)
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
print(
f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
)
finally:
mem.close()
if not remaining_salts:
print("\n[+] All keys found, skipping remaining processes")
break
elapsed = time.time() - t0
print(f"\nScan complete: {elapsed:.1f}s, {len(pids)} processes, {all_hex_matches} hex patterns")
cross_verify_keys(db_files, salt_to_dbs, key_map, print)
return save_results(db_files, salt_to_dbs, key_map, output_path, print)


@@ -0,0 +1,114 @@
"""macOS key extraction — scans WeChat process memory via a C binary."""
import os
import platform
import subprocess
import sys
from .common import collect_db_files, cross_verify_keys, save_results, scan_memory_for_keys
def _find_binary():
"""Locate the C binary for the current architecture."""
machine = platform.machine()
if machine == "arm64":
name = "find_all_keys_macos.arm64"
elif machine == "x86_64":
name = "find_all_keys_macos.x86_64"
else:
raise RuntimeError(f"Unsupported macOS architecture: {machine}")
# Look in the bin/ directory (inside the package after a pip install)
pkg_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
bin_path = os.path.join(pkg_dir, "bin", name)
if os.path.isfile(bin_path):
return bin_path
raise RuntimeError(
f"Key extraction binary not found: {bin_path}\n"
"Please verify the package is installed completely"
)
def extract_keys(db_dir, output_path, pid=None):
"""Extract macOS WeChat database keys via the C binary.
The C binary must be run from the parent of the WeChat data directory,
because it auto-detects the db_storage subdirectory.
It writes all_keys.json to its current working directory.
Args:
db_dir: WeChat db_storage directory
output_path: output path for all_keys.json
pid: unused (the C binary detects the process itself)
Returns:
dict: mapping of salt_hex -> enc_key_hex
"""
import json
binary = _find_binary()
# The C binary's working directory must be the parent of db_storage
work_dir = os.path.dirname(db_dir)
if not os.path.isdir(work_dir):
raise RuntimeError(f"WeChat data directory does not exist: {work_dir}")
print(f"[+] Extracting keys with the C binary: {binary}")
print(f"[+] Working directory: {work_dir}")
try:
result = subprocess.run(
[binary],
cwd=work_dir,
capture_output=True,
text=True,
timeout=120,
)
except subprocess.TimeoutExpired:
raise RuntimeError("Key extraction timed out (120s)")
except PermissionError:
raise RuntimeError(
f"Cannot execute {binary}\n"
"Make sure the file is executable: chmod +x " + binary
)
# Relay the C binary's output
if result.stdout:
print(result.stdout)
if result.stderr:
print(result.stderr, file=sys.stderr)
# The C binary writes all_keys.json into work_dir
c_output = os.path.join(work_dir, "all_keys.json")
if not os.path.exists(c_output):
if "task_for_pid" in (result.stdout or "") + (result.stderr or ""):
raise RuntimeError(
"Reading WeChat process memory requires root.\n"
"Run with: sudo wechat-cli init"
)
raise RuntimeError(
"The C binary did not produce a key file.\n"
f"stdout: {result.stdout}\nstderr: {result.stderr}"
)
# Read the keys and re-save them to output_path
with open(c_output, encoding="utf-8") as f:
keys_data = json.load(f)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(keys_data, f, indent=2, ensure_ascii=False)
# Clean up the C binary's temporary output
if os.path.abspath(c_output) != os.path.abspath(output_path):
os.remove(c_output)
# Build the salt -> key mapping
key_map = {}
for rel, info in keys_data.items():
if isinstance(info, dict) and "enc_key" in info and "salt" in info:
key_map[info["salt"]] = info["enc_key"]
print(f"\n[+] Extracted {len(key_map)} keys, saved to: {output_path}")
return key_map


@@ -0,0 +1,145 @@
"""Windows key extraction — scans Weixin.exe process memory."""
import ctypes
import ctypes.wintypes as wt
import functools
import os
import re
import subprocess
import time
from .common import collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results
print = functools.partial(print, flush=True)
kernel32 = ctypes.windll.kernel32
MEM_COMMIT = 0x1000
READABLE = {0x02, 0x04, 0x08, 0x10, 0x20, 0x40, 0x80}
class MBI(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_uint64), ("AllocationBase", ctypes.c_uint64),
("AllocationProtect", wt.DWORD), ("_pad1", wt.DWORD),
("RegionSize", ctypes.c_uint64), ("State", wt.DWORD),
("Protect", wt.DWORD), ("Type", wt.DWORD), ("_pad2", wt.DWORD),
]
def _get_pids():
"""Return (pid, mem_kb) for every Weixin.exe process, largest memory first."""
r = subprocess.run(["tasklist", "/FI", "IMAGENAME eq Weixin.exe", "/FO", "CSV", "/NH"],
capture_output=True, text=True)
pids = []
for line in r.stdout.strip().split('\n'):
if not line.strip():
continue
p = line.strip('"').split('","')
if len(p) >= 5:
pid = int(p[1])
mem = int(p[4].replace(',', '').replace(' K', '').strip() or '0')
pids.append((pid, mem))
if not pids:
raise RuntimeError("Weixin.exe is not running")
pids.sort(key=lambda x: x[1], reverse=True)
for pid, mem in pids:
print(f"[+] Weixin.exe PID={pid} ({mem // 1024}MB)")
return pids
def _read_mem(h, addr, sz):
buf = ctypes.create_string_buffer(sz)
n = ctypes.c_size_t(0)
if kernel32.ReadProcessMemory(h, ctypes.c_uint64(addr), buf, sz, ctypes.byref(n)):
return buf.raw[:n.value]
return None
def _enum_regions(h):
regs = []
addr = 0
mbi = MBI()
while addr < 0x7FFFFFFFFFFF:
if kernel32.VirtualQueryEx(h, ctypes.c_uint64(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)) == 0:
break
if mbi.State == MEM_COMMIT and mbi.Protect in READABLE and 0 < mbi.RegionSize < 500 * 1024 * 1024:
regs.append((mbi.BaseAddress, mbi.RegionSize))
nxt = mbi.BaseAddress + mbi.RegionSize
if nxt <= addr:
break
addr = nxt
return regs
def extract_keys(db_dir, output_path, pid=None):
"""Extract Windows WeChat database keys.
Args:
db_dir: WeChat database directory
output_path: output path for all_keys.json
pid: optional PID (defaults to auto-detecting every Weixin.exe process)
Returns:
dict: mapping of salt_hex -> enc_key_hex
"""
print("=" * 60)
print(" Extracting all WeChat database keys")
print("=" * 60)
db_files, salt_to_dbs = collect_db_files(db_dir)
print(f"\nFound {len(db_files)} databases with {len(salt_to_dbs)} distinct salts")
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
print(f" salt {salt_hex}: {', '.join(dbs)}")
pids = _get_pids() if pid is None else [(pid, 0)]
hex_re = re.compile(rb"x'([0-9a-fA-F]{64,192})'")
key_map = {}
remaining_salts = set(salt_to_dbs.keys())
all_hex_matches = 0
t0 = time.time()
for pid_val, mem_kb in pids:
h = kernel32.OpenProcess(0x0010 | 0x0400, False, pid_val)
if not h:
print(f"[WARN] Cannot open process PID={pid_val}, skipping")
continue
try:
regions = _enum_regions(h)
total_bytes = sum(s for _, s in regions)
total_mb = total_bytes / 1024 / 1024
print(f"\n[*] Scanning PID={pid_val} ({total_mb:.0f}MB, {len(regions)} regions)")
scanned_bytes = 0
for reg_idx, (base, size) in enumerate(regions):
data = _read_mem(h, base, size)
scanned_bytes += size
if not data:
continue
all_hex_matches += scan_memory_for_keys(
data, hex_re, db_files, salt_to_dbs,
key_map, remaining_salts, base, pid_val, print,
)
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
print(
f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
)
finally:
kernel32.CloseHandle(h)
if not remaining_salts:
print("\n[+] All keys found, skipping remaining processes")
break
elapsed = time.time() - t0
print(f"\nScan complete: {elapsed:.1f}s, {len(pids)} processes, {all_hex_matches} hex patterns")
cross_verify_keys(db_files, salt_to_dbs, key_map, print)
return save_results(db_files, salt_to_dbs, key_map, output_path, print)

wechat_cli/main.py

@@ -0,0 +1,70 @@
"""wechat-cli entry point"""
import sys
import click
from .core.context import AppContext
@click.group()
@click.option("--config", "config_path", default=None, envvar="WECHAT_CLI_CONFIG",
help="path to config.json (auto-discovered by default)")
@click.pass_context
def cli(ctx, config_path):
"""WeChat CLI — query WeChat messages, contacts, and other data
\b
Examples:
wechat-cli init # first run: extract keys
wechat-cli sessions # list recent sessions
wechat-cli sessions --limit 10 # 10 most recent sessions
wechat-cli history "张三" --limit 20 # last 20 messages with 张三
wechat-cli history "AI交流群" --start-time "2026-04-01" # limit the time range
wechat-cli search "Claude" --chat "AI交流群" # search a keyword inside one group
wechat-cli search "你好" --limit 50 # global search
wechat-cli contacts --query "" # search contacts
wechat-cli new-messages # fetch incremental new messages
"""
# The init command does not need an AppContext
if ctx.invoked_subcommand == "init":
return
try:
ctx.obj = AppContext(config_path)
except FileNotFoundError as e:
click.echo(str(e), err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Initialization failed: {e}", err=True)
sys.exit(1)
# Register subcommands
from .commands.init import init
from .commands.sessions import sessions
from .commands.history import history
from .commands.search import search
from .commands.contacts import contacts
from .commands.new_messages import new_messages
from .commands.members import members
from .commands.export import export
from .commands.stats import stats
from .commands.unread import unread
from .commands.favorites import favorites
cli.add_command(init)
cli.add_command(sessions)
cli.add_command(history)
cli.add_command(search)
cli.add_command(contacts)
cli.add_command(new_messages)
cli.add_command(members)
cli.add_command(export)
cli.add_command(stats)
cli.add_command(unread)
cli.add_command(favorites)
if __name__ == "__main__":
cli()


@@ -0,0 +1,29 @@
"""Output formatting — JSON (LLM-friendly) / text (human-readable)"""
import json
import sys
def output_json(data, file=None):
file = file or sys.stdout
json.dump(data, file, ensure_ascii=False, indent=2)
file.write('\n')
def output_text(text, file=None):
file = file or sys.stdout
file.write(text)
if not text.endswith('\n'):
file.write('\n')
def output(data, fmt='json', file=None):
if fmt == 'json':
output_json(data, file)
else:
if isinstance(data, str):
output_text(data, file)
elif isinstance(data, dict) and 'text' in data:
output_text(data['text'], file)
else:
output_json(data, file)
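A small usage sketch of the JSON path above, repeating `output_json` so the block is self-contained and capturing the stream with `io.StringIO` instead of stdout:

```python
import io
import json
import sys

def output_json(data, file=None):
    # Same helper as in the formatter module: pretty JSON with a trailing newline
    file = file or sys.stdout
    json.dump(data, file, ensure_ascii=False, indent=2)
    file.write('\n')

buf = io.StringIO()
output_json({"total": 3, "top_senders": [{"name": "Alice", "count": 2}]}, file=buf)
parsed = json.loads(buf.getvalue())
```

`ensure_ascii=False` keeps Chinese display names readable in the output instead of `\uXXXX` escapes, which is what makes the JSON suitable for LLM consumption.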