Files
edict/scripts/sync_officials_stats.py
cft0808 8ad2b512c3 fix: 对齐文档与OpenClaw实现链路(安装/同步/统计)
- install.sh: workspace与agent注册补齐太子/吏部/早朝
- sync_agent_config.py: 增加__main__入口,修正taizi映射,兼容model对象结构
- sync_officials_stats.py: 兼容model对象结构,修复TypeError,太子ID改为taizi并保留main兼容
- README_EN.md: 修正Quick Start里旧的9-workspace描述
2026-02-27 21:49:58 +08:00

214 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""同步各官员统计数据 → data/officials_stats.json"""
import json, pathlib, datetime, logging
from file_lock import atomic_json_write
# Module logger; basicConfig sets INFO level and a short HH:MM:SS timestamp format.
log = logging.getLogger('officials')
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(name)s] %(message)s', datefmt='%H:%M:%S')
# Parent of the directory that contains this script.
BASE = pathlib.Path(__file__).resolve().parent.parent
# Directory holding the JSON inputs read by main() and the stats file it writes.
DATA = BASE / 'data'
# Per-agent directories under the user's home; scan_agent() reads
# <agent>/sessions/sessions.json below this root.
AGENTS_ROOT = pathlib.Path.home() / '.openclaw' / 'agents'
# OpenClaw configuration file from which get_model() resolves agent models.
OPENCLAW_CFG = pathlib.Path.home() / '.openclaw' / 'openclaw.json'
# Pricing in USD per 1M tokens, keyed by "provider/model" id.
# Rate keys as consumed by calc_cost(): 'in' = input tokens, 'out' = output tokens,
# 'cr' = cache reads, 'cw' = cache writes.
# NOTE(review): the original comment described this as Anthropic pricing, but the
# table also lists OpenAI and Google models — confirm figures against current price lists.
MODEL_PRICING = {
'anthropic/claude-sonnet-4-6': {'in':3.0, 'out':15.0, 'cr':0.30, 'cw':3.75},
'anthropic/claude-opus-4-5': {'in':15.0,'out':75.0, 'cr':1.50, 'cw':18.75},
'anthropic/claude-haiku-3-5': {'in':0.8, 'out':4.0, 'cr':0.08, 'cw':1.0},
'openai/gpt-4o': {'in':2.5, 'out':10.0, 'cr':1.25, 'cw':0},
'openai/gpt-4o-mini': {'in':0.15,'out':0.6, 'cr':0.075,'cw':0},
'google/gemini-2.0-flash': {'in':0.075,'out':0.3, 'cr':0, 'cw':0},
'google/gemini-2.5-pro': {'in':1.25,'out':10.0, 'cr':0, 'cw':0},
}
# Roster of officials (agents) to report on. main() passes 'id' to get_model(),
# scan_agent() and get_hb(), passes 'label' to get_task_stats() as the org name,
# and copies every field of each entry into the corresponding output record.
OFFICIALS = [
{'id':'taizi', 'label':'太子', 'role':'太子', 'emoji':'🤴','rank':'储君'},
{'id':'zhongshu','label':'中书省','role':'中书令', 'emoji':'📜','rank':'正一品'},
{'id':'menxia', 'label':'门下省','role':'侍中', 'emoji':'🔍','rank':'正一品'},
{'id':'shangshu','label':'尚书省','role':'尚书令', 'emoji':'📮','rank':'正一品'},
{'id':'libu', 'label':'礼部', 'role':'礼部尚书','emoji':'📝','rank':'正二品'},
{'id':'hubu', 'label':'户部', 'role':'户部尚书','emoji':'💰','rank':'正二品'},
{'id':'bingbu', 'label':'兵部', 'role':'兵部尚书','emoji':'⚔️','rank':'正二品'},
{'id':'xingbu', 'label':'刑部', 'role':'刑部尚书','emoji':'⚖️','rank':'正二品'},
{'id':'gongbu', 'label':'工部', 'role':'工部尚书','emoji':'🔧','rank':'正二品'},
{'id':'libu_hr', 'label':'吏部', 'role':'吏部尚书','emoji':'👔','rank':'正二品'},
{'id':'zaochao', 'label':'钦天监','role':'朝报官', 'emoji':'📰','rank':'正三品'},
]
def rj(p, d):
    """Read a JSON file, returning ``d`` when it cannot be read or parsed.

    The file is always decoded as UTF-8 (the interchange encoding for JSON)
    rather than with the platform's locale encoding, so files containing
    non-ASCII text load the same way on every system.

    Args:
        p: Path (``str`` or ``pathlib.Path``) of the JSON file.
        d: Default value returned on any failure.

    Returns:
        The decoded JSON value, or ``d`` if the file is missing, unreadable
        or not valid JSON.
    """
    try:
        return json.loads(pathlib.Path(p).read_text(encoding='utf-8'))
    except Exception:
        # Deliberate best effort: callers treat every failure as "no data".
        return d
# Parsed openclaw.json, filled on first use so the file is not re-read per agent.
_OPENCLAW_CACHE = None


def _load_openclaw_cfg():
    """Return the OpenClaw configuration, reading it from disk on first use.

    The parsed value is kept in the module-level ``_OPENCLAW_CACHE``; an
    unreadable file yields ``{}``.
    """
    global _OPENCLAW_CACHE
    if _OPENCLAW_CACHE is not None:
        return _OPENCLAW_CACHE
    _OPENCLAW_CACHE = rj(OPENCLAW_CFG, {})
    return _OPENCLAW_CACHE
def normalize_model(model_value, fallback='anthropic/claude-sonnet-4-6'):
    """Reduce a model setting to a single model value.

    Args:
        model_value: Either a model id string or a mapping that carries the
            model under ``'primary'`` or ``'id'``.
        fallback: Value returned when no usable model is found.

    Returns:
        The non-empty string itself, the first truthy of the mapping's
        ``'primary'`` / ``'id'`` entries, or ``fallback``.
    """
    if isinstance(model_value, dict):
        for field in ('primary', 'id'):
            candidate = model_value.get(field)
            if candidate:
                return candidate
        return fallback
    if model_value and isinstance(model_value, str):
        return model_value
    return fallback
def get_model(agent_id):
    """Resolve the model configured for ``agent_id`` in the OpenClaw config.

    Lookup order: the entry in ``agents.list`` whose ``id`` equals
    ``agent_id``; for ``'taizi'`` only, the entry with id ``'main'``;
    finally ``agents.defaults.model``.

    Returns:
        The normalized model value for the agent.
    """
    agents_cfg = _load_openclaw_cfg().get('agents', {})
    default = normalize_model(
        agents_cfg.get('defaults', {}).get('model', {}),
        'anthropic/claude-sonnet-4-6',
    )
    # Backward compatibility: taizi used to run under the runtime id "main".
    wanted_ids = (agent_id, 'main') if agent_id == 'taizi' else (agent_id,)
    entries = agents_cfg.get('list', [])
    for wanted in wanted_ids:
        for entry in entries:
            if entry.get('id') == wanted:
                return normalize_model(entry.get('model', default), default)
    return default
def _session_time(ts):
    """Parse a session ``updatedAt`` value.

    Numeric values are treated as epoch milliseconds; strings are parsed as
    ISO-8601 (a trailing ``Z`` is accepted).

    Returns:
        ``(epoch_seconds, datetime)`` or ``None`` when the value is missing
        or cannot be parsed.
    """
    if not ts:
        return None
    try:
        if isinstance(ts, (int, float)):
            epoch = ts / 1000
            return epoch, datetime.datetime.fromtimestamp(epoch)
        parsed = datetime.datetime.fromisoformat(ts.replace('Z', '+00:00'))
        return parsed.timestamp(), parsed
    except (AttributeError, TypeError, ValueError, OverflowError, OSError):
        return None


def scan_agent(agent_id):
    """Aggregate token usage for one agent from its ``sessions.json`` index.

    Token counters are summed over every session in the index. The assistant
    message count is estimated from the JSONL transcript of the most recently
    updated session only.

    Args:
        agent_id: Agent directory name under ``AGENTS_ROOT``. For ``'taizi'``
            the legacy ``main`` directory is used when no ``taizi`` index
            exists.

    Returns:
        dict with keys ``tokens_in``, ``tokens_out``, ``cache_read``,
        ``cache_write``, ``sessions``, ``last_active``
        (``'YYYY-MM-DD HH:MM'`` or ``None``) and ``messages``.
    """
    sessions_dir = AGENTS_ROOT / agent_id / 'sessions'
    sj = sessions_dir / 'sessions.json'
    if not sj.exists() and agent_id == 'taizi':
        # Backward compatibility: taizi used to run under the runtime id "main".
        # Remember the directory so the transcript is read from the same place.
        sessions_dir = AGENTS_ROOT / 'main' / 'sessions'
        sj = sessions_dir / 'sessions.json'
    if not sj.exists():
        return {'tokens_in': 0, 'tokens_out': 0, 'cache_read': 0, 'cache_write': 0,
                'sessions': 0, 'last_active': None, 'messages': 0}

    raw = rj(sj, {})
    # Ignore anything that is not a {session_id: {...}} mapping instead of crashing.
    data = {}
    if isinstance(raw, dict):
        data = {sid: v for sid, v in raw.items() if isinstance(v, dict)}

    tin = tout = cr = cw = 0
    last_epoch = None
    last_ts = None
    epochs = {}  # session id -> epoch seconds (0 when unknown)
    for sid, v in data.items():
        tin += v.get('inputTokens', 0) or 0
        tout += v.get('outputTokens', 0) or 0
        cr += v.get('cacheRead', 0) or 0
        cw += v.get('cacheWrite', 0) or 0
        parsed = _session_time(v.get('updatedAt'))
        if parsed is None:
            epochs[sid] = 0
            continue
        epoch, stamp = parsed
        epochs[sid] = epoch
        # Compare on epoch seconds: naive and offset-aware datetimes cannot be
        # ordered against each other directly.
        if last_epoch is None or epoch > last_epoch:
            last_epoch, last_ts = epoch, stamp

    # Estimate message count from the most recent session's JSONL transcript.
    msg_count = 0
    sf_key = max(epochs, key=epochs.get, default=None)
    session_file = data[sf_key].get('sessionFile') if sf_key else None
    if session_file:
        try:
            sf = sessions_dir / pathlib.Path(session_file).name
            lines = sf.read_text(encoding='utf-8', errors='ignore').splitlines()
        except (OSError, TypeError, ValueError):
            lines = []  # transcript missing or unreadable: keep the count at 0
        for ln in lines:
            try:
                e = json.loads(ln)
                if e.get('type') == 'message' and e.get('message', {}).get('role') == 'assistant':
                    msg_count += 1
            except (ValueError, AttributeError, TypeError):
                pass  # skip lines that are not JSON objects of the expected shape

    return {
        'tokens_in': tin, 'tokens_out': tout,
        'cache_read': cr, 'cache_write': cw,
        'sessions': len(data),
        # NOTE(review): numeric timestamps are rendered in local time, ISO
        # timestamps in the offset they carry — same as before this change.
        'last_active': last_ts.strftime('%Y-%m-%d %H:%M') if last_ts else None,
        'messages': msg_count,
    }
def calc_cost(s, model):
    """Return the USD cost of the token usage ``s`` under ``model``'s pricing.

    Args:
        s: Mapping with ``tokens_in``, ``tokens_out``, ``cache_read`` and
            ``cache_write`` counts.
        model: Key into ``MODEL_PRICING``; unknown models are priced as
            ``'anthropic/claude-sonnet-4-6'``.

    Returns:
        Cost in USD rounded to 4 decimal places.
    """
    rates = MODEL_PRICING.get(model, MODEL_PRICING['anthropic/claude-sonnet-4-6'])
    # (usage field, rate field) pairs; rates are quoted per 1M tokens.
    components = (
        ('tokens_in', 'in'),
        ('tokens_out', 'out'),
        ('cache_read', 'cr'),
        ('cache_write', 'cw'),
    )
    usd = 0.0
    for usage_key, rate_key in components:
        usd += s[usage_key] / 1e6 * rates[rate_key]
    return round(usd, 4)
def get_task_stats(org_label, tasks):
    """Summarize how the organisation ``org_label`` appears in ``tasks``.

    Args:
        org_label: Organisation name matched against each task's ``org`` and
            against the ``from`` / ``to`` fields of its ``flow_log`` entries.
        tasks: Iterable of task dicts.

    Returns:
        dict with:
          * ``tasks_done`` – tasks owned by the org in state ``Done``;
          * ``tasks_active`` – tasks owned by the org in state ``Doing``,
            ``Review`` or ``Assigned``;
          * ``flow_participations`` – number of flow-log entries that name
            the org as sender or receiver;
          * ``participated_edicts`` – ``{'id', 'title', 'state'}`` for each
            distinct task whose id starts with ``JJC`` and whose flow log
            names the org (first occurrence wins).
    """
    done = active = flow_hits = 0
    participated = []
    seen_ids = set()  # O(1) duplicate check instead of rebuilding a list per task
    for task in tasks:
        if task.get('org') == org_label:
            state = task.get('state')
            if state == 'Done':
                done += 1
            elif state in ('Doing', 'Review', 'Assigned'):
                active += 1

        # A missing or null flow_log counts as "no flow entries".
        hits = sum(
            1 for step in (task.get('flow_log') or [])
            if isinstance(step, dict)
            and (step.get('from') == org_label or step.get('to') == org_label)
        )
        flow_hits += hits

        # Edicts (JJC-prefixed tasks) this org took part in; tasks without a
        # usable id are skipped rather than raising.
        task_id = task.get('id')
        if (hits and isinstance(task_id, str) and task_id.startswith('JJC')
                and task_id not in seen_ids):
            seen_ids.add(task_id)
            participated.append({
                'id': task_id,
                'title': task.get('title', ''),
                'state': task.get('state', ''),
            })

    return {'tasks_done': done, 'tasks_active': active,
            'flow_participations': flow_hits, 'participated_edicts': participated}
def get_hb(agent_id, live_tasks):
    """Return the heartbeat of the first live task that belongs to ``agent_id``.

    A task belongs to the agent when ``sourceMeta.agentId`` equals
    ``agent_id``; tasks with an empty heartbeat are skipped. When nothing
    matches, an idle placeholder heartbeat is returned.
    """
    heartbeats = (
        task['heartbeat']
        for task in live_tasks
        if task.get('sourceMeta', {}).get('agentId') == agent_id and task.get('heartbeat')
    )
    return next(heartbeats, {'status':'idle','label':'⚪ 待命','ageSec':None})
def main():
    """Build per-official statistics and write ``data/officials_stats.json``.

    For every entry in ``OFFICIALS`` this combines the configured model,
    session token usage, task statistics and live heartbeat into one record,
    ranks the records by merit score, adds overall totals and writes the
    payload with ``atomic_json_write``.
    """
    tasks = rj(DATA / 'tasks_source.json', [])
    live_tasks = rj(DATA / 'live_status.json', {}).get('tasks', [])

    records = []
    for official in OFFICIALS:
        agent_id = official['id']
        model = get_model(agent_id)
        usage = scan_agent(agent_id)
        task_stats = get_task_stats(official['label'], tasks)
        heartbeat = get_hb(agent_id, live_tasks)
        cost_usd = calc_cost(usage, model)

        # Short display name: the part after the last "/" of "provider/model".
        if isinstance(model, str) and '/' in model:
            model_short = model.rsplit('/', 1)[-1]
        else:
            model_short = str(model)

        record = dict(official)
        record.update({
            'model': model,
            'model_short': model_short,
            'sessions': usage['sessions'],
            'tokens_in': usage['tokens_in'],
            'tokens_out': usage['tokens_out'],
            'cache_read': usage['cache_read'],
            'cache_write': usage['cache_write'],
            'tokens_total': usage['tokens_in'] + usage['tokens_out'],
            'messages': usage['messages'],
            'cost_usd': cost_usd,
            # USD amount multiplied by a fixed factor of 7.25.
            'cost_cny': round(cost_usd * 7.25, 2),
            'last_active': usage['last_active'],
            'heartbeat': heartbeat,
            'tasks_done': task_stats['tasks_done'],
            'tasks_active': task_stats['tasks_active'],
            'flow_participations': task_stats['flow_participations'],
            'participated_edicts': task_stats['participated_edicts'],
            # 10 per finished task, 2 per flow participation, sessions capped at 20.
            'merit_score': (task_stats['tasks_done'] * 10
                            + task_stats['flow_participations'] * 2
                            + min(usage['sessions'], 20)),
        })
        records.append(record)

    # Highest merit first; the stable sort keeps roster order among ties.
    ranked = sorted(records, key=lambda rec: rec['merit_score'], reverse=True)
    for position, rec in enumerate(ranked, start=1):
        rec['merit_rank'] = position

    totals = {
        'tokens_total': sum(rec['tokens_total'] for rec in ranked),
        'cache_total': sum(rec['cache_read'] + rec['cache_write'] for rec in ranked),
        'cost_usd': round(sum(rec['cost_usd'] for rec in ranked), 2),
        'cost_cny': round(sum(rec['cost_cny'] for rec in ranked), 2),
        'tasks_done': sum(rec['tasks_done'] for rec in ranked),
    }
    # After the descending sort the first record is the top scorer.
    top = ranked[0] if ranked else {}

    payload = {
        'generatedAt': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'officials': ranked,
        'totals': totals,
        'top_official': top.get('label', ''),
    }
    atomic_json_write(DATA / 'officials_stats.json', payload)
    log.info(f'{len(ranked)} officials | cost=¥{totals["cost_cny"]} | top={top.get("label","")}')
# Run the sync only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()