Files
edict/scripts/refresh_live_data.py
cft0808 b8d06a3972 feat: 完成全部剩余功能修复 (P0-P3)
P0:
- 圣旨模板下旨真正创建任务: 新增 POST /api/create-task
  前端 executeTemplate 改为 API 调用(降级仍可剪贴板复制)

P1:
- morning-config POST 字段校验: 检查 categories/keywords/feishu_webhook 类型
- 早报幂等锁支持 --force 强制采集: 看板手动刷新默认 force=true
- sync_agent_config 补全 Copilot 模型列表(6个)

P2:
- utils.py 公共函数抽取: read_json/now_iso/validate_url/safe_name
- refresh_live_data.py 改用 utils.read_json 消除重复定义
- apply_model_changes 回滚标记: 失败时 rolledBack=true 写入日志+前端展示
- 早报日期 API 兼容 YYYY-MM-DD 自动转换 + 格式校验
- Request logging: log_message 改为只记录 4xx/5xx 错误请求
- 飞书 Webhook URL 校验: 限制 https + open.feishu.cn 域名

P3:
- 御批模式基础实现: Review/Menxia 状态显示准奏/封驳按钮
  新增 POST /api/review-action(approve推进/reject退回中书省+轮次+1)
  前端 reviewAction() + 变更日志回滚标记显示
2026-02-26 21:42:13 +08:00

123 lines
5.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import json, pathlib, datetime, logging
from file_lock import atomic_json_write, atomic_json_read
from utils import read_json
# Install the root logging configuration, then take this script's named logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(name)s] %(message)s',
    datefmt='%H:%M:%S',
)
log = logging.getLogger('refresh')

# BASE is the directory one level above the one containing this script;
# the JSON files this script reads and writes live in BASE/data.
BASE = pathlib.Path(__file__).parent.parent
DATA = BASE.joinpath('data')
def output_meta(path):
    """Describe the file at *path* for the dashboard payload.

    Args:
        path: Filesystem path (``str`` or path-like) of a task's output
            artifact. May be empty, ``None`` or another non-path value when
            the task has not declared an output.

    Returns:
        dict: ``{"exists": bool, "lastModified": str | None}``.
        ``lastModified`` is the file's mtime formatted as
        ``YYYY-MM-DD HH:MM:SS`` in local time (naive), or ``None`` when the
        path is missing or unusable.
    """
    missing = {"exists": False, "lastModified": None}

    # ``pathlib.Path('')`` is ``Path('.')``, which always exists; without this
    # guard a task with no declared output would be reported as delivered,
    # carrying the working directory's mtime.
    if not path:
        return missing

    try:
        p = pathlib.Path(path)
    except TypeError:
        # Value is not a str/path-like (e.g. a number or list from JSON).
        return missing

    try:
        # Single stat() call instead of exists()+stat(): no window in which
        # the file can disappear between the check and the read.
        mtime = p.stat().st_mtime
    except (OSError, ValueError):
        # OSError: missing/inaccessible path; ValueError: embedded NUL byte.
        return missing

    ts = datetime.datetime.fromtimestamp(mtime).strftime('%Y-%m-%d %H:%M:%S')
    return {"exists": True, "lastModified": ts}
# Task states for which a heartbeat (staleness) label is computed.
_HEARTBEAT_STATES = ('Doing', 'Assigned', 'Review')


def _task_age_seconds(task, now_ts):
    """Return seconds since the task's last update, or None if undeterminable.

    ``updatedAt`` is read from the task itself, falling back to
    ``sourceMeta.updatedAt``. Numeric values are interpreted as epoch
    milliseconds; anything else is parsed as an ISO-8601 string (a trailing
    ``Z`` is accepted as UTC).
    """
    updated_raw = task.get('updatedAt')
    if not updated_raw:
        source_meta = task.get('sourceMeta')
        if isinstance(source_meta, dict):
            updated_raw = source_meta.get('updatedAt')
    if not updated_raw:
        return None

    try:
        if isinstance(updated_raw, (int, float)):
            updated_dt = datetime.datetime.fromtimestamp(
                updated_raw / 1000, tz=datetime.timezone.utc)
        else:
            updated_dt = datetime.datetime.fromisoformat(
                str(updated_raw).replace('Z', '+00:00'))
        age_sec = (now_ts - updated_dt).total_seconds()
    except (ValueError, TypeError, OverflowError, OSError):
        # Best effort: an unparseable value, an out-of-range epoch, or a
        # timezone-less string (aware - naive raises TypeError) all leave the
        # age unknown rather than aborting the whole refresh.
        return None

    # A timestamp slightly in the future (clock skew) must not produce a
    # negative "minutes ago" label.
    return max(age_sec, 0.0)


def _heartbeat(task, now_ts):
    """Return the heartbeat dict for *task*, or None if its state is untracked."""
    if task.get('state') not in _HEARTBEAT_STATES:
        return None

    age_sec = _task_age_seconds(task, now_ts)
    if age_sec is None:
        return {'status': 'unknown', 'label': '⚪ 未知', 'ageSec': None}

    minutes = int(age_sec // 60)
    if age_sec < 180:
        status, label = 'active', f'🟢 活跃 {minutes}分钟前'
    elif age_sec < 600:
        status, label = 'warn', f'🟡 可能停滞 {minutes}分钟前'
    else:
        status, label = 'stalled', f'🔴 已停滞 {minutes}分钟'
    return {'status': status, 'label': label, 'ageSec': int(age_sec)}


def _is_done_today(task, today_utc, today_local):
    """Return True if *task* is Done and was finished on the current day.

    The task's own ``updatedAt`` string is compared against the UTC date.
    The fallback, ``outputMeta.lastModified``, is produced by ``output_meta``
    in local time, so it is compared against the local date.
    """
    if task.get('state') != 'Done':
        return False
    updated_at = task.get('updatedAt', '')
    if isinstance(updated_at, str) and updated_at[:10] == today_utc:
        return True
    meta = task.get('outputMeta')
    last_modified = meta.get('lastModified', '') if isinstance(meta, dict) else ''
    return isinstance(last_modified, str) and last_modified[:10] == today_local


def main():
    """Rebuild ``live_status.json`` in DATA from the officials, task and sync files.

    Reads ``officials_stats.json``, ``tasks_source.json`` (falling back to
    ``tasks.json`` when the former yields no tasks) and ``sync_status.json``;
    annotates every task with ``org``, ``outputMeta`` and ``heartbeat``;
    computes summary metrics and a history of Done tasks; then writes the
    combined payload with ``atomic_json_write``.
    """
    # Use officials_stats.json (kept consistent with sync_officials_stats.py).
    officials_data = read_json(DATA / 'officials_stats.json', {})
    if isinstance(officials_data, dict):
        officials = officials_data.get('officials', [])
    else:
        officials = officials_data

    # Preferred task source is tasks_source.json (can be written by an
    # external sync); remember which file actually supplied the tasks so the
    # payload does not misreport the source after a fallback.
    task_source = 'tasks_source.json'
    tasks = atomic_json_read(DATA / 'tasks_source.json', [])
    if not tasks:
        task_source = 'tasks.json'
        tasks = read_json(DATA / 'tasks.json', [])

    sync_status = read_json(DATA / 'sync_status.json', {})
    if not isinstance(sync_status, dict):
        sync_status = {}

    # Known official labels; a task whose 'official' matches one of them
    # inherits it as 'org' when no explicit org is set.
    org_map = {}
    for official in officials:
        label = official.get('label', official.get('name', ''))
        if label:
            org_map[label] = label

    # One clock reading for the whole run so every derived value agrees.
    now_ts = datetime.datetime.now(datetime.timezone.utc)
    now_local = now_ts.astimezone()
    today_utc = now_ts.strftime('%Y-%m-%d')
    today_local = now_local.strftime('%Y-%m-%d')

    for task in tasks:
        task['org'] = task.get('org') or org_map.get(task.get('official', ''), '')
        task['outputMeta'] = output_meta(task.get('output', ''))
        # Heartbeat freshness check for Doing/Assigned/Review tasks.
        task['heartbeat'] = _heartbeat(task, now_ts)

    today_done = sum(1 for t in tasks if _is_done_today(t, today_utc, today_local))
    total_done = sum(1 for t in tasks if t.get('state') == 'Done')
    in_progress = sum(
        1 for t in tasks if t.get('state') in ('Doing', 'Review', 'Next', 'Blocked'))
    blocked = sum(1 for t in tasks if t.get('state') == 'Blocked')

    history = [
        {
            'at': t['outputMeta'].get('lastModified') or '未知',
            'official': t.get('official'),
            'task': t.get('title'),
            'out': t.get('output'),
            'qa': '通过' if t['outputMeta'].get('exists') else '待补成果',
        }
        for t in tasks
        if t.get('state') == 'Done'
    ]

    payload = {
        'generatedAt': now_local.strftime('%Y-%m-%d %H:%M:%S'),
        'taskSource': task_source,
        'officials': officials,
        'tasks': tasks,
        'history': history,
        'metrics': {
            'officialCount': len(officials),
            'todayDone': today_done,
            'totalDone': total_done,
            'inProgress': in_progress,
            'blocked': blocked,
        },
        'syncStatus': sync_status,
        'health': {
            'syncOk': bool(sync_status.get('ok', False)),
            'syncLatencyMs': sync_status.get('durationMs'),
            # 'missingFields' may be present but null; treat that as empty.
            'missingFieldCount': len(sync_status.get('missingFields') or {}),
        },
    }

    atomic_json_write(DATA / 'live_status.json', payload)
    log.info('updated live_status.json (%d tasks)', len(tasks))
# Script entry point: run the refresh only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()