Files
edict/scripts/file_lock.py
狼哥 c958d06cb4 fix(flow): prevent premature task completion before review (#280)
cmd_done() 不再直接写 Done,改为校验 todos 完成度后路由到 Review;dashboard 准奏也增加 todo 完成度门控,防止子任务未完成就关闭任务
2026-04-20 00:17:01 +08:00

140 lines
4.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
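File-lock utilities — prevent data loss when multiple processes read and
write the same JSON file concurrently.

Usage:
    from file_lock import atomic_json_update, atomic_json_read

    # Atomic read
    data = atomic_json_read(path, default=[])

    # Atomic update (read -> modify -> write back, holding the lock throughout)
    def modifier(tasks):
        tasks.append(new_task)
        return tasks
    atomic_json_update(path, modifier, default=[])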
"""
import json
import os
import pathlib
import tempfile
from typing import Any, Callable
_IS_WINDOWS = os.name == 'nt'
if _IS_WINDOWS:
import msvcrt
else:
import fcntl
# ── 平台抽象:文件锁 ────────────────────────────────────────────
def _lock_shared(fd: int) -> None:
    """Acquire the read lock on the open lock-file descriptor ``fd``.

    POSIX: takes a shared ``flock`` (``LOCK_SH``), waiting until it is granted.

    Windows: ``msvcrt.locking`` offers no shared mode, so the read lock is a
    regular lock on one byte at the descriptor's current position. ``LK_LOCK``
    (retrying) is used instead of ``LK_NBLCK`` (non-blocking) so that a reader
    waits for the current holder rather than failing immediately with
    ``OSError`` whenever another reader or writer holds the lock. This also
    matches the mode used by ``_lock_exclusive``.

    Args:
        fd: File descriptor of the already opened lock file.

    Raises:
        OSError: If the lock cannot be acquired (on Windows, once the retry
            attempts of ``LK_LOCK`` are exhausted).
    """
    if _IS_WINDOWS:
        msvcrt.locking(fd, msvcrt.LK_LOCK, 1)
    else:
        fcntl.flock(fd, fcntl.LOCK_SH)
def _lock_exclusive(fd: int) -> None:
    """Acquire the exclusive (write) lock on the lock-file descriptor ``fd``.

    Uses ``flock(LOCK_EX)`` on POSIX and ``msvcrt.locking(LK_LOCK)`` on one
    byte of the file on Windows.
    """
    if not _IS_WINDOWS:
        fcntl.flock(fd, fcntl.LOCK_EX)
        return
    msvcrt.locking(fd, msvcrt.LK_LOCK, 1)
def _unlock(fd: int) -> None:
    """Release the lock previously taken on the lock-file descriptor ``fd``."""
    if not _IS_WINDOWS:
        fcntl.flock(fd, fcntl.LOCK_UN)
        return
    try:
        msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
    except OSError:
        # Unlock failures are deliberately ignored on Windows
        # (e.g. when the byte was never locked in the first place).
        pass
def _lock_path(path: pathlib.Path) -> pathlib.Path:
    """Return the sidecar lock file for ``path``: same directory, name + ``.lock``."""
    lock_name = f'{path.name}.lock'
    return path.parent.joinpath(lock_name)
def atomic_json_read(path: pathlib.Path, default: Any = None) -> Any:
    """Read and parse the JSON file at ``path`` while holding the read lock.

    Args:
        path: JSON file to read.
        default: Value returned when the file does not exist, or when reading
            or parsing it raises any ``Exception``.

    Returns:
        The decoded JSON content, or ``default``.
    """
    sidecar = _lock_path(path)
    # The lock file lives next to the data file; make sure its directory exists.
    sidecar.parent.mkdir(parents=True, exist_ok=True)
    lock_fd = os.open(str(sidecar), os.O_RDWR | os.O_CREAT)
    try:
        _lock_shared(lock_fd)
        try:
            if not path.exists():
                return default
            raw_text = path.read_text(encoding='utf-8')
            return json.loads(raw_text)
        except Exception:
            # Unreadable or malformed content falls back to the default.
            return default
    finally:
        _unlock(lock_fd)
        os.close(lock_fd)
def atomic_json_update(
    path: pathlib.Path,
    modifier: Callable[[Any], Any],
    default: Any = None,
) -> Any:
    """Atomically read, modify and write back the JSON file at ``path``.

    The exclusive lock on the sidecar ``.lock`` file is held for the whole
    read -> modify -> write cycle. The new content is written to a temporary
    file in the same directory and then moved over ``path`` with
    ``os.replace``, so readers never observe a partially written file.

    Args:
        path: JSON file to update.
        modifier: Called with the current data; must return the data to store.
        default: Passed to ``modifier`` when ``path`` does not exist or when
            reading/parsing it fails.

    Returns:
        The value returned by ``modifier``, i.e. the data that was written.

    Raises:
        OSError: If locking or any file operation fails.
        TypeError, ValueError: If the result cannot be serialised as JSON.
        Exception: Anything raised by ``modifier`` propagates unchanged.
    """
    lock_file = _lock_path(path)
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR)
    try:
        _lock_exclusive(fd)

        # Read. Unreadable or malformed content is treated like a missing
        # file, so the result of modifier(default) will overwrite it.
        try:
            if path.exists():
                data = json.loads(path.read_text(encoding='utf-8'))
            else:
                data = default
        except Exception:
            data = default

        # Modify.
        result = modifier(data)

        # Atomic write via temp file + rename.
        tmp_fd, tmp_path = tempfile.mkstemp(
            dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_'
        )
        try:
            with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f:
                json.dump(result, f, ensure_ascii=False, indent=2)
            # mkstemp creates the file readable/writable by the owner only;
            # carry over the existing file's permission bits so the replace
            # does not silently tighten them.
            try:
                os.chmod(tmp_path, os.stat(path).st_mode & 0o7777)
            except OSError:
                # No existing file (or mode not transferable): keep the
                # temp file's mode, as before.
                pass
            os.replace(tmp_path, str(path))
        except BaseException:
            # Also clean up on KeyboardInterrupt/SystemExit, and never let a
            # failing unlink hide the original error.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
        return result
    finally:
        # Close the descriptor even if releasing the lock fails.
        try:
            _unlock(fd)
        finally:
            os.close(fd)
def atomic_json_write(path: pathlib.Path, data: Any) -> None:
    """Atomically write ``data`` as JSON to ``path``.

    Holds the exclusive lock on the sidecar ``.lock`` file, writes to a
    temporary file in the same directory and moves it over ``path`` with
    ``os.replace``. The existing content is not read (this avoids the extra
    read that ``atomic_json_update`` performs).

    Args:
        path: Destination JSON file.
        data: JSON-serialisable value to store.

    Raises:
        OSError: If locking or any file operation fails.
        TypeError, ValueError: If ``data`` cannot be serialised as JSON.
    """
    lock_file = _lock_path(path)
    lock_file.parent.mkdir(parents=True, exist_ok=True)
    fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR)
    try:
        _lock_exclusive(fd)
        tmp_fd, tmp_path = tempfile.mkstemp(
            dir=str(path.parent), suffix='.tmp', prefix=path.stem + '_'
        )
        try:
            with os.fdopen(tmp_fd, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            # mkstemp creates the file readable/writable by the owner only;
            # carry over the existing file's permission bits so the replace
            # does not silently tighten them.
            try:
                os.chmod(tmp_path, os.stat(path).st_mode & 0o7777)
            except OSError:
                # No existing file (or mode not transferable): keep the
                # temp file's mode, as before.
                pass
            os.replace(tmp_path, str(path))
        except BaseException:
            # Also clean up on KeyboardInterrupt/SystemExit, and never let a
            # failing unlink hide the original error.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
    finally:
        # Close the descriptor even if releasing the lock fails.
        try:
            _unlock(fd)
        finally:
            os.close(fd)