Files
wechat-decrypt/find_all_keys_linux.py
PeanutSplash 30112b9a10 fix(linux): address code review feedback
- SUDO_USER: skip fallback entirely when user is invalid (KeyError)
- load_config: move default merge after db_dir check to avoid dead code
- _is_wechat_process: prefer exact comm match, use exe substring as fallback
2026-03-07 21:35:24 +08:00

247 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Linux 版微信数据库密钥提取
原理: 与 Windows/macOS 相同 — 扫描微信进程内存,查找
WCDB 缓存的 x'<64hex_enc_key><32hex_salt>' 模式,
通过匹配数据库 salt + HMAC 校验确认密钥。
读取方式: /proc/<pid>/maps + /proc/<pid>/mem
权限要求: root 或 CAP_SYS_PTRACE
"""
import functools
import os
import re
import sys
import time
from key_scan_common import (
collect_db_files, scan_memory_for_keys, cross_verify_keys, save_results,
)
print = functools.partial(print, flush=True)
def _safe_readlink(path):
try:
return os.path.realpath(os.readlink(path))
except OSError:
return ""
_KNOWN_COMMS = {"wechat", "wechatappex", "weixin"}
_INTERPRETER_PREFIXES = ("python", "bash", "sh", "zsh", "node", "perl", "ruby")
def _is_wechat_process(pid):
"""检查 pid 是否为微信进程。
优先精确匹配 comm 名称wechat、WeChatAppEx 等),
再用 exe 路径子串匹配作为 fallback同时排除解释器进程。
"""
if pid == os.getpid():
return False
try:
with open(f"/proc/{pid}/comm") as f:
comm = f.read().strip()
# 优先精确匹配 comm最可靠
if comm.lower() in _KNOWN_COMMS:
return True
exe_path = _safe_readlink(f"/proc/{pid}/exe")
exe_name = os.path.basename(exe_path)
# 排除脚本解释器进程(避免匹配 python3.11 wechat-decrypt 等)
if any(exe_name.lower().startswith(p) for p in _INTERPRETER_PREFIXES):
return False
# fallback: exe 名称子串匹配
return "wechat" in exe_name.lower() or "weixin" in exe_name.lower()
except (PermissionError, FileNotFoundError, ProcessLookupError):
return False
def get_pids():
"""返回所有疑似微信主进程的 (pid, rss_kb) 列表,按内存降序。"""
pids = []
for pid_str in os.listdir("/proc"):
if not pid_str.isdigit():
continue
pid = int(pid_str)
try:
if not _is_wechat_process(pid):
continue
with open(f"/proc/{pid}/statm") as f:
rss_pages = int(f.read().split()[1])
rss_kb = rss_pages * 4
pids.append((pid, rss_kb))
except (PermissionError, FileNotFoundError, ProcessLookupError):
continue
if not pids:
raise RuntimeError("未检测到 Linux 微信进程")
pids.sort(key=lambda item: item[1], reverse=True)
for pid, rss_kb in pids:
exe_path = _safe_readlink(f"/proc/{pid}/exe")
print(f"[+] WeChat PID={pid} ({rss_kb // 1024}MB) {exe_path}")
return pids
_SKIP_MAPPINGS = {"[vdso]", "[vsyscall]", "[vvar]"}
_SKIP_PATH_PREFIXES = ("/usr/lib/", "/lib/", "/usr/share/")
def _get_readable_regions(pid):
"""解析 /proc/<pid>/maps返回可读内存区域列表。
跳过 [vdso]、[vsyscall] 等特殊映射和系统库映射,
聚焦匿名映射和堆区WCDB 密钥缓存所在位置)。
"""
regions = []
with open(f"/proc/{pid}/maps") as f:
for line in f:
parts = line.split()
if len(parts) < 2:
continue
if "r" not in parts[1]:
continue
# 跳过特殊映射和无关系统库,但保留 wcdb/wechat 相关库
if len(parts) >= 6:
mapping_name = parts[5]
if mapping_name in _SKIP_MAPPINGS:
continue
mapping_lower = mapping_name.lower()
if (any(mapping_name.startswith(p) for p in _SKIP_PATH_PREFIXES)
and "wcdb" not in mapping_lower
and "wechat" not in mapping_lower
and "weixin" not in mapping_lower):
continue
start_s, end_s = parts[0].split("-")
start = int(start_s, 16)
size = int(end_s, 16) - start
if 0 < size < 500 * 1024 * 1024:
regions.append((start, size))
return regions
def _check_permissions():
"""检查是否有读取进程内存的权限root 或 CAP_SYS_PTRACE"""
if os.geteuid() == 0:
return
# 检查 CAP_SYS_PTRACE: 读取 /proc/self/status 中的 CapEff
try:
with open("/proc/self/status") as f:
for line in f:
if line.startswith("CapEff:"):
cap_eff = int(line.split(":")[1].strip(), 16)
CAP_SYS_PTRACE = 1 << 19
if cap_eff & CAP_SYS_PTRACE:
return
break
except (OSError, ValueError):
pass
print("[!] 需要 root 权限或 CAP_SYS_PTRACE 才能读取进程内存")
print(" 请使用: sudo python3 find_all_keys.py")
print(" 或授予 capability: sudo setcap cap_sys_ptrace=ep $(which python3)")
sys.exit(1)
def main():
from config import load_config
_cfg = load_config()
db_dir = _cfg["db_dir"]
out_file = _cfg["keys_file"]
_check_permissions()
print("=" * 60)
print(" 提取 Linux 微信数据库密钥(内存扫描)")
print("=" * 60)
# 1. 收集 DB 文件和 salt
db_files, salt_to_dbs = collect_db_files(db_dir)
if not db_files:
raise RuntimeError(f"{db_dir} 未找到可解密的 .db 文件")
print(f"\n找到 {len(db_files)} 个数据库, {len(salt_to_dbs)} 个不同的 salt")
for salt_hex, dbs in sorted(salt_to_dbs.items(), key=lambda x: len(x[1]), reverse=True):
print(f" salt {salt_hex}: {', '.join(dbs)}")
# 2. 找到微信进程
pids = get_pids()
hex_re = re.compile(rb"x'([0-9a-fA-F]{64,192})'")
key_map = {} # salt_hex -> enc_key_hex
remaining_salts = set(salt_to_dbs.keys())
all_hex_matches = 0
t0 = time.time()
for pid, rss_kb in pids:
try:
regions = _get_readable_regions(pid)
except PermissionError:
print(f"[WARN] 无法读取 /proc/{pid}/maps权限不足跳过")
continue
except (FileNotFoundError, ProcessLookupError):
print(f"[WARN] PID {pid} 已退出,跳过")
continue
total_bytes = sum(s for _, s in regions)
total_mb = total_bytes / 1024 / 1024
print(f"\n[*] 扫描 PID={pid} ({total_mb:.0f}MB, {len(regions)} 区域)")
scanned_bytes = 0
try:
mem = open(f"/proc/{pid}/mem", "rb")
except PermissionError:
print(f"[WARN] 无法打开 /proc/{pid}/mem权限不足跳过")
continue
except (FileNotFoundError, ProcessLookupError):
print(f"[WARN] PID {pid} 已退出,跳过")
continue
# 防御 TOCTOU: 打开 mem 后再次确认仍为微信进程
if not _is_wechat_process(pid):
print(f"[WARN] PID {pid} 已不是微信进程,跳过")
mem.close()
continue
try:
for reg_idx, (base, size) in enumerate(regions):
try:
mem.seek(base)
data = mem.read(size)
except (OSError, ValueError):
continue
scanned_bytes += len(data)
all_hex_matches += scan_memory_for_keys(
data, hex_re, db_files, salt_to_dbs,
key_map, remaining_salts, base, pid, print,
)
if (reg_idx + 1) % 200 == 0:
elapsed = time.time() - t0
progress = scanned_bytes / total_bytes * 100 if total_bytes else 100
print(
f" [{progress:.1f}%] {len(key_map)}/{len(salt_to_dbs)} salts matched, "
f"{all_hex_matches} hex patterns, {elapsed:.1f}s"
)
finally:
mem.close()
if not remaining_salts:
print(f"\n[+] 所有密钥已找到,跳过剩余进程")
break
elapsed = time.time() - t0
print(f"\n扫描完成: {elapsed:.1f}s, {len(pids)} 个进程, {all_hex_matches} hex 模式")
cross_verify_keys(db_files, salt_to_dbs, key_map, print)
save_results(db_files, salt_to_dbs, key_map, db_dir, out_file, print)
if __name__ == "__main__":
try:
main()
except RuntimeError as exc:
print(f"\n[ERROR] {exc}")
sys.exit(1)