A CLI tool to query local WeChat data with 11 commands: sessions, history, search, contacts, members, stats, export, favorites, unread, new-messages, and init. Features: - Self-contained init with key extraction (no external deps) - On-the-fly SQLCipher decryption with caching - JSON output by default for LLM/AI tool integration - Message type filtering and chat statistics - Markdown/txt export for conversations - Cross-platform: macOS, Windows, Linux
78 lines
2.6 KiB
Python
78 lines
2.6 KiB
Python
"""数据库解密 — SQLCipher 4, AES-256-CBC"""
|
|
|
|
import os
|
|
import struct
|
|
|
|
from Crypto.Cipher import AES
|
|
|
|
PAGE_SZ = 4096
|
|
KEY_SZ = 32
|
|
SALT_SZ = 16
|
|
RESERVE_SZ = 80 # IV(16) + HMAC-SHA512(64)
|
|
SQLITE_HDR = b'SQLite format 3\x00'
|
|
WAL_HEADER_SZ = 32
|
|
WAL_FRAME_HEADER_SZ = 24
|
|
|
|
|
|
def decrypt_page(enc_key, page_data, pgno):
|
|
iv = page_data[PAGE_SZ - RESERVE_SZ: PAGE_SZ - RESERVE_SZ + 16]
|
|
if pgno == 1:
|
|
encrypted = page_data[SALT_SZ: PAGE_SZ - RESERVE_SZ]
|
|
cipher = AES.new(enc_key, AES.MODE_CBC, iv)
|
|
decrypted = cipher.decrypt(encrypted)
|
|
return bytes(bytearray(SQLITE_HDR + decrypted + b'\x00' * RESERVE_SZ))
|
|
else:
|
|
encrypted = page_data[:PAGE_SZ - RESERVE_SZ]
|
|
cipher = AES.new(enc_key, AES.MODE_CBC, iv)
|
|
decrypted = cipher.decrypt(encrypted)
|
|
return decrypted + b'\x00' * RESERVE_SZ
|
|
|
|
|
|
def full_decrypt(db_path, out_path, enc_key):
|
|
file_size = os.path.getsize(db_path)
|
|
total_pages = file_size // PAGE_SZ
|
|
os.makedirs(os.path.dirname(out_path), exist_ok=True)
|
|
with open(db_path, 'rb') as fin, open(out_path, 'wb') as fout:
|
|
for pgno in range(1, total_pages + 1):
|
|
page = fin.read(PAGE_SZ)
|
|
if len(page) < PAGE_SZ:
|
|
if len(page) > 0:
|
|
page = page + b'\x00' * (PAGE_SZ - len(page))
|
|
else:
|
|
break
|
|
fout.write(decrypt_page(enc_key, page, pgno))
|
|
return total_pages
|
|
|
|
|
|
def decrypt_wal(wal_path, out_path, enc_key):
|
|
if not os.path.exists(wal_path):
|
|
return 0
|
|
wal_size = os.path.getsize(wal_path)
|
|
if wal_size <= WAL_HEADER_SZ:
|
|
return 0
|
|
patched = 0
|
|
with open(wal_path, 'rb') as wf, open(out_path, 'r+b') as df:
|
|
wal_hdr = wf.read(WAL_HEADER_SZ)
|
|
wal_salt1 = struct.unpack('>I', wal_hdr[16:20])[0]
|
|
wal_salt2 = struct.unpack('>I', wal_hdr[20:24])[0]
|
|
frame_size = WAL_FRAME_HEADER_SZ + PAGE_SZ
|
|
while wf.tell() + frame_size <= wal_size:
|
|
fh = wf.read(WAL_FRAME_HEADER_SZ)
|
|
if len(fh) < WAL_FRAME_HEADER_SZ:
|
|
break
|
|
pgno = struct.unpack('>I', fh[0:4])[0]
|
|
frame_salt1 = struct.unpack('>I', fh[8:12])[0]
|
|
frame_salt2 = struct.unpack('>I', fh[12:16])[0]
|
|
ep = wf.read(PAGE_SZ)
|
|
if len(ep) < PAGE_SZ:
|
|
break
|
|
if pgno == 0 or pgno > 1000000:
|
|
continue
|
|
if frame_salt1 != wal_salt1 or frame_salt2 != wal_salt2:
|
|
continue
|
|
dec = decrypt_page(enc_key, ep, pgno)
|
|
df.seek((pgno - 1) * PAGE_SZ)
|
|
df.write(dec)
|
|
patched += 1
|
|
return patched
|