Files
edict/scripts/kanban_update.py
2026-03-03 15:10:23 +00:00

478 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
看板任务更新工具 - 供各省部 Agent 调用
用法:
# 新建任务(收旨时)
python3 kanban_update.py create JJC-20260223-012 "任务标题" Zhongshu 中书省 中书令
# 更新状态
python3 kanban_update.py state JJC-20260223-012 Menxia "规划方案已提交门下省"
# 添加流转记录
python3 kanban_update.py flow JJC-20260223-012 "中书省" "门下省" "规划方案提交审核"
# 完成任务
python3 kanban_update.py done JJC-20260223-012 "/path/to/output" "任务完成摘要"
# 添加/更新子任务 todo
python3 kanban_update.py todo JJC-20260223-012 1 "实现API接口" in-progress
python3 kanban_update.py todo JJC-20260223-012 1 "" completed
# 🔥 实时进展汇报Agent 主动调用,频率不限)
python3 kanban_update.py progress JJC-20260223-012 "正在分析需求拟定3个子方案" "1.调研技术选型|2.撰写设计文档|3.实现原型"
"""
import json, pathlib, datetime, sys, subprocess, logging, os, re
_BASE = pathlib.Path(__file__).resolve().parent.parent
TASKS_FILE = _BASE / 'data' / 'tasks_source.json'
REFRESH_SCRIPT = _BASE / 'scripts' / 'refresh_live_data.py'
log = logging.getLogger('kanban')
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(name)s] %(message)s', datefmt='%H:%M:%S')
# 文件锁 —— 防止多 Agent 同时读写 tasks_source.json
from file_lock import atomic_json_read, atomic_json_update # noqa: E402
STATE_ORG_MAP = {
'Taizi': '太子', 'Zhongshu': '中书省', 'Menxia': '门下省', 'Assigned': '尚书省',
'Doing': '执行中', 'Review': '尚书省', 'Done': '完成', 'Blocked': '阻塞',
}
_STATE_AGENT_MAP = {
'Taizi': 'main',
'Zhongshu': 'zhongshu',
'Menxia': 'menxia',
'Assigned': 'shangshu',
'Review': 'shangshu',
'Pending': 'zhongshu',
}
_ORG_AGENT_MAP = {
'礼部': 'libu', '户部': 'hubu', '兵部': 'bingbu',
'刑部': 'xingbu', '工部': 'gongbu', '吏部': 'libu_hr',
'中书省': 'zhongshu', '门下省': 'menxia', '尚书省': 'shangshu',
}
_AGENT_LABELS = {
'main': '太子', 'taizi': '太子',
'zhongshu': '中书省', 'menxia': '门下省', 'shangshu': '尚书省',
'libu': '礼部', 'hubu': '户部', 'bingbu': '兵部', 'xingbu': '刑部',
'gongbu': '工部', 'libu_hr': '吏部', 'zaochao': '钦天监',
}
MAX_PROGRESS_LOG = 100 # 单任务最大进展日志条数
def load():
return atomic_json_read(TASKS_FILE, [])
def trigger_refresh():
"""异步触发 REFRESH_SCRIPT 更新 live_status避免阻塞主更新流程。"""
# 异步触发刷新,不阻塞调用方
try:
subprocess.Popen(['python3', str(REFRESH_SCRIPT)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception:
pass
def now_iso():
return datetime.datetime.now(datetime.timezone.utc).isoformat().replace('+00:00', 'Z')
def find_task(tasks, task_id):
return next((t for t in tasks if t.get('id') == task_id), None)
# 旨意标题最低要求
_MIN_TITLE_LEN = 6
_JUNK_TITLES = {
'?', '', '', '好的', '', '', '', '不是', '', '了解', '收到',
'', '', '知道了', '开启了么', '可以', '不行', '', 'ok', 'yes', 'no',
'你去开启', '测试', '试试', '看看',
}
def _sanitize_text(raw, max_len=80):
"""清洗文本剥离文件路径、URL、Conversation 元数据、传旨前缀、截断过长内容。"""
t = (raw or '').strip()
# 1) 剥离 Conversation info / Conversation 后面的所有内容
t = re.split(r'\n*Conversation\b', t, maxsplit=1)[0].strip()
# 2) 剥离 ```json 代码块
t = re.split(r'\n*```', t, maxsplit=1)[0].strip()
# 3) 剥离 Unix/Mac 文件路径 (/Users/xxx, /home/xxx, /opt/xxx, ./xxx)
t = re.sub(r'[/\\.~][A-Za-z0-9_\-./]+(?:\.(?:py|js|ts|json|md|sh|yaml|yml|txt|csv|html|css|log))?', '', t)
# 4) 剥离 URL
t = re.sub(r'https?://\S+', '', t)
# 5) 清理常见前缀: "传旨:" "下旨:" "下旨xxx:" 等
t = re.sub(r'^(传旨|下旨)([(][^)]*[)])?[:\uff1a]\s*', '', t)
# 6) 剥离系统元数据关键词
t = re.sub(r'(message_id|session_id|chat_id|open_id|user_id|tenant_key)\s*[:=]\s*\S+', '', t)
# 7) 合并多余空白
t = re.sub(r'\s+', ' ', t).strip()
# 8) 截断过长内容
if len(t) > max_len:
t = t[:max_len] + ''
return t
def _sanitize_title(raw):
"""清洗标题(最长 80 字符)。"""
return _sanitize_text(raw, 80)
def _sanitize_remark(raw):
"""清洗流转备注(最长 120 字符)。"""
return _sanitize_text(raw, 120)
def _infer_agent_id_from_runtime(task=None):
"""尽量推断当前执行该命令的 Agent。"""
for k in ('OPENCLAW_AGENT_ID', 'OPENCLAW_AGENT', 'AGENT_ID'):
v = (os.environ.get(k) or '').strip()
if v:
return v
cwd = str(pathlib.Path.cwd())
m = re.search(r'workspace-([a-zA-Z0-9_\-]+)', cwd)
if m:
return m.group(1)
fpath = str(pathlib.Path(__file__).resolve())
m2 = re.search(r'workspace-([a-zA-Z0-9_\-]+)', fpath)
if m2:
return m2.group(1)
if task:
state = task.get('state', '')
org = task.get('org', '')
aid = _STATE_AGENT_MAP.get(state)
if aid is None and state in ('Doing', 'Next'):
aid = _ORG_AGENT_MAP.get(org)
if aid:
return aid
return ''
def _is_valid_task_title(title):
"""校验标题是否足够作为一个旨意任务。"""
t = (title or '').strip()
if len(t) < _MIN_TITLE_LEN:
return False, f'标题过短({len(t)}<{_MIN_TITLE_LEN}字),疑似非旨意'
if t.lower() in _JUNK_TITLES:
return False, f'标题 "{t}" 不是有效旨意'
# 纯标点或问号
if re.fullmatch(r'[\s?!.。,,…·\-—~]+', t):
return False, '标题只有标点符号'
# 看起来像文件路径
if re.match(r'^[/\\~.]', t) or re.search(r'/[a-zA-Z0-9_-]+/[a-zA-Z0-9_-]+', t):
return False, f'标题看起来像文件路径,请用中文概括任务'
# 只剩标点和空白(清洗后可能变空)
if re.fullmatch(r'[\s\W]*', t):
return False, '标题清洗后为空'
return True, ''
def cmd_create(task_id, title, state, org, official, remark=None):
"""新建任务(收旨时立即调用)"""
# 清洗标题(剥离元数据)
title = _sanitize_title(title)
# 旨意标题校验
valid, reason = _is_valid_task_title(title)
if not valid:
log.warning(f'⚠️ 拒绝创建 {task_id}{reason}')
print(f'[看板] 拒绝创建:{reason}', flush=True)
return
actual_org = STATE_ORG_MAP.get(state, org)
clean_remark = _sanitize_remark(remark) if remark else f"下旨:{title}"
def modifier(tasks):
existing = next((t for t in tasks if t.get('id') == task_id), None)
if existing:
if existing.get('state') in ('Done', 'Cancelled'):
log.warning(f'⚠️ 任务 {task_id} 已完结 (state={existing["state"]}),不可覆盖')
return tasks
if existing.get('state') not in (None, '', 'Inbox', 'Pending'):
log.warning(f'任务 {task_id} 已存在 (state={existing["state"]}),将被覆盖')
tasks = [t for t in tasks if t.get('id') != task_id]
tasks.insert(0, {
"id": task_id, "title": title, "official": official,
"org": actual_org, "state": state,
"now": clean_remark[:60] if remark else f"已下旨,等待{actual_org}接旨",
"eta": "-", "block": "", "output": "", "ac": "",
"flow_log": [{"at": now_iso(), "from": "皇上", "to": actual_org, "remark": clean_remark}],
"updatedAt": now_iso()
})
return tasks
atomic_json_update(TASKS_FILE, modifier, [])
trigger_refresh()
log.info(f'✅ 创建 {task_id} | {title[:30]} | state={state}')
def cmd_state(task_id, new_state, now_text=None):
"""更新任务状态(原子操作)"""
old_state = [None]
def modifier(tasks):
t = find_task(tasks, task_id)
if not t:
log.error(f'任务 {task_id} 不存在')
return tasks
old_state[0] = t['state']
t['state'] = new_state
if new_state in STATE_ORG_MAP:
t['org'] = STATE_ORG_MAP[new_state]
if now_text:
t['now'] = now_text
t['updatedAt'] = now_iso()
return tasks
atomic_json_update(TASKS_FILE, modifier, [])
trigger_refresh()
log.info(f'{task_id} 状态更新: {old_state[0]}{new_state}')
def cmd_flow(task_id, from_dept, to_dept, remark):
"""添加流转记录(原子操作)"""
clean_remark = _sanitize_remark(remark)
def modifier(tasks):
t = find_task(tasks, task_id)
if not t:
log.error(f'任务 {task_id} 不存在')
return tasks
t.setdefault('flow_log', []).append({
"at": now_iso(), "from": from_dept, "to": to_dept, "remark": clean_remark
})
t['updatedAt'] = now_iso()
return tasks
atomic_json_update(TASKS_FILE, modifier, [])
trigger_refresh()
log.info(f'{task_id} 流转记录: {from_dept}{to_dept}')
def cmd_done(task_id, output_path='', summary=''):
"""标记任务完成(原子操作)"""
def modifier(tasks):
t = find_task(tasks, task_id)
if not t:
log.error(f'任务 {task_id} 不存在')
return tasks
t['state'] = 'Done'
t['output'] = output_path
t['now'] = summary or '任务已完成'
t.setdefault('flow_log', []).append({
"at": now_iso(), "from": t.get('org', '执行部门'),
"to": "皇上", "remark": f"✅ 完成:{summary or '任务已完成'}"
})
t['updatedAt'] = now_iso()
return tasks
atomic_json_update(TASKS_FILE, modifier, [])
trigger_refresh()
log.info(f'{task_id} 已完成')
def cmd_block(task_id, reason):
"""标记阻塞(原子操作)"""
def modifier(tasks):
t = find_task(tasks, task_id)
if not t:
log.error(f'任务 {task_id} 不存在')
return tasks
t['state'] = 'Blocked'
t['block'] = reason
t['updatedAt'] = now_iso()
return tasks
atomic_json_update(TASKS_FILE, modifier, [])
trigger_refresh()
log.warning(f'⚠️ {task_id} 已阻塞: {reason}')
def cmd_progress(task_id, now_text, todos_pipe='', tokens=0, cost=0.0, elapsed=0):
"""🔥 实时进展汇报 — Agent 主动调用,不改变状态,只更新 now + todos
now_text: 当前正在做什么的一句话描述(必填)
todos_pipe: 可选,用 | 分隔的 todo 列表,格式:
"已完成的事项✅|正在做的事项🔄|计划做的事项"
- 以 ✅ 结尾 → completed
- 以 🔄 结尾 → in-progress
- 其他 → not-started
tokens: 可选,本次消耗的 token 数
cost: 可选,本次成本(美元)
elapsed: 可选,本次耗时(秒)
"""
clean = _sanitize_remark(now_text)
# 解析 todos_pipe
parsed_todos = None
if todos_pipe:
new_todos = []
for i, item in enumerate(todos_pipe.split('|'), 1):
item = item.strip()
if not item:
continue
if item.endswith(''):
status = 'completed'
title = item[:-1].strip()
elif item.endswith('🔄'):
status = 'in-progress'
title = item[:-1].strip()
else:
status = 'not-started'
title = item
new_todos.append({'id': str(i), 'title': title, 'status': status})
if new_todos:
parsed_todos = new_todos
# 解析资源消耗参数
try:
tokens = int(tokens) if tokens else 0
except (ValueError, TypeError):
tokens = 0
try:
cost = float(cost) if cost else 0.0
except (ValueError, TypeError):
cost = 0.0
try:
elapsed = int(elapsed) if elapsed else 0
except (ValueError, TypeError):
elapsed = 0
done_cnt = [0]
total_cnt = [0]
def modifier(tasks):
t = find_task(tasks, task_id)
if not t:
log.error(f'任务 {task_id} 不存在')
return tasks
t['now'] = clean
if parsed_todos is not None:
t['todos'] = parsed_todos
# 多 Agent 并行进展日志
at = now_iso()
agent_id = _infer_agent_id_from_runtime(t)
agent_label = _AGENT_LABELS.get(agent_id, agent_id)
log_todos = parsed_todos if parsed_todos is not None else t.get('todos', [])
log_entry = {
'at': at, 'agent': agent_id, 'agentLabel': agent_label,
'text': clean, 'todos': log_todos,
'state': t.get('state', ''), 'org': t.get('org', ''),
}
# 资源消耗(可选字段,有值才写入)
if tokens > 0:
log_entry['tokens'] = tokens
if cost > 0:
log_entry['cost'] = cost
if elapsed > 0:
log_entry['elapsed'] = elapsed
t.setdefault('progress_log', []).append(log_entry)
# 限制 progress_log 大小,防止无限增长
if len(t['progress_log']) > MAX_PROGRESS_LOG:
t['progress_log'] = t['progress_log'][-MAX_PROGRESS_LOG:]
t['updatedAt'] = at
done_cnt[0] = sum(1 for td in t.get('todos', []) if td.get('status') == 'completed')
total_cnt[0] = len(t.get('todos', []))
return tasks
atomic_json_update(TASKS_FILE, modifier, [])
trigger_refresh()
res_info = ''
if tokens or cost or elapsed:
res_info = f' [res: {tokens}tok/${cost:.4f}/{elapsed}s]'
log.info(f'📡 {task_id} 进展: {clean[:40]}... [{done_cnt[0]}/{total_cnt[0]}]{res_info}')
def cmd_todo(task_id, todo_id, title, status='not-started', detail=''):
"""添加或更新子任务 todo原子操作
status: not-started / in-progress / completed
detail: 可选,该子任务的详细产出/说明Markdown 格式)
"""
# 校验 status 值
if status not in ('not-started', 'in-progress', 'completed'):
status = 'not-started'
result_info = [0, 0]
def modifier(tasks):
t = find_task(tasks, task_id)
if not t:
log.error(f'任务 {task_id} 不存在')
return tasks
if 'todos' not in t:
t['todos'] = []
existing = next((td for td in t['todos'] if str(td.get('id')) == str(todo_id)), None)
if existing:
existing['status'] = status
if title:
existing['title'] = title
if detail:
existing['detail'] = detail
else:
item = {'id': todo_id, 'title': title, 'status': status}
if detail:
item['detail'] = detail
t['todos'].append(item)
t['updatedAt'] = now_iso()
result_info[0] = sum(1 for td in t['todos'] if td.get('status') == 'completed')
result_info[1] = len(t['todos'])
return tasks
atomic_json_update(TASKS_FILE, modifier, [])
trigger_refresh()
log.info(f'{task_id} todo [{result_info[0]}/{result_info[1]}]: {todo_id}{status}')
_CMD_MIN_ARGS = {
'create': 6, 'state': 3, 'flow': 5, 'done': 2, 'block': 3, 'todo': 4, 'progress': 3,
}
if __name__ == '__main__':
args = sys.argv[1:]
if not args:
print(__doc__)
sys.exit(0)
cmd = args[0]
if cmd in _CMD_MIN_ARGS and len(args) < _CMD_MIN_ARGS[cmd]:
print(f'错误:"{cmd}" 命令至少需要 {_CMD_MIN_ARGS[cmd]} 个参数,实际 {len(args)}')
print(__doc__)
sys.exit(1)
if cmd == 'create':
cmd_create(args[1], args[2], args[3], args[4], args[5], args[6] if len(args)>6 else None)
elif cmd == 'state':
cmd_state(args[1], args[2], args[3] if len(args)>3 else None)
elif cmd == 'flow':
cmd_flow(args[1], args[2], args[3], args[4])
elif cmd == 'done':
cmd_done(args[1], args[2] if len(args)>2 else '', args[3] if len(args)>3 else '')
elif cmd == 'block':
cmd_block(args[1], args[2])
elif cmd == 'todo':
# 解析可选 --detail 参数
todo_pos = []
todo_detail = ''
ti = 1
while ti < len(args):
if args[ti] == '--detail' and ti + 1 < len(args):
todo_detail = args[ti + 1]; ti += 2
else:
todo_pos.append(args[ti]); ti += 1
cmd_todo(
todo_pos[0] if len(todo_pos) > 0 else '',
todo_pos[1] if len(todo_pos) > 1 else '',
todo_pos[2] if len(todo_pos) > 2 else '',
todo_pos[3] if len(todo_pos) > 3 else 'not-started',
detail=todo_detail,
)
elif cmd == 'progress':
# 解析可选 --tokens/--cost/--elapsed 参数
pos_args = []
kw = {}
i = 1
while i < len(args):
if args[i] == '--tokens' and i + 1 < len(args):
kw['tokens'] = args[i + 1]; i += 2
elif args[i] == '--cost' and i + 1 < len(args):
kw['cost'] = args[i + 1]; i += 2
elif args[i] == '--elapsed' and i + 1 < len(args):
kw['elapsed'] = args[i + 1]; i += 2
else:
pos_args.append(args[i]); i += 1
cmd_progress(
pos_args[0] if len(pos_args) > 0 else '',
pos_args[1] if len(pos_args) > 1 else '',
pos_args[2] if len(pos_args) > 2 else '',
tokens=kw.get('tokens', 0),
cost=kw.get('cost', 0.0),
elapsed=kw.get('elapsed', 0),
)
else:
print(__doc__)
sys.exit(1)