Skip to content

Commit cd89e40

Browse files
authored
feat(task): legacy API aliases, task_center compat, and WebUI updates (#135)
* feat(task): legacy API aliases, task_center compat, and WebUI updates - Add enum parsing helpers with legacy status/priority aliases in task routes - Extend task store/manager and task_center for backward compatibility - Update legacy table migration script - Refresh Task queue/sheet UI, API types, and locales; add unit tests * fix(task): atomic queue claim and legacy paused migration - Claim queue ref and flip execution in one SQLite transaction; executor respects prior RUNNING/started_at from claim. - Migration preserves paused executions/queue refs; scheduler active when legacy not cancelled; delete paused queue refs on pause. - Stabilize orphan-queue recovery test; relax paused-scheduler migration assertion. - WebUI: resolve assistant role/finish from message.info for reply preview. * fix(task): address PR #135 review findings - task_center: infer scheduled task when run_once=True so missing run_at/cron raises a proper validation error instead of silently executing immediately. - task_center: coerce string booleans ("false"/"0"/"no"/...) in the schedule JSON compat layer via _coerce_legacy_bool, preventing recurring tasks from being misclassified as one-time. - webui QueuedSection: refresh the open detail drawer by execution id so the detail keeps updating after pagination/filter changes, independent of the current page's tasks list. - tests: add compat tests for the two task_center regressions above.
1 parent 3a58d90 commit cd89e40

20 files changed

Lines changed: 2130 additions & 259 deletions

File tree

flocks/cli/commands/task.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,19 @@ async def _list_tasks(status_val, type_val, limit, fmt):
9696
scheduler_status = None
9797
if status_val == "running":
9898
scheduler_status = SchedulerStatus.ACTIVE
99-
elif status_val == "paused":
99+
elif status_val in ("paused", "disabled"):
100100
scheduler_status = SchedulerStatus.DISABLED
101101
tasks, total = await TaskManager.list_schedulers(status=scheduler_status, limit=limit)
102102
else:
103+
task_status = None
104+
if status_val:
105+
mapped_status = "cancelled" if status_val == "paused" else status_val
106+
try:
107+
task_status = TaskStatus(mapped_status)
108+
except ValueError as exc:
109+
raise typer.BadParameter(f"Invalid execution status: {status_val}") from exc
103110
tasks, total = await TaskManager.list_executions(
104-
status=TaskStatus(status_val) if status_val else None,
111+
status=task_status,
105112
limit=limit,
106113
)
107114

@@ -121,7 +128,7 @@ async def _list_tasks(status_val, type_val, limit, fmt):
121128
status_icon = {
122129
"pending": "⏳", "queued": "📋", "running": "🟢",
123130
"completed": "✅", "failed": "❌", "cancelled": "🚫",
124-
"paused": "⏸️", "stopped": "🛑",
131+
"disabled": "⏸️", "stopped": "🛑",
125132
}
126133
for t in tasks:
127134
icon = status_icon.get(t.status.value, "·")

flocks/server/routes/task_entities.py

Lines changed: 141 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Execution-centric task scheduler/execution routes."""
22

3-
from typing import List, Optional
3+
from enum import Enum
4+
from typing import List, Optional, Type
45

56
from fastapi import APIRouter, HTTPException, Query, status
67
from pydantic import BaseModel, ConfigDict, Field
@@ -66,6 +67,77 @@ class PaginatedResponse(BaseModel):
6667
limit: int
6768

6869

70+
def _parse_enum(
71+
value: Optional[str],
72+
enum_cls: Type[Enum],
73+
*,
74+
label: str,
75+
legacy_aliases: Optional[dict[str, object]] = None,
76+
):
77+
if not value:
78+
return None
79+
mapped = legacy_aliases.get(value, value) if legacy_aliases else value
80+
try:
81+
return enum_cls(mapped)
82+
except ValueError as exc:
83+
raise HTTPException(
84+
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
85+
detail=f"Invalid {label}: {value}",
86+
) from exc
87+
88+
89+
def _parse_scheduler_status_filter(status_filter: Optional[str]):
90+
from flocks.task.models import SchedulerStatus
91+
92+
return _parse_enum(
93+
status_filter,
94+
SchedulerStatus,
95+
label="scheduler status",
96+
legacy_aliases={
97+
"running": SchedulerStatus.ACTIVE,
98+
"paused": SchedulerStatus.DISABLED,
99+
},
100+
)
101+
102+
103+
def _parse_execution_status_filter(status_filter: Optional[str]):
104+
from flocks.task.models import TaskStatus
105+
106+
return _parse_enum(
107+
status_filter,
108+
TaskStatus,
109+
label="execution status",
110+
legacy_aliases={"paused": TaskStatus.CANCELLED},
111+
)
112+
113+
114+
def _parse_priority(priority: Optional[str]):
115+
from flocks.task.models import TaskPriority
116+
117+
return _parse_enum(priority, TaskPriority, label="task priority")
118+
119+
120+
def _parse_delivery_status(delivery_status: Optional[str]):
121+
from flocks.task.models import DeliveryStatus
122+
123+
return _parse_enum(delivery_status, DeliveryStatus, label="delivery status")
124+
125+
126+
def _parse_execution_mode(execution_mode: Optional[str]):
127+
from flocks.task.models import ExecutionMode
128+
129+
return _parse_enum(execution_mode, ExecutionMode, label="execution mode")
130+
131+
132+
def _parse_task_type(task_type: str) -> str:
133+
if task_type not in {"queued", "scheduled"}:
134+
raise HTTPException(
135+
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
136+
detail=f"Invalid task type: {task_type}",
137+
)
138+
return task_type
139+
140+
69141
@router.get("/task-system/notice")
70142
async def get_task_system_notice():
71143
from flocks.task.manager import TaskManager
@@ -114,11 +186,10 @@ async def list_schedulers(
114186
limit: int = Query(20, ge=1, le=100),
115187
):
116188
from flocks.task.manager import TaskManager
117-
from flocks.task.models import SchedulerStatus, TaskPriority
118189

119190
items, total = await TaskManager.list_schedulers(
120-
status=SchedulerStatus(status_filter) if status_filter else None,
121-
priority=TaskPriority(priority) if priority else None,
191+
status=_parse_scheduler_status_filter(status_filter),
192+
priority=_parse_priority(priority),
122193
scheduled_only=scheduled_only,
123194
sort_by=sort_by,
124195
sort_order=sort_order,
@@ -137,43 +208,50 @@ async def list_schedulers(
137208
async def create_scheduler(req: SchedulerCreateRequest):
138209
from flocks.task.manager import TaskManager
139210
from flocks.task.models import (
140-
ExecutionMode,
141211
SchedulerMode,
142-
TaskPriority,
143212
TaskSource,
144213
TaskTrigger,
145214
build_schedule,
146215
)
147216

148-
if req.type == "queued":
149-
trigger = TaskTrigger(runImmediately=True)
150-
mode = SchedulerMode.ONCE
151-
elif req.run_once:
152-
trigger = build_schedule(
153-
run_once=True,
154-
run_at=req.run_at,
155-
cron=req.cron,
156-
cron_description=req.cron_description,
157-
timezone=req.timezone,
158-
)
159-
mode = SchedulerMode.ONCE
160-
else:
161-
trigger = build_schedule(
162-
run_once=False,
163-
cron=req.cron,
164-
cron_description=req.cron_description,
165-
timezone=req.timezone,
166-
)
167-
mode = SchedulerMode.CRON
217+
task_type = _parse_task_type(req.type)
218+
priority = _parse_priority(req.priority)
219+
execution_mode = _parse_execution_mode(req.execution_mode)
220+
try:
221+
if task_type == "queued":
222+
trigger = TaskTrigger(runImmediately=True)
223+
mode = SchedulerMode.ONCE
224+
elif req.run_once:
225+
trigger = build_schedule(
226+
run_once=True,
227+
run_at=req.run_at,
228+
cron=req.cron,
229+
cron_description=req.cron_description,
230+
timezone=req.timezone,
231+
)
232+
mode = SchedulerMode.ONCE
233+
else:
234+
trigger = build_schedule(
235+
run_once=False,
236+
cron=req.cron,
237+
cron_description=req.cron_description,
238+
timezone=req.timezone,
239+
)
240+
mode = SchedulerMode.CRON
241+
except ValueError as exc:
242+
raise HTTPException(
243+
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
244+
detail=str(exc),
245+
) from exc
168246

169247
scheduler = await TaskManager.create_scheduler(
170248
title=req.title,
171249
description=req.description,
172250
mode=mode,
173-
priority=TaskPriority(req.priority),
251+
priority=priority,
174252
source=TaskSource(user_prompt=req.user_prompt) if req.user_prompt else None,
175253
trigger=trigger,
176-
execution_mode=ExecutionMode(req.execution_mode),
254+
execution_mode=execution_mode,
177255
agent_name=req.agent_name,
178256
workflow_id=req.workflow_id,
179257
skills=req.skills,
@@ -198,29 +276,34 @@ async def get_scheduler(scheduler_id: str):
198276
@router.put("/task-schedulers/{scheduler_id}")
199277
async def update_scheduler(scheduler_id: str, req: SchedulerUpdateRequest):
200278
from flocks.task.manager import TaskManager
201-
from flocks.task.models import ExecutionMode, TaskPriority
202279

203280
fields = {k: v for k, v in req.model_dump(exclude_none=True).items()}
204281
if "priority" in fields:
205-
fields["priority"] = TaskPriority(fields["priority"])
282+
fields["priority"] = _parse_priority(fields["priority"])
206283
if "execution_mode" in fields:
207-
fields["execution_mode"] = ExecutionMode(fields["execution_mode"])
284+
fields["execution_mode"] = _parse_execution_mode(fields["execution_mode"])
208285
cron = fields.pop("cron", None)
209286
tz = fields.pop("timezone", None)
210287
cron_desc = fields.pop("cron_description", None)
211288
run_once = fields.pop("run_once", None)
212289
run_at = fields.pop("run_at", None)
213290
user_prompt = fields.pop("user_prompt", None)
214-
scheduler = await TaskManager.update_scheduler_with_trigger(
215-
scheduler_id,
216-
fields=fields,
217-
cron=cron,
218-
timezone=tz,
219-
cron_description=cron_desc,
220-
run_once=run_once,
221-
run_at=run_at,
222-
user_prompt=user_prompt,
223-
)
291+
try:
292+
scheduler = await TaskManager.update_scheduler_with_trigger(
293+
scheduler_id,
294+
fields=fields,
295+
cron=cron,
296+
timezone=tz,
297+
cron_description=cron_desc,
298+
run_once=run_once,
299+
run_at=run_at,
300+
user_prompt=user_prompt,
301+
)
302+
except ValueError as exc:
303+
raise HTTPException(
304+
status_code=status.HTTP_422_UNPROCESSABLE_CONTENT,
305+
detail=str(exc),
306+
) from exc
224307
if not scheduler:
225308
raise HTTPException(404, "Task scheduler not found")
226309
return scheduler.model_dump(mode="json", by_alias=True)
@@ -296,13 +379,12 @@ async def list_executions(
296379
limit: int = Query(20, ge=1, le=100),
297380
):
298381
from flocks.task.manager import TaskManager
299-
from flocks.task.models import DeliveryStatus, TaskPriority, TaskStatus
300382

301383
items, total = await TaskManager.list_executions(
302384
scheduler_id=scheduler_id,
303-
status=TaskStatus(status_filter) if status_filter else None,
304-
priority=TaskPriority(priority) if priority else None,
305-
delivery_status=DeliveryStatus(delivery_status) if delivery_status else None,
385+
status=_parse_execution_status_filter(status_filter),
386+
priority=_parse_priority(priority),
387+
delivery_status=_parse_delivery_status(delivery_status),
306388
sort_by=sort_by,
307389
sort_order=sort_order,
308390
offset=offset,
@@ -316,6 +398,20 @@ async def list_executions(
316398
)
317399

318400

401+
@router.post("/task-executions/batch/cancel")
402+
async def batch_cancel(req: BatchRequest):
403+
from flocks.task.manager import TaskManager
404+
405+
return {"cancelled": await TaskManager.batch_cancel(req.execution_ids)}
406+
407+
408+
@router.post("/task-executions/batch/delete")
409+
async def batch_delete(req: BatchRequest):
410+
from flocks.task.manager import TaskManager
411+
412+
return {"deleted": await TaskManager.batch_delete(req.execution_ids)}
413+
414+
319415
@router.get("/task-executions/{execution_id}")
320416
async def get_execution(execution_id: str):
321417
from flocks.task.manager import TaskManager
@@ -345,27 +441,6 @@ async def cancel_execution(execution_id: str):
345441
raise HTTPException(404, "Task execution not found")
346442
return execution.model_dump(mode="json", by_alias=True)
347443

348-
349-
@router.post("/task-executions/{execution_id}/pause")
350-
async def pause_execution(execution_id: str):
351-
from flocks.task.manager import TaskManager
352-
353-
execution = await TaskManager.pause_execution(execution_id)
354-
if not execution:
355-
raise HTTPException(404, "Task execution not found")
356-
return execution.model_dump(mode="json", by_alias=True)
357-
358-
359-
@router.post("/task-executions/{execution_id}/resume")
360-
async def resume_execution(execution_id: str):
361-
from flocks.task.manager import TaskManager
362-
363-
execution = await TaskManager.resume_execution(execution_id)
364-
if not execution:
365-
raise HTTPException(404, "Task execution not found")
366-
return execution.model_dump(mode="json", by_alias=True)
367-
368-
369444
@router.post("/task-executions/{execution_id}/retry")
370445
async def retry_execution(execution_id: str):
371446
from flocks.task.manager import TaskManager
@@ -395,17 +470,3 @@ async def delete_execution(execution_id: str):
395470
return {"ok": True}
396471

397472

398-
@router.post("/task-executions/batch/cancel")
399-
async def batch_cancel(req: BatchRequest):
400-
from flocks.task.manager import TaskManager
401-
402-
return {"cancelled": await TaskManager.batch_cancel(req.execution_ids)}
403-
404-
405-
@router.post("/task-executions/batch/delete")
406-
async def batch_delete(req: BatchRequest):
407-
from flocks.task.manager import TaskManager
408-
409-
return {"deleted": await TaskManager.batch_delete(req.execution_ids)}
410-
411-

flocks/task/executor.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,19 @@ async def dispatch(
4444
execution: TaskExecution,
4545
scheduler: TaskScheduler,
4646
) -> TaskExecution:
47+
# The execution row was already flipped to RUNNING atomically with the
48+
# queue ref inside TaskStore.claim_next_queue_execution. We only need
49+
# to make sure the in-memory object reflects that and to persist the
50+
# session_id once the task session has been created.
51+
if execution.status != TaskStatus.RUNNING:
52+
execution.status = TaskStatus.RUNNING
53+
if execution.started_at is None:
54+
execution.started_at = datetime.now(timezone.utc)
55+
4756
session_id: Optional[str] = None
4857
if execution.execution_mode == ExecutionMode.AGENT:
4958
session_id = await cls._create_task_session(execution, scheduler)
5059

51-
started_at = datetime.now(timezone.utc)
52-
execution.status = TaskStatus.RUNNING
53-
execution.started_at = started_at
5460
execution.session_id = session_id
5561
await TaskStore.update_execution(execution)
5662

0 commit comments

Comments
 (0)