Stop a finished job from being pulled back to running by a slower worker#1338
Conversation
Under concurrent async_api result processing, `_update_job_progress` wrote the overall `job.status` as part of the same save() that persisted the progress blob. Since #1261 removed the `select_for_update` that serialized these writes, a slower/stale worker could overwrite a terminal status (set by a faster worker, the stale-job reaper, or a cancel) with STARTED. A wrongly non-terminal status also keeps the job claimable via `/next`, starving newer async jobs. The progress-blob save no longer writes `status`/`finished_at`, and the terminal transition is performed as a guarded, statement-scope UPDATE that only fires from a pre-terminal state. This cannot be clobbered by a stale read-modify-write and holds no transaction-length lock, so it does not reintroduce the contention #1261 removed. Jobs already in a terminal or CANCELING state are left untouched (no resurrecting a cancelled job). This is Layer 1 of #1337; the counter-accumulation race is deferred. Refs #1337, #1282, #1283 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
✅ Deploy Preview for antenna-preview canceled.
|
✅ Deploy Preview for antenna-ssec canceled.
|
ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
📝 WalkthroughWalkthrough
ChangesGuarded terminal transition in
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related issues
Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Pull request overview
This PR hardens async job finalization under concurrent worker result posts so that stale workers cannot regress a job’s terminal status (e.g., SUCCESS → STARTED) or resurrect cancelled/revoked jobs into SUCCESS. It implements the first part of the fix discussed in #1337 by making terminal status transitions conditional/atomic at the database level while still allowing progress blob updates.
Changes:
- Split job progress persistence from terminal status persistence in
_update_job_progress, and perform terminal transitions via a guarded single-statementUPDATE. - Prevent late-arriving results from overriding terminal/CANCELING job statuses.
- Add regression tests covering revoked-job completion, normal success transition, and ensuring non-terminal progress updates don’t touch
Job.status.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
ami/jobs/tasks.py |
Makes terminal status transition conditional/atomic and stops progress saves from overwriting status/finished_at. |
ami/jobs/tests/test_tasks.py |
Adds regression tests validating the guarded terminal transition behavior under concurrency. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
… transitions When a completing batch arrived for a job that another worker, the stale-job reaper, or a cancel had already moved to a terminal state (REVOKED/FAILURE) or CANCELING, `_update_job_progress` still set `progress.summary.status` to the terminal `complete_state` (e.g. SUCCESS) and persisted it via the progress save(). The guarded UPDATE correctly left the top-level `status` column alone, but the UI reads `progress.summary.status` from the JSONB, so it showed SUCCESS while the job was actually REVOKED. This also tripped the Job.save() status-mismatch warning, reintroducing the two-sources-of-truth disagreement this PR exists to prevent. Reorder so the guarded terminal UPDATE runs before the progress blob is saved, and only advance `progress.summary.status` to `complete_state` when that UPDATE actually fires (`transitioned == 1`). On the no-transition path the JSONB summary.status keeps whatever it was. The in-memory instance and the persisted JSONB now stay consistent with the status column. The single narrow `save(update_fields=["progress", "updated_at"])` still persists the stage-level progress changes either way; no extra write is added. Adds a regression test asserting a completing batch for an already-REVOKED job leaves both `job.status` and `job.progress.summary.status` at REVOKED. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
TestConditionalTerminalTransition covers the guarded transition sequentially. This adds TestConcurrentStatusRace, which forces the actual interleave with threads and a real database transaction: a slow worker reads the job while it is still STARTED, another writer commits a REVOKED status, and only then does the slow worker reach its save. The assertion is that the committed REVOKED status survives. On the pre-fix code this fails (the stale full-row save resurrects the job to SUCCESS); with the conditional terminal transition it passes, because the guarded UPDATE only fires from a pre-terminal status. This locks the race in as a regression guard for issue #1337. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…comments The set of states a job may transition to a terminal status from ([CREATED, PENDING, STARTED, RETRY]) was inlined in the guarded UPDATE. Extract it as JobState.finalizable_states() so the rule (and the reason CANCELING and UNKNOWN are excluded) has a single home, ready to be reused by the other terminal-status writers. Behavior is unchanged. Also condense the explanatory comments in _update_job_progress: keep the load-bearing facts (statement-scope UPDATE, no row lock, don't resurrect a cancel, status deliberately kept out of the blob save) and drop the repetition. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
@coderabitai full review |
|
Tested on arctia |
…point Issue #1337 is a lost-update race on the job status column. PR #1338 fixed one writer — `_update_job_progress` — by splitting the terminal status write out of the progress-blob save and performing it as a guarded, statement-scope UPDATE that only fires from a pre-terminal status. The other four terminal writers still did an unguarded full-row `save()` and could clobber a terminal status from the opposite direction: a cancel could overwrite a just-committed SUCCESS with REVOKED, and a stale `task_postrun` SUCCESS or `task_failure` FAILURE could resurrect a job another writer had already revoked. This change adds a single `Job._guarded_status_update(to_status, from_statuses, *, set_finished=False)` helper that performs the guarded UPDATE (no row lock, so it does not reintroduce the contention #1261 removed) and advances the in-memory instance only when the transition actually fires. The remaining terminal writers are routed through it: - `Job.cancel()`: CANCELING and REVOKED are now guarded UPDATEs. The `task.revoke()` and `cleanup_async_job_if_needed()` calls still run regardless of whether the guard fired, since a job may already be terminal but still need its NATS/Redis resources released. - `update_job_status` (task_postrun): only the terminal SUCCESS path is guarded; non-terminal celery states still flow through the dual-use `update_status()` unchanged. - `update_job_failure` (task_failure): the terminal FAILURE write is guarded, keeping the existing in-flight-async deferral guard intact. `_update_job_progress` and `_fail_job` are left as-is: the former is already guarded by #1338, and the latter is already safe via `select_for_update` plus a status precondition. After a guarded transition, callers persist `progress.summary.status` into the JSONB with a narrow `save(update_fields=["progress", ...])` rather than a full save, matching #1338 and avoiding clobbering other columns. The save only happens when the guard fired, so an already-terminal job keeps both its status column and its summary.status. One intentional behavior change: `update_job_failure` now sets `finished_at` when it marks FAILURE (it previously left it unset), making a failed terminal job consistent with `_fail_job` and the result handler. Adds sequential regression tests (postrun/failure cannot resurrect a REVOKED job; cancel of an already-SUCCESS job no-ops on status but still cleans up) and two real-concurrency tests that interleave cancel against a completing result batch in both directions. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…gnosis Observation-only follow-up to #1338/#1342. Now that terminal status transitions are irreversible, surface the two cases where a terminal verdict may have been wrong, instead of letting them disappear silently: 1. When work completes for a job the guard finds already terminal/CANCELING, log a warning. Often legitimate (cancel/reaper won the race) but, if frequent, the signal of a premature terminal verdict. 2. When a result is failed because the job's Redis state is missing, log the job age/status/dispatch first. A small age points to a not-yet-seeded or redelivered-run_job race rather than genuine cleanup. No behaviour change — both warnings sit on existing code paths. Lets us confirm the trigger before adding grace/idempotency logic (see PR body follow-up). Refs #1337, #1219, #1324. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…for #1337) (#1343) * feat(jobs): log premature-terminal and missing-state failures for diagnosis Observation-only follow-up to #1338/#1342. Now that terminal status transitions are irreversible, surface the two cases where a terminal verdict may have been wrong, instead of letting them disappear silently: 1. When work completes for a job the guard finds already terminal/CANCELING, log a warning. Often legitimate (cancel/reaper won the race) but, if frequent, the signal of a premature terminal verdict. 2. When a result is failed because the job's Redis state is missing, log the job age/status/dispatch first. A small age points to a not-yet-seeded or redelivered-run_job race rather than genuine cleanup. No behaviour change — both warnings sit on existing code paths. Lets us confirm the trigger before adding grace/idempotency logic (see PR body follow-up). Refs #1337, #1219, #1324. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix(jobs): downgrade missing-state log to info when the job is already terminal The missing-state diagnostic logged a WARNING saying 'Failing job' for every in-flight result that arrived after a job finished — but _fail_job no-ops on a terminal job, so after a cancel (which deletes the Redis state) this fired once per in-flight batch and misdescribed normal cleanup as a failure. Now: a terminal job logs at info ('ignoring in-flight result for already-terminal job'); only a NON-terminal job with missing state logs the warning, which is the case actually worth investigating. Refs #1337. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * refactor(jobs): make the diagnostic log lines operator-readable and leaner The missing-state and completed-after-terminal logs read like insider notes — ticket numbers and race-theory in the runtime message. Move the rationale and the issue reference into code comments and make the log lines plain operational statements an operator can act on without chasing a ticket. Also drop the redundant dispatch_mode field and the extra status re-query. Refs #1337. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * refactor(jobs): address review on missing-state diagnostics - Treat CANCELING as terminal-like in the missing-state classification so a cancel-in-flight result logs the benign info line instead of the misleading 'still running / marking it failed' warning (matches _fail_job's no-op set). Caught by CodeRabbit and Copilot. - Rename the values() dict from 'row' to 'job_values' (per review). - Log the completed-after-terminal case via job.logger and include the stage and attempted terminal state, without an extra status re-query (per Copilot). Refs #1337. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…point Issue #1337 is a lost-update race on the job status column. PR #1338 fixed one writer — `_update_job_progress` — by splitting the terminal status write out of the progress-blob save and performing it as a guarded, statement-scope UPDATE that only fires from a pre-terminal status. The other four terminal writers still did an unguarded full-row `save()` and could clobber a terminal status from the opposite direction: a cancel could overwrite a just-committed SUCCESS with REVOKED, and a stale `task_postrun` SUCCESS or `task_failure` FAILURE could resurrect a job another writer had already revoked. This change adds a single `Job._guarded_status_update(to_status, from_statuses, *, set_finished=False)` helper that performs the guarded UPDATE (no row lock, so it does not reintroduce the contention #1261 removed) and advances the in-memory instance only when the transition actually fires. The remaining terminal writers are routed through it: - `Job.cancel()`: CANCELING and REVOKED are now guarded UPDATEs. The `task.revoke()` and `cleanup_async_job_if_needed()` calls still run regardless of whether the guard fired, since a job may already be terminal but still need its NATS/Redis resources released. - `update_job_status` (task_postrun): only the terminal SUCCESS path is guarded; non-terminal celery states still flow through the dual-use `update_status()` unchanged. - `update_job_failure` (task_failure): the terminal FAILURE write is guarded, keeping the existing in-flight-async deferral guard intact. `_update_job_progress` and `_fail_job` are left as-is: the former is already guarded by #1338, and the latter is already safe via `select_for_update` plus a status precondition. After a guarded transition, callers persist `progress.summary.status` into the JSONB with a narrow `save(update_fields=["progress", ...])` rather than a full save, matching #1338 and avoiding clobbering other columns. The save only happens when the guard fired, so an already-terminal job keeps both its status column and its summary.status. One intentional behavior change: `update_job_failure` now sets `finished_at` when it marks FAILURE (it previously left it unset), making a failed terminal job consistent with `_fail_job` and the result handler. Adds sequential regression tests (postrun/failure cannot resurrect a REVOKED job; cancel of an already-SUCCESS job no-ops on status but still cleans up) and two real-concurrency tests that interleave cancel against a completing result batch in both directions. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
When many ML workers process an async job at once, a job that has actually finished could end up still showing as "running", and a job that was cancelled or revoked could flip back to "succeeded". This happened because every result a worker posted re-saved the job's overall status together with its progress, so a slower worker working from an out-of-date snapshot could overwrite a newer status. Beyond the wrong label, this has a real cost: a job left wrongly in a running state stays claimable by workers through the task endpoint, so workers keep picking it up and newer jobs wait behind it.
This change makes the job's final status transition safe under that concurrency. A worker can no longer regress a job that another worker, the stale-job checker, or a user cancellation has already finished. It is the first, smaller half of the fix described in #1337 (the lost-update race on the job's progress/status); the counter-accumulation half is deferred to a separate design discussion.
List of Changes
_update_job_progress(ami/jobs/tasks.py),status/finished_atare dropped from the progress-blobsave(update_fields=...); the terminal transition is a guardedJob.objects.filter(pk=..., status__in=JobState.finalizable_states()).update(status=..., finished_at=...). This is statement-scope and holds no transaction-length lock, so it does not reintroduce the row-lock contention removed in fix(jobs): fixes for concurrent ML processing jobs #1261.finalizable_states(), which excludes terminal, CANCELING, and UNKNOWN states, so completion can't override them. This is a small, intentional behaviour change versus the previous code, which could flip CANCELING → SUCCESS. It aligns with CANCELLED jobs leak through /next filter, starve newer async_api jobs #1282/Clean up task queue when job is revoked or re-started, fix duplicate tasks #1283.JobState.finalizable_states()([CREATED, PENDING, STARTED, RETRY]—running_states()minus CANCELING and UNKNOWN), with a short note on why those two are excluded, so the guard rule has a single source of truth for the remaining terminal-status writers to reuse.TestConditionalTerminalTransitioncovers the guard logic directly: a completing worker does not resurrect a REVOKED job; a normal job still transitions STARTED → SUCCESS withfinished_atset; a non-terminal progress update never writes the overall status; and a completion does not flip the JSONBsummary.statusof a REVOKED job.TestConcurrentStatusRacereproduces the actual lost-update interleave with real threads and database transactions — a slow worker reads the job while STARTED, another writer commits REVOKED, and only then does the slow worker save. This test fails on the pre-fix code (the stale save resurrects the job to SUCCESS) and passes with the guarded transition, so it is a genuine regression guard rather than a unit test of the guard in isolation.Detailed Description
Since #1261 dropped the
select_for_updatethat serialized progress writes (it was a contention bottleneck — see #1256), concurrent result handlers do an unlocked read-modify-write of theJobrow. Two handlers reading the same snapshot and both saving the whole blob means one update is lost. The most damaging case is the status field: a staleSTARTEDoverwriting a freshly-written terminal status. That both misreports the job and — because status drives claimability at the worker task endpoint — keeps a finished job in rotation, starving newer async jobs (the same shape as #1282).This PR is Layer 1 of #1337: it makes the status transition atomic and non-regressing without re-adding a transaction-length lock. The counter accumulation (processed/failed/detection/classification counts in the progress blob) can still drift by a batch under race — that is Layer 2, deferred because the right shape (counter columns vs. a per-task table vs. Redis-as-source vs. an event log) depends on other consumers of per-image job history and needs a dedicated design pass. See #1337 for the plan.
Scope note: this hardens the result-handler status writer. The other terminal-status writers (the cancel path and the Celery task signal handlers) still do unguarded full-row saves; routing them through the same
finalizable_states()guard is a focused fast-follow on top of this PR. #1324 should also be gated behind this change: itsacks_lateredelivery guard readsjob.status, which is the field this PR makes reliable.How to Test the Changes
pytest ami/jobs/tests/test_tasks.py::TestConditionalTerminalTransition— the four guard cases.pytest ami/jobs/tests/test_tasks.py::TestConcurrentStatusRace— the real-concurrency race test. Check out the pre-fix commit to see it fail (the job is resurrected to SUCCESS), then this branch to see it pass.ami/jobs/tests/test_tasks.pyandami/jobs/tests/test_jobs.pypass locally.Checklist
Refs #1337, #1282, #1283
Summary by CodeRabbit
Bug Fixes
Tests