Files
wechat-cli/wechat_cli/core/contacts.py
canghe e64006bafe Initial release: wechat-cli v0.2.0
A CLI tool to query local WeChat data with 11 commands:
sessions, history, search, contacts, members, stats, export,
favorites, unread, new-messages, and init.

Features:
- Self-contained init with key extraction (no external deps)
- On-the-fly SQLCipher decryption with caching
- JSON output by default for LLM/AI tool integration
- Message type filtering and chat statistics
- Markdown/txt export for conversations
- Cross-platform: macOS, Windows, Linux
2026-04-04 11:10:10 +08:00

202 lines
6.4 KiB
Python

"""联系人管理 — 加载、缓存、模糊匹配"""
import os
import re
import sqlite3
_contact_names = None # {username: display_name}
_contact_full = None # [{username, nick_name, remark}]
_self_username = None
def _load_contacts_from(db_path):
names = {}
full = []
conn = sqlite3.connect(db_path)
try:
for r in conn.execute("SELECT username, nick_name, remark FROM contact").fetchall():
uname, nick, remark = r
display = remark if remark else nick if nick else uname
names[uname] = display
full.append({'username': uname, 'nick_name': nick or '', 'remark': remark or ''})
finally:
conn.close()
return names, full
def get_contact_names(cache, decrypted_dir):
global _contact_names, _contact_full
if _contact_names is not None:
return _contact_names
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
if os.path.exists(pre_decrypted):
try:
_contact_names, _contact_full = _load_contacts_from(pre_decrypted)
return _contact_names
except Exception:
pass
path = cache.get(os.path.join("contact", "contact.db"))
if path:
try:
_contact_names, _contact_full = _load_contacts_from(path)
return _contact_names
except Exception:
pass
return {}
def get_contact_full(cache, decrypted_dir):
global _contact_full
if _contact_full is None:
get_contact_names(cache, decrypted_dir)
return _contact_full or []
def resolve_username(chat_name, cache, decrypted_dir):
names = get_contact_names(cache, decrypted_dir)
if chat_name in names or chat_name.startswith('wxid_') or '@chatroom' in chat_name:
return chat_name
chat_lower = chat_name.lower()
for uname, display in names.items():
if chat_lower == display.lower():
return uname
for uname, display in names.items():
if chat_lower in display.lower():
return uname
return None
def get_self_username(db_dir, cache, decrypted_dir):
global _self_username
if _self_username:
return _self_username
if not db_dir:
return ''
names = get_contact_names(cache, decrypted_dir)
account_dir = os.path.basename(os.path.dirname(db_dir))
candidates = [account_dir]
m = re.fullmatch(r'(.+)_([0-9a-fA-F]{4,})', account_dir)
if m:
candidates.insert(0, m.group(1))
for candidate in candidates:
if candidate and candidate in names:
_self_username = candidate
return _self_username
return ''
def get_group_members(chatroom_username, cache, decrypted_dir):
"""获取群聊成员列表。
通过 contact.db 的 chatroom_member 关联表查询。
Returns:
dict: {'members': [...], 'owner': str}
每个 member: {'username': ..., 'nick_name': ..., 'remark': ..., 'display_name': ...}
"""
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
if os.path.exists(pre_decrypted):
db_path = pre_decrypted
else:
db_path = cache.get(os.path.join("contact", "contact.db"))
if not db_path:
return {'members': [], 'owner': ''}
names = get_contact_names(cache, decrypted_dir)
conn = sqlite3.connect(db_path)
try:
# 1. 找到 chatroom 的 contact.id
row = conn.execute("SELECT id FROM contact WHERE username = ?", (chatroom_username,)).fetchone()
if not row:
return {'members': [], 'owner': ''}
room_id = row[0]
# 2. 获取群主
owner = ''
owner_row = conn.execute("SELECT owner FROM chat_room WHERE id = ?", (room_id,)).fetchone()
if owner_row and owner_row[0]:
owner = names.get(owner_row[0], owner_row[0])
# 3. 获取成员 ID 列表
member_ids = [r[0] for r in conn.execute(
"SELECT member_id FROM chatroom_member WHERE room_id = ?", (room_id,)
).fetchall()]
if not member_ids:
return {'members': [], 'owner': owner}
# 4. 批量查询成员信息
placeholders = ','.join('?' * len(member_ids))
members = []
for uid, username, nick, remark in conn.execute(
f"SELECT id, username, nick_name, remark FROM contact WHERE id IN ({placeholders})",
member_ids
):
display = remark if remark else nick if nick else username
members.append({
'username': username,
'nick_name': nick or '',
'remark': remark or '',
'display_name': display,
})
# 按 display_name 排序,群主排最前
members.sort(key=lambda m: (0 if m['username'] == (owner_row[0] if owner_row else '') else 1, m['display_name']))
return {'members': members, 'owner': owner}
finally:
conn.close()
def get_contact_detail(username, cache, decrypted_dir):
"""获取联系人详情。
Returns:
dict or None: 联系人详细信息
"""
pre_decrypted = os.path.join(decrypted_dir, "contact", "contact.db")
if os.path.exists(pre_decrypted):
db_path = pre_decrypted
else:
db_path = cache.get(os.path.join("contact", "contact.db"))
if not db_path:
return None
conn = sqlite3.connect(db_path)
try:
row = conn.execute(
"SELECT username, nick_name, remark, alias, description, "
"small_head_url, big_head_url, verify_flag, local_type "
"FROM contact WHERE username = ?",
(username,)
).fetchone()
if not row:
return None
uname, nick, remark, alias, desc, small_url, big_url, verify, ltype = row
return {
'username': uname,
'nick_name': nick or '',
'remark': remark or '',
'alias': alias or '',
'description': desc or '',
'avatar': small_url or big_url or '',
'verify_flag': verify or 0,
'local_type': ltype,
'is_group': '@chatroom' in uname,
'is_subscription': uname.startswith('gh_'),
}
finally:
conn.close()
def display_name_for_username(username, names, db_dir, cache, decrypted_dir):
if not username:
return ''
if username == get_self_username(db_dir, cache, decrypted_dir):
return 'me'
return names.get(username, username)