A CLI tool to query local WeChat data with 11 commands: sessions, history, search, contacts, members, stats, export, favorites, unread, new-messages, and init. Features: - Self-contained init with key extraction (no external deps) - On-the-fly SQLCipher decryption with caching - JSON output by default for LLM/AI tool integration - Message type filtering and chat statistics - Markdown/txt export for conversations - Cross-platform: macOS, Windows, Linux
202 lines
6.4 KiB
Python
202 lines
6.4 KiB
Python
"""联系人管理 — 加载、缓存、模糊匹配"""
|
|
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
|
|
|
|
_contact_names = None # {username: display_name}
|
|
_contact_full = None # [{username, nick_name, remark}]
|
|
_self_username = None
|
|
|
|
|
|
def _load_contacts_from(db_path):
|
|
names = {}
|
|
full = []
|
|
conn = sqlite3.connect(db_path)
|
|
try:
|
|
for r in conn.execute("SELECT username, nick_name, remark FROM contact").fetchall():
|
|
uname, nick, remark = r
|
|
display = remark if remark else nick if nick else uname
|
|
names[uname] = display
|
|
full.append({'username': uname, 'nick_name': nick or '', 'remark': remark or ''})
|
|
finally:
|
|
conn.close()
|
|
return names, full
|
|
|
|
|
|
def get_contact_names(cache, decrypted_dir):
|
|
global _contact_names, _contact_full
|
|
if _contact_names is not None:
|
|
return _contact_names
|
|
|
|
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
|
|
if os.path.exists(pre_decrypted):
|
|
try:
|
|
_contact_names, _contact_full = _load_contacts_from(pre_decrypted)
|
|
return _contact_names
|
|
except Exception:
|
|
pass
|
|
|
|
path = cache.get(os.path.join("contact", "contact.db"))
|
|
if path:
|
|
try:
|
|
_contact_names, _contact_full = _load_contacts_from(path)
|
|
return _contact_names
|
|
except Exception:
|
|
pass
|
|
|
|
return {}
|
|
|
|
|
|
def get_contact_full(cache, decrypted_dir):
|
|
global _contact_full
|
|
if _contact_full is None:
|
|
get_contact_names(cache, decrypted_dir)
|
|
return _contact_full or []
|
|
|
|
|
|
def resolve_username(chat_name, cache, decrypted_dir):
|
|
names = get_contact_names(cache, decrypted_dir)
|
|
if chat_name in names or chat_name.startswith('wxid_') or '@chatroom' in chat_name:
|
|
return chat_name
|
|
chat_lower = chat_name.lower()
|
|
for uname, display in names.items():
|
|
if chat_lower == display.lower():
|
|
return uname
|
|
for uname, display in names.items():
|
|
if chat_lower in display.lower():
|
|
return uname
|
|
return None
|
|
|
|
|
|
def get_self_username(db_dir, cache, decrypted_dir):
|
|
global _self_username
|
|
if _self_username:
|
|
return _self_username
|
|
if not db_dir:
|
|
return ''
|
|
names = get_contact_names(cache, decrypted_dir)
|
|
account_dir = os.path.basename(os.path.dirname(db_dir))
|
|
candidates = [account_dir]
|
|
m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir)
|
|
if m:
|
|
candidates.insert(0, m.group(1))
|
|
for candidate in candidates:
|
|
if candidate and candidate in names:
|
|
_self_username = candidate
|
|
return _self_username
|
|
return ''
|
|
|
|
|
|
def get_group_members(chatroom_username, cache, decrypted_dir):
|
|
"""获取群聊成员列表。
|
|
|
|
通过 contact.db 的 chatroom_member 关联表查询。
|
|
|
|
Returns:
|
|
dict: {'members': [...], 'owner': str}
|
|
每个 member: {'username': ..., 'nick_name': ..., 'remark': ..., 'display_name': ...}
|
|
"""
|
|
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
|
|
if os.path.exists(pre_decrypted):
|
|
db_path = pre_decrypted
|
|
else:
|
|
db_path = cache.get(os.path.join("contact", "contact.db"))
|
|
|
|
if not db_path:
|
|
return {'members': [], 'owner': ''}
|
|
|
|
names = get_contact_names(cache, decrypted_dir)
|
|
conn = sqlite3.connect(db_path)
|
|
try:
|
|
# 1. 找到 chatroom 的 contact.id
|
|
row = conn.execute("SELECT id FROM contact WHERE username = ?", (chatroom_username,)).fetchone()
|
|
if not row:
|
|
return {'members': [], 'owner': ''}
|
|
room_id = row[0]
|
|
|
|
# 2. 获取群主
|
|
owner = ''
|
|
owner_row = conn.execute("SELECT owner FROM chat_room WHERE id = ?", (room_id,)).fetchone()
|
|
if owner_row and owner_row[0]:
|
|
owner = names.get(owner_row[0], owner_row[0])
|
|
|
|
# 3. 获取成员 ID 列表
|
|
member_ids = [r[0] for r in conn.execute(
|
|
"SELECT member_id FROM chatroom_member WHERE room_id = ?", (room_id,)
|
|
).fetchall()]
|
|
if not member_ids:
|
|
return {'members': [], 'owner': owner}
|
|
|
|
# 4. 批量查询成员信息
|
|
placeholders = ','.join('?' * len(member_ids))
|
|
members = []
|
|
for uid, username, nick, remark in conn.execute(
|
|
f"SELECT id, username, nick_name, remark FROM contact WHERE id IN ({placeholders})",
|
|
member_ids
|
|
):
|
|
display = remark if remark else nick if nick else username
|
|
members.append({
|
|
'username': username,
|
|
'nick_name': nick or '',
|
|
'remark': remark or '',
|
|
'display_name': display,
|
|
})
|
|
|
|
# 按 display_name 排序,群主排最前
|
|
members.sort(key=lambda m: (0 if m['username'] == (owner_row[0] if owner_row else '') else 1, m['display_name']))
|
|
|
|
return {'members': members, 'owner': owner}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def get_contact_detail(username, cache, decrypted_dir):
|
|
"""获取联系人详情。
|
|
|
|
Returns:
|
|
dict or None: 联系人详细信息
|
|
"""
|
|
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
|
|
if os.path.exists(pre_decrypted):
|
|
db_path = pre_decrypted
|
|
else:
|
|
db_path = cache.get(os.path.join("contact", "contact.db"))
|
|
if not db_path:
|
|
return None
|
|
|
|
conn = sqlite3.connect(db_path)
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT username, nick_name, remark, alias, description, "
|
|
"small_head_url, big_head_url, verify_flag, local_type "
|
|
"FROM contact WHERE username = ?",
|
|
(username,)
|
|
).fetchone()
|
|
if not row:
|
|
return None
|
|
uname, nick, remark, alias, desc, small_url, big_url, verify, ltype = row
|
|
return {
|
|
'username': uname,
|
|
'nick_name': nick or '',
|
|
'remark': remark or '',
|
|
'alias': alias or '',
|
|
'description': desc or '',
|
|
'avatar': small_url or big_url or '',
|
|
'verify_flag': verify or 0,
|
|
'local_type': ltype,
|
|
'is_group': '@chatroom' in uname,
|
|
'is_subscription': uname.startswith('gh_'),
|
|
}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def display_name_for_username(username, names, db_dir, cache, decrypted_dir):
|
|
if not username:
|
|
return ''
|
|
if username == get_self_username(db_dir, cache, decrypted_dir):
|
|
return 'me'
|
|
return names.get(username, username)
|