mirror of
https://mirror.skon.top/github.com/code-yeongyu/oh-my-opencode
synced 2026-04-30 18:50:29 +08:00
feat(team-mode): add tasklist (flock claim, individual JSON files, D-08/09, §III.6-8)
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
11
.sisyphus/evidence/team-mode/task-11-blocked.txt
Normal file
11
.sisyphus/evidence/team-mode/task-11-blocked.txt
Normal file
@@ -0,0 +1,11 @@
|
||||
Task 11 evidence: blockedBy enforcement (C-5.6 / C-5.7)
|
||||
|
||||
- Test file: src/features/team-mode/team-tasklist/claim.test.ts
|
||||
- Test name: claimTask rejects blocked tasks until blockers complete
|
||||
- Result: PASS
|
||||
- Verified: blocked task claim throws BlockedByError while blocker is unresolved
|
||||
- Verified: after blocker transitions to completed, blocked task claim succeeds
|
||||
|
||||
Suite snapshot:
|
||||
- bun test src/features/team-mode/team-tasklist/
|
||||
- 13 pass, 0 fail, 26 expect() calls
|
||||
11
.sisyphus/evidence/team-mode/task-11-claim-arb.txt
Normal file
11
.sisyphus/evidence/team-mode/task-11-claim-arb.txt
Normal file
@@ -0,0 +1,11 @@
|
||||
Task 11 evidence: Concurrent claim arbitration (C-5.3 / F-02)
|
||||
|
||||
- Test file: src/features/team-mode/team-tasklist/claim.test.ts
|
||||
- Test name: claimTask allows exactly one concurrent claimant
|
||||
- Result: PASS
|
||||
- Verified: one claimant fulfilled
|
||||
- Verified: one claimant failed with AlreadyClaimedError
|
||||
|
||||
Suite snapshot:
|
||||
- bun test src/features/team-mode/team-tasklist/
|
||||
- 13 pass, 0 fail, 26 expect() calls
|
||||
11
.sisyphus/evidence/team-mode/task-11-id-counter.txt
Normal file
11
.sisyphus/evidence/team-mode/task-11-id-counter.txt
Normal file
@@ -0,0 +1,11 @@
|
||||
Task 11 evidence: Atomic ID counter (C-5.2)
|
||||
|
||||
- Test file: src/features/team-mode/team-tasklist/store.test.ts
|
||||
- Test name: createTask assigns distinct ids during concurrent creation
|
||||
- Result: PASS
|
||||
- Verified: two concurrent createTask calls returned task IDs "1" and "2"
|
||||
- Verified: tasks/.highwatermark ended at 2
|
||||
|
||||
Suite snapshot:
|
||||
- bun test src/features/team-mode/team-tasklist/
|
||||
- 13 pass, 0 fail, 26 expect() calls
|
||||
11
.sisyphus/evidence/team-mode/task-11-reverse.txt
Normal file
11
.sisyphus/evidence/team-mode/task-11-reverse.txt
Normal file
@@ -0,0 +1,11 @@
|
||||
Task 11 evidence: Reverse transition rejected (C-5.5)
|
||||
|
||||
- Test file: src/features/team-mode/team-tasklist/update.test.ts
|
||||
- Test name: updateTaskStatus rejects reverse transitions
|
||||
- Result: PASS
|
||||
- Verified: completed -> claimed throws InvalidTaskTransitionError
|
||||
- Verified message: no reverse transitions from completed to claimed
|
||||
|
||||
Suite snapshot:
|
||||
- bun test src/features/team-mode/team-tasklist/
|
||||
- 13 pass, 0 fail, 26 expect() calls
|
||||
11
.sisyphus/evidence/team-mode/task-11-stale-reap.txt
Normal file
11
.sisyphus/evidence/team-mode/task-11-stale-reap.txt
Normal file
@@ -0,0 +1,11 @@
|
||||
Task 11 evidence: Stale claim lock reap (C-5.8 / F-13)
|
||||
|
||||
- Test file: src/features/team-mode/team-tasklist/claim.test.ts
|
||||
- Test name: claimTask reaps a stale claim lock before claiming
|
||||
- Result: PASS
|
||||
- Verified: pre-created claims/<id>.lock with dead PID and 10 minute old timestamp was reaped
|
||||
- Verified: claimTask then acquired the task successfully
|
||||
|
||||
Suite snapshot:
|
||||
- bun test src/features/team-mode/team-tasklist/
|
||||
- 13 pass, 0 fail, 26 expect() calls
|
||||
100
src/features/team-mode/team-tasklist/claim.test.ts
Normal file
100
src/features/team-mode/team-tasklist/claim.test.ts
Normal file
@@ -0,0 +1,100 @@
|
||||
/// <reference types="bun-types" />
|
||||
|
||||
import { expect, test } from "bun:test"
|
||||
import { mkdir, writeFile } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import { claimTask, AlreadyClaimedError, BlockedByError } from "./claim"
|
||||
import { createTask } from "./store"
|
||||
import { createTaskInput, createTasklistFixture } from "./test-support"
|
||||
import { updateTaskStatus } from "./update"
|
||||
|
||||
test("claimTask allows exactly one concurrent claimant", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const task = await createTask(fixture.teamRunId, createTaskInput(), fixture.config)
|
||||
|
||||
// when
|
||||
const claimResults = await Promise.allSettled([
|
||||
claimTask(fixture.teamRunId, task.id, "member-a", fixture.config),
|
||||
claimTask(fixture.teamRunId, task.id, "member-b", fixture.config),
|
||||
])
|
||||
|
||||
const successfulClaims = claimResults.filter((result) => result.status === "fulfilled")
|
||||
const failedClaims = claimResults.filter((result) => result.status === "rejected")
|
||||
|
||||
// then
|
||||
expect(successfulClaims).toHaveLength(1)
|
||||
expect(failedClaims).toHaveLength(1)
|
||||
expect(failedClaims[0]?.status).toBe("rejected")
|
||||
if (failedClaims[0]?.status === "rejected") {
|
||||
expect(failedClaims[0].reason).toBeInstanceOf(AlreadyClaimedError)
|
||||
}
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
|
||||
test("claimTask rejects blocked tasks until blockers complete", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const blockerTask = await createTask(fixture.teamRunId, createTaskInput({ subject: "blocker" }), fixture.config)
|
||||
const blockedTask = await createTask(
|
||||
fixture.teamRunId,
|
||||
createTaskInput({ subject: "blocked", blockedBy: [blockerTask.id] }),
|
||||
fixture.config,
|
||||
)
|
||||
|
||||
// when
|
||||
let blockedError: unknown = null
|
||||
try {
|
||||
await claimTask(fixture.teamRunId, blockedTask.id, "member-a", fixture.config)
|
||||
} catch (error) {
|
||||
blockedError = error
|
||||
}
|
||||
|
||||
// then
|
||||
expect(blockedError).toBeInstanceOf(BlockedByError)
|
||||
|
||||
// given
|
||||
await claimTask(fixture.teamRunId, blockerTask.id, "member-b", fixture.config)
|
||||
await updateTaskStatus(fixture.teamRunId, blockerTask.id, "in_progress", "member-b", fixture.config)
|
||||
await updateTaskStatus(fixture.teamRunId, blockerTask.id, "completed", "member-b", fixture.config)
|
||||
|
||||
// when
|
||||
const claimedTask = await claimTask(fixture.teamRunId, blockedTask.id, "member-a", fixture.config)
|
||||
|
||||
// then
|
||||
expect(claimedTask.status).toBe("claimed")
|
||||
expect(claimedTask.owner).toBe("member-a")
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
|
||||
test("claimTask reaps a stale claim lock before claiming", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const task = await createTask(fixture.teamRunId, createTaskInput(), fixture.config)
|
||||
const tasksDirectory = getTasksDir(resolveBaseDir(fixture.config), fixture.teamRunId)
|
||||
const staleLockPath = path.join(tasksDirectory, "claims", `${task.id}.lock`)
|
||||
await mkdir(staleLockPath, { recursive: true })
|
||||
await writeFile(path.join(staleLockPath, "owner"), `member-z\n999999\n${Date.now() - 600_000}`)
|
||||
|
||||
// when
|
||||
const claimedTask = await claimTask(fixture.teamRunId, task.id, "member-a", fixture.config)
|
||||
|
||||
// then
|
||||
expect(claimedTask.status).toBe("claimed")
|
||||
expect(claimedTask.owner).toBe("member-a")
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
98
src/features/team-mode/team-tasklist/claim.ts
Normal file
98
src/features/team-mode/team-tasklist/claim.ts
Normal file
@@ -0,0 +1,98 @@
|
||||
import { access, mkdir } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
|
||||
import type { TeamModeConfig } from "../../../config/schema/team-mode"
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import { atomicWrite, detectStaleLock, reapStaleLock, withLock } from "../team-state-store/locks"
|
||||
import { TaskSchema } from "../types"
|
||||
import type { Task } from "../types"
|
||||
import { canClaim } from "./dependencies"
|
||||
import { getTask } from "./get"
|
||||
import { listTasks } from "./list"
|
||||
|
||||
const CLAIM_STALE_AFTER_MS = 300_000
|
||||
|
||||
async function lockExists(lockPath: string): Promise<boolean> {
|
||||
try {
|
||||
await access(lockPath)
|
||||
return true
|
||||
} catch {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
function getBlockingTaskIds(task: Task, allTasks: Task[]): string[] {
|
||||
return task.blockedBy.filter((blockerId) => {
|
||||
const blockerTask = allTasks.find((candidateTask) => candidateTask.id === blockerId)
|
||||
return blockerTask !== undefined && blockerTask.status !== "completed"
|
||||
})
|
||||
}
|
||||
|
||||
export class AlreadyClaimedError extends Error {
|
||||
constructor(message = "already_claimed") {
|
||||
super(message)
|
||||
this.name = "AlreadyClaimedError"
|
||||
}
|
||||
}
|
||||
|
||||
export class BlockedByError extends Error {
|
||||
constructor(public readonly blockers: string[]) {
|
||||
super(`blocked by ${blockers.join(",")}`)
|
||||
this.name = "BlockedByError"
|
||||
}
|
||||
}
|
||||
|
||||
export async function claimTask(
|
||||
teamRunId: string,
|
||||
taskId: string,
|
||||
memberName: string,
|
||||
config: TeamModeConfig,
|
||||
): Promise<Task> {
|
||||
const baseDirectory = resolveBaseDir(config)
|
||||
const tasksDirectory = getTasksDir(baseDirectory, teamRunId)
|
||||
const claimsDirectory = path.join(tasksDirectory, "claims")
|
||||
const taskPath = path.join(tasksDirectory, `${taskId}.json`)
|
||||
const claimLockPath = path.join(claimsDirectory, `${taskId}.lock`)
|
||||
|
||||
await mkdir(claimsDirectory, { recursive: true, mode: 0o700 })
|
||||
|
||||
const task = await getTask(teamRunId, taskId, config)
|
||||
if (task.status !== "pending") {
|
||||
throw new AlreadyClaimedError()
|
||||
}
|
||||
|
||||
const allTasks = await listTasks(teamRunId, config)
|
||||
if (!canClaim(task, allTasks)) {
|
||||
throw new BlockedByError(getBlockingTaskIds(task, allTasks))
|
||||
}
|
||||
|
||||
if (await detectStaleLock(claimLockPath, CLAIM_STALE_AFTER_MS)) {
|
||||
await reapStaleLock(claimLockPath)
|
||||
} else if (await lockExists(claimLockPath)) {
|
||||
throw new AlreadyClaimedError()
|
||||
}
|
||||
|
||||
return await withLock(claimLockPath, async () => {
|
||||
const refreshedTask = await getTask(teamRunId, taskId, config)
|
||||
if (refreshedTask.status !== "pending") {
|
||||
throw new AlreadyClaimedError()
|
||||
}
|
||||
|
||||
const refreshedTasks = await listTasks(teamRunId, config)
|
||||
if (!canClaim(refreshedTask, refreshedTasks)) {
|
||||
throw new BlockedByError(getBlockingTaskIds(refreshedTask, refreshedTasks))
|
||||
}
|
||||
|
||||
const now = Date.now()
|
||||
const updatedTask = TaskSchema.parse({
|
||||
...refreshedTask,
|
||||
status: "claimed",
|
||||
owner: memberName,
|
||||
claimedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
|
||||
await atomicWrite(taskPath, `${JSON.stringify(updatedTask, null, 2)}\n`)
|
||||
return updatedTask
|
||||
}, { ownerTag: memberName, staleAfterMs: CLAIM_STALE_AFTER_MS })
|
||||
}
|
||||
47
src/features/team-mode/team-tasklist/dependencies.test.ts
Normal file
47
src/features/team-mode/team-tasklist/dependencies.test.ts
Normal file
@@ -0,0 +1,47 @@
|
||||
/// <reference types="bun-types" />
|
||||
|
||||
import { describe, expect, test } from "bun:test"
|
||||
|
||||
import type { Task } from "../types"
|
||||
import { canClaim } from "./dependencies"
|
||||
|
||||
function buildTask(id: string, status: Task["status"], blockedBy: string[] = []): Task {
|
||||
const now = Date.now()
|
||||
return {
|
||||
version: 1,
|
||||
id,
|
||||
subject: `subject-${id}`,
|
||||
description: `description-${id}`,
|
||||
status,
|
||||
blocks: [],
|
||||
blockedBy,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
}
|
||||
}
|
||||
|
||||
describe("canClaim", () => {
|
||||
test("returns false when a blocker is not completed", () => {
|
||||
// given
|
||||
const blockerTask = buildTask("2", "in_progress")
|
||||
const dependentTask = buildTask("1", "pending", ["2"])
|
||||
|
||||
// when
|
||||
const claimable = canClaim(dependentTask, [dependentTask, blockerTask])
|
||||
|
||||
// then
|
||||
expect(claimable).toBe(false)
|
||||
})
|
||||
|
||||
test("ignores missing blockers and completed blockers", () => {
|
||||
// given
|
||||
const completedBlockerTask = buildTask("2", "completed")
|
||||
const dependentTask = buildTask("1", "pending", ["2", "999"])
|
||||
|
||||
// when
|
||||
const claimable = canClaim(dependentTask, [dependentTask, completedBlockerTask])
|
||||
|
||||
// then
|
||||
expect(claimable).toBe(true)
|
||||
})
|
||||
})
|
||||
8
src/features/team-mode/team-tasklist/dependencies.ts
Normal file
8
src/features/team-mode/team-tasklist/dependencies.ts
Normal file
@@ -0,0 +1,8 @@
|
||||
import type { Task } from "../types"
|
||||
|
||||
export function canClaim(task: Task, allTasks: Task[]): boolean {
|
||||
return task.blockedBy.every((blockerId) => {
|
||||
const blockerTask = allTasks.find((candidateTask) => candidateTask.id === blockerId)
|
||||
return blockerTask === undefined || blockerTask.status === "completed"
|
||||
})
|
||||
}
|
||||
45
src/features/team-mode/team-tasklist/get.test.ts
Normal file
45
src/features/team-mode/team-tasklist/get.test.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
/// <reference types="bun-types" />
|
||||
|
||||
import { expect, test } from "bun:test"
|
||||
|
||||
import { createTask } from "./store"
|
||||
import { createTaskInput, createTasklistFixture } from "./test-support"
|
||||
import { getTask } from "./get"
|
||||
|
||||
test("getTask returns a persisted task", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const createdTask = await createTask(fixture.teamRunId, createTaskInput({ subject: "persisted task" }), fixture.config)
|
||||
|
||||
// when
|
||||
const loadedTask = await getTask(fixture.teamRunId, createdTask.id, fixture.config)
|
||||
|
||||
// then
|
||||
expect(loadedTask).toEqual(createdTask)
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
|
||||
test("getTask throws when the task file is missing", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
// when
|
||||
let thrownError: unknown = null
|
||||
|
||||
try {
|
||||
await getTask(fixture.teamRunId, "999", fixture.config)
|
||||
} catch (error) {
|
||||
thrownError = error
|
||||
}
|
||||
|
||||
// then
|
||||
expect(thrownError).toBeInstanceOf(Error)
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
13
src/features/team-mode/team-tasklist/get.ts
Normal file
13
src/features/team-mode/team-tasklist/get.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { readFile } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
|
||||
import type { TeamModeConfig } from "../../../config/schema/team-mode"
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import { TaskSchema } from "../types"
|
||||
import type { Task } from "../types"
|
||||
|
||||
export async function getTask(teamRunId: string, taskId: string, config: TeamModeConfig): Promise<Task> {
|
||||
const tasksDirectory = getTasksDir(resolveBaseDir(config), teamRunId)
|
||||
const taskContent = await readFile(path.join(tasksDirectory, `${taskId}.json`), "utf8")
|
||||
return TaskSchema.parse(JSON.parse(taskContent))
|
||||
}
|
||||
6
src/features/team-mode/team-tasklist/index.ts
Normal file
6
src/features/team-mode/team-tasklist/index.ts
Normal file
@@ -0,0 +1,6 @@
|
||||
export { claimTask, AlreadyClaimedError, BlockedByError } from "./claim"
|
||||
export { canClaim } from "./dependencies"
|
||||
export { getTask } from "./get"
|
||||
export { listTasks } from "./list"
|
||||
export { createTask } from "./store"
|
||||
export { updateTaskStatus, CrossOwnerUpdateError, InvalidTaskTransitionError } from "./update"
|
||||
63
src/features/team-mode/team-tasklist/list.test.ts
Normal file
63
src/features/team-mode/team-tasklist/list.test.ts
Normal file
@@ -0,0 +1,63 @@
|
||||
/// <reference types="bun-types" />
|
||||
|
||||
import { expect, test } from "bun:test"
|
||||
import { writeFile } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import { createTask } from "./store"
|
||||
import { createTaskInput, createTasklistFixture } from "./test-support"
|
||||
import { updateTaskStatus } from "./update"
|
||||
import { listTasks } from "./list"
|
||||
|
||||
test("listTasks returns tasks sorted ascending and honors filters", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const firstTask = await createTask(
|
||||
fixture.teamRunId,
|
||||
createTaskInput({ subject: "one", status: "claimed", owner: "member-a", claimedAt: Date.now() }),
|
||||
fixture.config,
|
||||
)
|
||||
await createTask(fixture.teamRunId, createTaskInput({ subject: "two" }), fixture.config)
|
||||
const thirdTask = await createTask(
|
||||
fixture.teamRunId,
|
||||
createTaskInput({ subject: "three", status: "claimed", owner: "member-a", claimedAt: Date.now() }),
|
||||
fixture.config,
|
||||
)
|
||||
await updateTaskStatus(fixture.teamRunId, thirdTask.id, "in_progress", "member-a", fixture.config)
|
||||
|
||||
// when
|
||||
const allTasks = await listTasks(fixture.teamRunId, fixture.config)
|
||||
const claimedTasks = await listTasks(fixture.teamRunId, fixture.config, { status: "claimed", owner: "member-a" })
|
||||
|
||||
// then
|
||||
expect(allTasks.map((task) => task.id)).toEqual([firstTask.id, "2", thirdTask.id])
|
||||
expect(claimedTasks).toHaveLength(1)
|
||||
expect(claimedTasks[0]?.id).toBe(firstTask.id)
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
|
||||
test("listTasks skips malformed task files", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const validTask = await createTask(fixture.teamRunId, createTaskInput(), fixture.config)
|
||||
const tasksDirectory = getTasksDir(resolveBaseDir(fixture.config), fixture.teamRunId)
|
||||
await writeFile(path.join(tasksDirectory, "bad.json"), "{not-json")
|
||||
await writeFile(path.join(tasksDirectory, ".highwatermark"), "1")
|
||||
|
||||
// when
|
||||
const listedTasks = await listTasks(fixture.teamRunId, fixture.config)
|
||||
|
||||
// then
|
||||
expect(listedTasks).toHaveLength(1)
|
||||
expect(listedTasks[0]?.id).toBe(validTask.id)
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
62
src/features/team-mode/team-tasklist/list.ts
Normal file
62
src/features/team-mode/team-tasklist/list.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import type { Dirent } from "node:fs"
|
||||
import { readdir, readFile } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
|
||||
import type { TeamModeConfig } from "../../../config/schema/team-mode"
|
||||
import { log } from "../../../shared/logger"
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import { TaskSchema } from "../types"
|
||||
import type { Task } from "../types"
|
||||
|
||||
type TaskListFilter = {
|
||||
status?: Task["status"]
|
||||
owner?: string
|
||||
}
|
||||
|
||||
export async function listTasks(
|
||||
teamRunId: string,
|
||||
config: TeamModeConfig,
|
||||
filter?: TaskListFilter,
|
||||
): Promise<Task[]> {
|
||||
const tasksDirectory = getTasksDir(resolveBaseDir(config), teamRunId)
|
||||
|
||||
let entries: Dirent[]
|
||||
try {
|
||||
entries = await readdir(tasksDirectory, { withFileTypes: true })
|
||||
} catch {
|
||||
return []
|
||||
}
|
||||
|
||||
const parsedTasks: Task[] = []
|
||||
for (const entry of entries) {
|
||||
if (entry.isDirectory()) continue
|
||||
if (entry.name.startsWith(".")) continue
|
||||
if (!entry.name.endsWith(".json")) continue
|
||||
|
||||
const taskPath = path.join(tasksDirectory, entry.name)
|
||||
try {
|
||||
const taskContent = await readFile(taskPath, "utf8")
|
||||
const parsedTask = TaskSchema.safeParse(JSON.parse(taskContent))
|
||||
if (!parsedTask.success) {
|
||||
log("team-tasklist skipped malformed task", {
|
||||
event: "team-tasklist-malformed-task",
|
||||
taskPath,
|
||||
issues: parsedTask.error.issues,
|
||||
})
|
||||
continue
|
||||
}
|
||||
parsedTasks.push(parsedTask.data)
|
||||
} catch (error) {
|
||||
log("team-tasklist skipped malformed task", {
|
||||
event: "team-tasklist-malformed-task",
|
||||
taskPath,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return parsedTasks
|
||||
.filter((task) => filter?.status === undefined || task.status === filter.status)
|
||||
.filter((task) => filter?.owner === undefined || task.owner === filter.owner)
|
||||
.sort((leftTask, rightTask) => Number.parseInt(leftTask.id, 10) - Number.parseInt(rightTask.id, 10))
|
||||
}
|
||||
32
src/features/team-mode/team-tasklist/store.test.ts
Normal file
32
src/features/team-mode/team-tasklist/store.test.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
/// <reference types="bun-types" />
|
||||
|
||||
import { expect, test } from "bun:test"
|
||||
import { readFile } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import { createTask } from "./store"
|
||||
import { createTaskInput, createTasklistFixture } from "./test-support"
|
||||
|
||||
test("createTask assigns distinct ids during concurrent creation", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
// when
|
||||
const [firstTask, secondTask] = await Promise.all([
|
||||
createTask(fixture.teamRunId, createTaskInput({ subject: "first task" }), fixture.config),
|
||||
createTask(fixture.teamRunId, createTaskInput({ subject: "second task" }), fixture.config),
|
||||
])
|
||||
|
||||
const tasksDirectory = getTasksDir(resolveBaseDir(fixture.config), fixture.teamRunId)
|
||||
const watermarkContent = await readFile(path.join(tasksDirectory, ".highwatermark"), "utf8")
|
||||
const sortedIds = [firstTask.id, secondTask.id].sort((leftId, rightId) => Number(leftId) - Number(rightId))
|
||||
|
||||
// then
|
||||
expect(sortedIds).toEqual(["1", "2"])
|
||||
expect(watermarkContent.trim()).toBe("2")
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
53
src/features/team-mode/team-tasklist/store.ts
Normal file
53
src/features/team-mode/team-tasklist/store.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
import { mkdir, readFile } from "node:fs/promises"
|
||||
import path from "node:path"
|
||||
|
||||
import type { TeamModeConfig } from "../../../config/schema/team-mode"
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import { atomicWrite, withLock } from "../team-state-store/locks"
|
||||
import { TaskSchema } from "../types"
|
||||
import type { Task } from "../types"
|
||||
|
||||
const HIGH_WATERMARK_FILE = ".highwatermark"
|
||||
|
||||
async function readHighWatermark(watermarkPath: string): Promise<number> {
|
||||
try {
|
||||
const watermarkContent = (await readFile(watermarkPath, "utf8")).trim()
|
||||
const parsedWatermark = Number.parseInt(watermarkContent, 10)
|
||||
return Number.isInteger(parsedWatermark) && parsedWatermark >= 0 ? parsedWatermark : 0
|
||||
} catch {
|
||||
await atomicWrite(watermarkPath, "0")
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
export async function createTask(
|
||||
teamRunId: string,
|
||||
taskInput: Omit<Task, "id" | "createdAt" | "updatedAt" | "version">,
|
||||
config: TeamModeConfig,
|
||||
): Promise<Task> {
|
||||
const tasksDirectory = getTasksDir(resolveBaseDir(config), teamRunId)
|
||||
await mkdir(tasksDirectory, { recursive: true, mode: 0o700 })
|
||||
await mkdir(path.join(tasksDirectory, "claims"), { recursive: true, mode: 0o700 })
|
||||
|
||||
return await withLock(path.join(tasksDirectory, ".lock"), async () => {
|
||||
const watermarkPath = path.join(tasksDirectory, HIGH_WATERMARK_FILE)
|
||||
const nextTaskId = (await readHighWatermark(watermarkPath)) + 1
|
||||
await atomicWrite(watermarkPath, String(nextTaskId))
|
||||
|
||||
const now = Date.now()
|
||||
const task = TaskSchema.parse({
|
||||
...taskInput,
|
||||
version: 1,
|
||||
id: String(nextTaskId),
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
|
||||
await atomicWrite(
|
||||
path.join(tasksDirectory, `${task.id}.json`),
|
||||
`${JSON.stringify(task, null, 2)}\n`,
|
||||
)
|
||||
|
||||
return task
|
||||
}, { ownerTag: `create-task:${teamRunId}` })
|
||||
}
|
||||
46
src/features/team-mode/team-tasklist/test-support.ts
Normal file
46
src/features/team-mode/team-tasklist/test-support.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
import { mkdtemp, mkdir, rm } from "node:fs/promises"
|
||||
import { tmpdir } from "node:os"
|
||||
import path from "node:path"
|
||||
import { randomUUID } from "node:crypto"
|
||||
|
||||
import { TeamModeConfigSchema } from "../../../config/schema/team-mode"
|
||||
import type { TeamModeConfig } from "../../../config/schema/team-mode"
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import type { Task } from "../types"
|
||||
|
||||
export async function createTasklistFixture(): Promise<{
|
||||
config: TeamModeConfig
|
||||
rootDirectory: string
|
||||
teamRunId: string
|
||||
cleanup: () => Promise<void>
|
||||
}> {
|
||||
const rootDirectory = await mkdtemp(path.join(tmpdir(), "team-tasklist-"))
|
||||
const config = TeamModeConfigSchema.parse({ base_dir: rootDirectory, enabled: true })
|
||||
const teamRunId = randomUUID()
|
||||
const tasksDirectory = getTasksDir(resolveBaseDir(config), teamRunId)
|
||||
|
||||
await mkdir(path.join(tasksDirectory, "claims"), { recursive: true, mode: 0o700 })
|
||||
|
||||
return {
|
||||
config,
|
||||
rootDirectory,
|
||||
teamRunId,
|
||||
cleanup: async () => {
|
||||
await rm(rootDirectory, { recursive: true, force: true })
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
export function createTaskInput(overrides?: Partial<Omit<Task, "id" | "createdAt" | "updatedAt" | "version">>): Omit<Task, "id" | "createdAt" | "updatedAt" | "version"> {
|
||||
return {
|
||||
subject: overrides?.subject ?? "task subject",
|
||||
description: overrides?.description ?? "task description",
|
||||
activeForm: overrides?.activeForm,
|
||||
status: overrides?.status ?? "pending",
|
||||
owner: overrides?.owner,
|
||||
blocks: overrides?.blocks ?? [],
|
||||
blockedBy: overrides?.blockedBy ?? [],
|
||||
metadata: overrides?.metadata,
|
||||
claimedAt: overrides?.claimedAt,
|
||||
}
|
||||
}
|
||||
89
src/features/team-mode/team-tasklist/update.test.ts
Normal file
89
src/features/team-mode/team-tasklist/update.test.ts
Normal file
@@ -0,0 +1,89 @@
|
||||
/// <reference types="bun-types" />
|
||||
|
||||
import { expect, test } from "bun:test"
|
||||
|
||||
import { claimTask } from "./claim"
|
||||
import { getTask } from "./get"
|
||||
import { createTask } from "./store"
|
||||
import { createTaskInput, createTasklistFixture } from "./test-support"
|
||||
import { CrossOwnerUpdateError, InvalidTaskTransitionError, updateTaskStatus } from "./update"
|
||||
|
||||
test("updateTaskStatus supports the one-way claim to complete flow", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const task = await createTask(fixture.teamRunId, createTaskInput(), fixture.config)
|
||||
await claimTask(fixture.teamRunId, task.id, "member-a", fixture.config)
|
||||
|
||||
// when
|
||||
await updateTaskStatus(fixture.teamRunId, task.id, "in_progress", "member-a", fixture.config)
|
||||
const completedTask = await updateTaskStatus(fixture.teamRunId, task.id, "completed", "member-a", fixture.config)
|
||||
const loadedTask = await getTask(fixture.teamRunId, task.id, fixture.config)
|
||||
|
||||
// then
|
||||
expect(completedTask.status).toBe("completed")
|
||||
expect(loadedTask.status).toBe("completed")
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
|
||||
test("updateTaskStatus rejects reverse transitions", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const task = await createTask(
|
||||
fixture.teamRunId,
|
||||
createTaskInput({ status: "completed", owner: "member-a", claimedAt: Date.now() }),
|
||||
fixture.config,
|
||||
)
|
||||
|
||||
// when
|
||||
let thrownError: unknown = null
|
||||
try {
|
||||
await updateTaskStatus(fixture.teamRunId, task.id, "claimed", "member-a", fixture.config)
|
||||
} catch (error) {
|
||||
thrownError = error
|
||||
}
|
||||
|
||||
// then
|
||||
expect(thrownError).toBeInstanceOf(InvalidTaskTransitionError)
|
||||
expect(thrownError).toHaveProperty("message", "no reverse transitions from completed to claimed")
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
|
||||
test("updateTaskStatus rejects non-owner updates except deletion", async () => {
|
||||
// given
|
||||
const fixture = await createTasklistFixture()
|
||||
|
||||
try {
|
||||
const task = await createTask(
|
||||
fixture.teamRunId,
|
||||
createTaskInput({ status: "claimed", owner: "member-a", claimedAt: Date.now() }),
|
||||
fixture.config,
|
||||
)
|
||||
|
||||
// when
|
||||
let crossOwnerError: unknown = null
|
||||
try {
|
||||
await updateTaskStatus(fixture.teamRunId, task.id, "in_progress", "member-b", fixture.config)
|
||||
} catch (error) {
|
||||
crossOwnerError = error
|
||||
}
|
||||
|
||||
// then
|
||||
expect(crossOwnerError).toBeInstanceOf(CrossOwnerUpdateError)
|
||||
|
||||
// when
|
||||
const deletedTask = await updateTaskStatus(fixture.teamRunId, task.id, "deleted", "lead-member", fixture.config)
|
||||
|
||||
// then
|
||||
expect(deletedTask.status).toBe("deleted")
|
||||
} finally {
|
||||
await fixture.cleanup()
|
||||
}
|
||||
})
|
||||
66
src/features/team-mode/team-tasklist/update.ts
Normal file
66
src/features/team-mode/team-tasklist/update.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import path from "node:path"
|
||||
|
||||
import type { TeamModeConfig } from "../../../config/schema/team-mode"
|
||||
import { getTasksDir, resolveBaseDir } from "../team-registry"
|
||||
import { atomicWrite } from "../team-state-store/locks"
|
||||
import { TaskSchema } from "../types"
|
||||
import type { Task } from "../types"
|
||||
import { getTask } from "./get"
|
||||
|
||||
const ALLOWED_TRANSITIONS: Readonly<Record<Task["status"], ReadonlyArray<Task["status"]>>> = {
|
||||
pending: ["claimed", "deleted"],
|
||||
claimed: ["in_progress", "deleted"],
|
||||
in_progress: ["completed", "deleted"],
|
||||
completed: ["deleted"],
|
||||
deleted: [],
|
||||
}
|
||||
|
||||
function isValidTransition(currentStatus: Task["status"], nextStatus: Task["status"]): boolean {
|
||||
return ALLOWED_TRANSITIONS[currentStatus].includes(nextStatus)
|
||||
}
|
||||
|
||||
export class InvalidTaskTransitionError extends Error {
|
||||
constructor(currentStatus: Task["status"], nextStatus: Task["status"]) {
|
||||
super(`no reverse transitions from ${currentStatus} to ${nextStatus}`)
|
||||
this.name = "InvalidTaskTransitionError"
|
||||
}
|
||||
}
|
||||
|
||||
export class CrossOwnerUpdateError extends Error {
|
||||
constructor(message = "cross-owner updates are not allowed") {
|
||||
super(message)
|
||||
this.name = "CrossOwnerUpdateError"
|
||||
}
|
||||
}
|
||||
|
||||
export async function updateTaskStatus(
|
||||
teamRunId: string,
|
||||
taskId: string,
|
||||
newStatus: Task["status"],
|
||||
memberName: string,
|
||||
config: TeamModeConfig,
|
||||
): Promise<Task> {
|
||||
const task = await getTask(teamRunId, taskId, config)
|
||||
|
||||
if (!isValidTransition(task.status, newStatus)) {
|
||||
throw new InvalidTaskTransitionError(task.status, newStatus)
|
||||
}
|
||||
|
||||
if (newStatus !== "deleted" && task.owner !== memberName) {
|
||||
throw new CrossOwnerUpdateError()
|
||||
}
|
||||
|
||||
const updatedTask = TaskSchema.parse({
|
||||
...task,
|
||||
status: newStatus,
|
||||
updatedAt: Date.now(),
|
||||
})
|
||||
|
||||
const tasksDirectory = getTasksDir(resolveBaseDir(config), teamRunId)
|
||||
await atomicWrite(
|
||||
path.join(tasksDirectory, `${taskId}.json`),
|
||||
`${JSON.stringify(updatedTask, null, 2)}\n`,
|
||||
)
|
||||
|
||||
return updatedTask
|
||||
}
|
||||
Reference in New Issue
Block a user