Files
edict/scripts/sync_from_openclaw_runtime.py
狼哥 f92c54f371 fix: support OPENCLAW_HOME env var (#275)
fix: support OPENCLAW_HOME for non-standard OpenClaw paths\n\nAdds get_openclaw_home() helper in scripts/utils.py and updates all\ninstall/runtime scripts to resolve OpenClaw home from OPENCLAW_HOME\nenvironment variable with fallback to ~/.openclaw.\n\nCloses #271
2026-04-10 00:55:27 +08:00

354 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import json
import pathlib
import time
import datetime
import traceback
import logging
from file_lock import atomic_json_write, atomic_json_read
from utils import get_openclaw_home
# Named logger for this script; basicConfig installs the process-wide format
# (HH:MM:SS timestamp, logger name, message) at INFO level.
log = logging.getLogger('sync_runtime')
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(name)s] %(message)s', datefmt='%H:%M:%S')
# BASE is the directory that contains this script's own directory.
BASE = pathlib.Path(__file__).resolve().parent.parent
DATA = BASE / 'data'
# Runs at import time; only the final 'data' component is created (no parents=True).
DATA.mkdir(exist_ok=True)
# File that write_status() overwrites with the outcome of each sync run.
SYNC_STATUS = DATA / 'sync_status.json'
# main() treats every sub-directory of this path as one agent and looks for
# <agent>/sessions/sessions.json inside it.
SESSIONS_ROOT = get_openclaw_home() / 'agents'
def write_status(**kwargs):
    """Hand the keyword arguments, as one dict, to atomic_json_write for SYNC_STATUS."""
    status_payload = kwargs
    atomic_json_write(SYNC_STATUS, status_payload)
def ms_to_str(ts_ms):
    """Format an epoch timestamp given in milliseconds as 'YYYY-MM-DD HH:MM:SS'.

    The conversion uses the local timezone (naive ``fromtimestamp``).
    A falsy value, or any value that fails to convert, yields '-'.
    """
    placeholder = '-'
    if not ts_ms:
        return placeholder
    try:
        seconds = ts_ms / 1000
        moment = datetime.datetime.fromtimestamp(seconds)
        return moment.strftime('%Y-%m-%d %H:%M:%S')
    except Exception:
        # Wrong type, out-of-range value, platform limits: all map to the placeholder.
        return placeholder
def state_from_session(age_ms, aborted):
    """Map a session's idle time and abort flag to a board state.

    Returns 'Blocked' when ``aborted`` is truthy; otherwise 'Doing' for an
    age of at most two minutes, 'Review' for at most one hour, else 'Next'.
    """
    if aborted:
        return 'Blocked'
    two_minutes_ms = 2 * 60 * 1000
    one_hour_ms = 60 * 60 * 1000
    # Thresholds are checked in ascending order; both bounds are inclusive.
    for ceiling, label in ((two_minutes_ms, 'Doing'), (one_hour_ms, 'Review')):
        if age_ms <= ceiling:
            return label
    return 'Next'
def detect_official(agent_id):
    """Return the ``(official, org)`` pair for an agent id.

    Ids that are not in the table fall back to ('尚书令', '尚书省').
    """
    roster = (
        ('main', '储君', '太子'),  # legacy id for taizi
        ('taizi', '储君', '太子'),
        ('zhongshu', '中书令', '中书省'),
        ('menxia', '侍中', '门下省'),
        ('shangshu', '尚书令', '尚书省'),
        ('hubu', '户部尚书', '户部'),
        ('libu', '礼部尚书', '礼部'),
        ('bingbu', '兵部尚书', '兵部'),
        ('xingbu', '刑部尚书', '刑部'),
        ('gongbu', '工部尚书', '工部'),
        ('libu_hr', '吏部尚书', '吏部'),
        ('zaochao', '钦天监', '钦天监'),
    )
    by_id = {key: (official, org) for key, official, org in roster}
    if agent_id in by_id:
        return by_id[agent_id]
    return ('尚书令', '尚书省')
def _content_parts(msg):
    """Return the dict-shaped parts of ``msg['content']`` as a list.

    A bare string is treated as a single text part. Any other non-list value
    yields an empty list, and list items that are not dicts are dropped.
    """
    content = msg.get('content')
    if isinstance(content, str):
        return [{'type': 'text', 'text': content}]
    if isinstance(content, list):
        return [part for part in content if isinstance(part, dict)]
    return []


def _part_text(part):
    """Return the part's ``text`` value when it is a string, otherwise ''."""
    text = part.get('text')
    return text if isinstance(text, str) else ''


def load_activity(session_file, limit=12):
    """Extract recent activity rows from a JSON-lines session log.

    Each line of the file is parsed as JSON; lines that do not parse, or that
    parse to something other than an object, are skipped. Events are walked
    from the last line to the first, so the returned list is newest-first.

    Args:
        session_file: Path (str or path-like) of the session log. Falsy or
            unreadable values produce an empty list.
        limit: The walk stops once this many rows have been collected. The
            check runs after each event, so at most one row is returned even
            when ``limit`` is 0.

    Returns:
        A list of ``{'at', 'kind', 'text'}`` dicts where ``kind`` is one of
        'tool', 'assistant' or 'user'. Never raises for malformed input.
    """
    try:
        # ``Path('')`` is the current directory, hence is_file() rather than exists().
        path = pathlib.Path(session_file or '')
        if not path.is_file():
            return []
        # Explicit UTF-8 so the result does not depend on the platform locale;
        # undecodable bytes are dropped rather than failing the whole read.
        lines = path.read_text(encoding='utf-8', errors='ignore').splitlines()
    except (OSError, ValueError, TypeError):
        return []

    events = []
    for line in lines:
        try:
            event = json.loads(line)
        except (ValueError, RecursionError):
            continue
        # Valid JSON that is not an object (numbers, lists, strings) carries no event.
        if isinstance(event, dict):
            events.append(event)

    rows = []
    for event in reversed(events):
        msg = event.get('message')
        if not isinstance(msg, dict):
            msg = {}
        role = msg.get('role')
        ts = event.get('timestamp') or ''

        if role == 'toolResult':
            tool = msg.get('toolName', '-')
            parts = _content_parts(msg)
            output = _part_text(parts[0]) if parts else ''
            # Short tool output is shown verbatim; long output is only acknowledged.
            if len(output) < 50:
                text = f"Tool '{tool}' returned: {output}"
            else:
                text = f"Tool '{tool}' finished"
            rows.append({'at': ts, 'kind': 'tool', 'text': text})
        elif role == 'assistant':
            text = ''
            for part in _content_parts(msg):
                if part.get('type') != 'text':
                    continue
                # Strip the reply marker; the first part with text left over wins.
                cleaned = _part_text(part).strip().replace('[[reply_to_current]]', '').strip()
                if cleaned:
                    text = cleaned
                    break
            if text:
                # Only the first line is kept, capped at 200 characters.
                summary = text.split('\n')[0]
                if len(summary) > 200:
                    summary = summary[:200] + '...'
                rows.append({'at': ts, 'kind': 'assistant', 'text': summary})
        elif role == 'user':
            text = ''
            # No break here: the last text part of the message is the one shown.
            for part in _content_parts(msg):
                if part.get('type') == 'text':
                    text = _part_text(part)[:100]
            if text:
                rows.append({'at': ts, 'kind': 'user', 'text': f"User: {text}..."})

        if len(rows) >= limit:
            break
    return rows
def build_task(agent_id, session_key, row, now_ms):
    """Convert one entry of an agent's sessions.json into a board task dict.

    Args:
        agent_id: Name of the agent directory the session belongs to.
        session_key: Key of the entry inside sessions.json.
        row: The entry itself (a dict). Fields read: sessionId, updatedAt,
            abortedLastRun, origin.label, sessionFile, systemSent and the
            three token counters.
        now_ms: Current time as epoch milliseconds, used to compute the age.

    Returns:
        A dict with the task id, title, official/org, state, a short "now"
        description, recent activity rows and a ``sourceMeta`` sub-dict.
    """
    # Function-local import, as in the original: the module's import block has no ``re``.
    import re

    session_id = row.get('sessionId') or session_key

    updated_at = row.get('updatedAt') or 0
    # A non-numeric timestamp is treated as "never updated" instead of raising
    # in the subtraction below and aborting the whole sync.
    if isinstance(updated_at, bool) or not isinstance(updated_at, (int, float)):
        updated_at = 0
    # Sessions without a timestamp get a fixed 99-day age.
    age_ms = max(0, now_ms - updated_at) if updated_at else 99 * 24 * 3600 * 1000

    aborted = bool(row.get('abortedLastRun'))
    state = state_from_session(age_ms, aborted)
    official, org = detect_official(agent_id)
    session_file = row.get('sessionFile', '')

    # Read the session log once. load_activity collects rows newest-first and
    # stops at ``limit``, so the first five rows of the limit=10 result are the
    # same rows a separate limit=5 call would return.
    activity = load_activity(session_file, limit=10)
    acts = activity[:5]

    # Derive a short description of what the agent is doing right now.
    latest_act = '等待指令'
    if acts:
        first_act = acts[0]
        if first_act['kind'] == 'tool' and len(acts) > 1:
            # The newest row is a tool result: prefer the assistant message that
            # precedes it in time (later in this newest-first list).
            for next_act in acts[1:]:
                if next_act['kind'] == 'assistant':
                    latest_act = f"正在执行: {next_act['text'][:80]}"
                    break
            else:
                latest_act = first_act['text'][:60]
        elif first_act['kind'] == 'assistant':
            latest_act = f"思考中: {first_act['text'][:80]}"
        else:
            latest_act = first_act['text'][:60]

    origin = row.get('origin')
    if not isinstance(origin, dict):
        origin = {}
    # str() keeps re.match from raising on a non-string label.
    title_label = str(origin.get('label') or session_key)

    # Normalise the title: agent:<id>:cron:<uuid> and agent:<id>:subagent:<uuid>
    # get fixed names; a missing or over-long label falls back to a generic one.
    if re.match(r'agent:\w+:cron:', title_label):
        title = f"{org}定时任务"
    elif re.match(r'agent:\w+:subagent:', title_label):
        title = f"{org}子任务"
    elif title_label == session_key or len(title_label) > 40:
        title = f"{org}会话"
    else:
        title = f"{title_label}"

    return {
        'id': f"OC-{agent_id}-{str(session_id)[:8]}",
        'title': title,
        'official': official,
        'org': org,
        'state': state,
        'now': latest_act,
        'eta': ms_to_str(updated_at),
        'block': '上次运行中断' if aborted else '',
        'output': session_file,
        'flow': {
            'draft': f"agent={agent_id}",
            'review': f"updatedAt={ms_to_str(updated_at)}",
            'dispatch': f"sessionKey={session_key}",
        },
        'ac': '来自 OpenClaw runtime sessions 的实时映射',
        'activity': activity,
        'sourceMeta': {
            'agentId': agent_id,
            'sessionKey': session_key,
            'sessionId': session_id,
            'updatedAt': updated_at,
            'ageMs': age_ms,
            'systemSent': bool(row.get('systemSent')),
            'abortedLastRun': aborted,
            'inputTokens': row.get('inputTokens'),
            'outputTokens': row.get('outputTokens'),
            'totalTokens': row.get('totalTokens'),
        },
    }
def _updated_at_ms(task):
    """Return the task's numeric ``sourceMeta.updatedAt``, or 0 when absent or malformed."""
    meta = task.get('sourceMeta')
    value = meta.get('updatedAt') if isinstance(meta, dict) else None
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0
    return value


def _read_task_list(path):
    """Load a JSON list of task dicts from ``path``; return [] when it cannot be used.

    A missing file is silent. Unreadable or non-list content is logged and
    ignored. Entries that are not dicts or have no ``id`` key are dropped,
    because the later sort/dedupe steps index into every task.
    """
    try:
        loaded = json.loads(path.read_text())
    except FileNotFoundError:
        return []
    except (OSError, ValueError) as exc:
        log.warning('ignoring %s: %s', path.name, exc)
        return []
    if not isinstance(loaded, list):
        log.warning('ignoring %s: expected a JSON list', path.name)
        return []
    usable = [t for t in loaded if isinstance(t, dict) and 'id' in t]
    if len(usable) != len(loaded):
        log.warning('%s: dropped %d malformed entries', path.name, len(loaded) - len(usable))
    return usable


def main():
    """Run one sync pass from the OpenClaw session files to data/tasks_source.json.

    Steps: build a task for every session of every agent under SESSIONS_ROOT,
    append the optional mission-control and manual task lists, sort newest
    first, de-duplicate by id, filter out stale/background sessions, put the
    JJC tasks already stored in tasks_source.json back at the front, write the
    result and record the outcome through write_status.

    Raises:
        Any exception from the pass is re-raised after a failure status
        (including a short traceback) has been written.
    """
    start = time.time()
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    now_ms = int(time.time() * 1000)
    try:
        tasks = []
        scan_files = 0
        if SESSIONS_ROOT.exists():
            for agent_dir in sorted(SESSIONS_ROOT.iterdir()):
                if not agent_dir.is_dir():
                    continue
                agent_id = agent_dir.name
                sessions_file = agent_dir / 'sessions' / 'sessions.json'
                if not sessions_file.exists():
                    continue
                # Counted before parsing, so unreadable files are included in the total.
                scan_files += 1
                try:
                    raw = json.loads(sessions_file.read_text(encoding='utf-8'))
                except (OSError, ValueError) as exc:
                    log.warning('skipping unreadable %s: %s', sessions_file, exc)
                    continue
                if not isinstance(raw, dict):
                    continue
                for session_key, row in raw.items():
                    if isinstance(row, dict):
                        tasks.append(build_task(agent_id, session_key, row, now_ms))

        # Merge mission control tasks (minimal integration).
        tasks.extend(_read_task_list(DATA / 'mission_control_tasks.json'))
        # Merge manually maintained parallel tasks for the board.
        tasks.extend(_read_task_list(DATA / 'manual_parallel_tasks.json'))

        # Newest first; tasks without a usable timestamp sort as 0 (last).
        tasks.sort(key=_updated_at_ms, reverse=True)

        # De-duplicate: for each id keep the first occurrence, i.e. the newest.
        seen_ids = set()
        deduped = []
        for t in tasks:
            if t['id'] not in seen_ids:
                seen_ids.add(t['id'])
                deduped.append(t)
        tasks = deduped

        # Filter out inactive sessions to keep the board free of noise.
        # NOTE(review): these rules also apply to merged mission-control/manual
        # tasks whose id does not start with 'JJC'; such tasks without a recent
        # sourceMeta.updatedAt are dropped here — confirm this is intended.
        filtered_tasks = []
        one_day_ago = now_ms - 24 * 3600 * 1000
        for t in tasks:
            # JJC tasks are always kept.
            if str(t['id']).startswith('JJC'):
                filtered_tasks.append(t)
                continue
            # 1. Drop anything not updated within the last 24 hours.
            if _updated_at_ms(t) < one_day_ago:
                continue
            # 2. Drop background cron / subagent tasks unless they are blocked.
            title = str(t.get('title') or '')
            if '定时任务' in title or '子任务' in title:
                if t.get('state') != 'Blocked':
                    continue
            # 3. Keep only Doing, Review and Blocked; idle ('Next') sessions are dropped.
            if t.get('state') not in ('Doing', 'Review', 'Blocked'):
                continue
            filtered_tasks.append(t)
        tasks = filtered_tasks

        # Preserve the JJC-* tasks already stored in tasks_source.json: this
        # script only carries them over and puts them first; it does not derive
        # their content from session logs.
        # NOTE(review): when the existing file cannot be read, the write below
        # still replaces it, so previously stored JJC tasks are not carried
        # over — confirm whether the sync should abort instead.
        existing_tasks_file = DATA / 'tasks_source.json'
        if existing_tasks_file.exists():
            try:
                existing = json.loads(existing_tasks_file.read_text())
                if not isinstance(existing, list):
                    raise ValueError('expected a JSON list')
                jjc_existing = [
                    t for t in existing
                    if isinstance(t, dict) and str(t.get('id', '')).startswith('JJC')
                ]
            except (OSError, ValueError) as e:
                log.error('merge existing JJC tasks failed: %s', e)
            else:
                # Remove JJC entries that arrived via the merges above so the
                # stored ones are not duplicated, then put the stored ones first.
                tasks = jjc_existing + [
                    t for t in tasks if not str(t.get('id', '')).startswith('JJC')
                ]

        atomic_json_write(DATA / 'tasks_source.json', tasks)
        duration_ms = int((time.time() - start) * 1000)
        write_status(
            ok=True,
            lastSyncAt=now,
            durationMs=duration_ms,
            source='openclaw_runtime_sessions',
            recordCount=len(tasks),
            scannedSessionFiles=scan_files,
            missingFields={},
            error=None,
        )
        log.info('synced %d tasks from openclaw runtime in %dms', len(tasks), duration_ms)
    except Exception as e:
        # Record the failure for the dashboard, then let the error propagate.
        duration_ms = int((time.time() - start) * 1000)
        write_status(
            ok=False,
            lastSyncAt=now,
            durationMs=duration_ms,
            source='openclaw_runtime_sessions',
            recordCount=0,
            missingFields={},
            error=f'{type(e).__name__}: {e}',
            traceback=traceback.format_exc(limit=3),
        )
        raise


if __name__ == '__main__':
    main()