From 06bb22860e4f3bec13d508f59b97c56338d59ed5 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 16 Jun 2026 14:05:15 -0700 Subject: [PATCH 1/4] fix(jobs): stop stale workers from regressing a terminal job status Under concurrent async_api result processing, `_update_job_progress` wrote the overall `job.status` as part of the same save() that persisted the progress blob. Since #1261 removed the `select_for_update` that serialized these writes, a slower/stale worker could overwrite a terminal status (set by a faster worker, the stale-job reaper, or a cancel) with STARTED. A wrongly non-terminal status also keeps the job claimable via `/next`, starving newer async jobs. The progress-blob save no longer writes `status`/`finished_at`, and the terminal transition is performed as a guarded, statement-scope UPDATE that only fires from a pre-terminal state. This cannot be clobbered by a stale read-modify-write and holds no transaction-length lock, so it does not reintroduce the contention #1261 removed. Jobs already in a terminal or CANCELING state are left untouched (no resurrecting a cancelled job). This is Layer 1 of #1337; the counter-accumulation race is deferred. 
Refs #1337, #1282, #1283 Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/tasks.py | 33 ++++++++++++-- ami/jobs/tests/test_tasks.py | 84 +++++++++++++++++++++++++++++++++++- 2 files changed, 112 insertions(+), 5 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index f3c996f86..692f51539 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -632,10 +632,9 @@ def _update_job_progress( progress=progress_percentage, **state_params, ) - if job.progress.is_complete(): - job.status = complete_state + became_complete = job.progress.is_complete() + if became_complete: job.progress.summary.status = complete_state - job.finished_at = datetime.datetime.now() # Use naive datetime in local time job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%") # Narrow the write to the fields we actually mutated. Without this, a full # save() would overwrite `logs` and any other field on the instance @@ -644,7 +643,33 @@ def _update_job_progress( # JobLogHandler) could be clobbered by a stale read-modify-write. # `updated_at` is listed explicitly because Django skips `auto_now` bumps # when `update_fields` is provided. See PR #1261 review feedback. - job.save(update_fields=["progress", "status", "finished_at", "updated_at"]) + # + # NOTE: the overall `status` and `finished_at` columns are intentionally + # NOT written here. A blob-style `save(update_fields=[..., "status", ...])` + # writes whatever `status` this transaction read at the top of the block, + # so a slower/stale worker can regress an already-terminal status (set by a + # faster worker, the stale-job reaper, or a cancel) back to STARTED — which + # also makes the job claimable again via `/next`, starving newer work. The + # terminal transition is performed separately below as a guarded, atomic + # UPDATE. See issue #1337. 
+ job.save(update_fields=["progress", "updated_at"]) + if became_complete: + # Conditional terminal transition: flip to the terminal state only from + # a pre-terminal status, as a single statement-scope UPDATE. This cannot + # be clobbered by a slower worker's stale read-modify-write, and — unlike + # the `select_for_update` removed in #1261 — it holds no + # transaction-length row lock, so it does not reintroduce that + # contention. A job a concurrent worker, the reaper, or a cancel already + # moved to a terminal or CANCELING state is left untouched (we do not + # resurrect a revoked/cancelled job into SUCCESS). See issue #1337. + now = datetime.datetime.now() # Use naive datetime in local time + transitioned = Job.objects.filter( + pk=job_id, + status__in=[JobState.CREATED, JobState.PENDING, JobState.STARTED, JobState.RETRY], + ).update(status=complete_state, finished_at=now) + if transitioned: + job.status = complete_state + job.finished_at = now try: _log_job_throughput(job, stage) except Exception as e: diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 89587f3f1..22883dc94 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -16,7 +16,7 @@ from ami.base.serializers import reverse_with_params from ami.jobs.models import Job, JobDispatchMode, JobState, MLJob -from ami.jobs.tasks import process_nats_pipeline_result +from ami.jobs.tasks import _update_job_progress, process_nats_pipeline_result from ami.main.models import Detection, Project, SourceImage, SourceImageCollection from ami.ml.models import Algorithm, Pipeline from ami.ml.models.algorithm import AlgorithmTaskType @@ -1229,3 +1229,85 @@ def test_jobs_health_check_runs_lost_images_before_stale_jobs(self, mock_manager job.refresh_from_db() self.assertEqual(job.status, JobState.SUCCESS.value) + + +class TestConditionalTerminalTransition(TransactionTestCase): + """Regression tests for issue #1337. 
+ + `_update_job_progress` used to write the overall `job.status` as part of the + same `save(update_fields=[..., "status", ...])` that persisted the progress + blob. Because #1261 dropped the `select_for_update` that serialized these + writes, a slower/stale worker could regress an already-terminal status back + to STARTED (or overwrite a REVOKED/cancelled job with SUCCESS). A wrongly + non-terminal status also makes the job claimable again via `/next`, starving + newer work. The fix performs the terminal transition as a guarded atomic + UPDATE that only fires from a pre-terminal status. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Terminal Transition Project") + self.pipeline = Pipeline.objects.create(name="TT Pipeline", slug="tt-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="TT Collection", project=self.project) + self.job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Terminal Transition Job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=JobDispatchMode.ASYNC_API, + status=JobState.STARTED, + ) + + def tearDown(self): + cache.clear() + + def _complete_all_stages(self): + """Drive an MLJob's stages (collect, process, results) to 100% SUCCESS.""" + for stage in ("collect", "process", "results"): + _update_job_progress(self.job.pk, stage=stage, progress_percentage=1.0, complete_state=JobState.SUCCESS) + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_completion_does_not_resurrect_revoked_job(self, _mock_cleanup): + """A completing worker must not flip a REVOKED job back to SUCCESS.""" + # Bring the job to the brink of completion (collect + process done, but not + # results), so the eventual results update is the call that completes it. 
+ _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS) + _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + # The reaper (or a cancel) wins the race and marks the job terminal. + Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED) + + # A late results batch lands and computes is_complete()=True. + _update_job_progress(self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + self.job.refresh_from_db() + # Status stays REVOKED — the guarded UPDATE only fires from a pre-terminal + # state, so the job is not resurrected. + self.assertEqual(self.job.status, JobState.REVOKED.value) + # The progress blob was still written, so the work is recorded as complete + # even though the status was (correctly) left terminal. + self.assertTrue(self.job.progress.is_complete()) + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_completion_sets_success_from_active_state(self, _mock_cleanup): + """A job still in an active state transitions to SUCCESS with finished_at set.""" + self.assertEqual(self.job.status, JobState.STARTED.value) + + self._complete_all_stages() + + self.job.refresh_from_db() + self.assertEqual(self.job.status, JobState.SUCCESS.value) + self.assertIsNotNone(self.job.finished_at) + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_incomplete_update_never_writes_overall_status(self, _mock_cleanup): + """A non-terminal progress update must not touch the overall job.status.""" + Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED) + + # A single non-completing stage update (job has other stages still at 0%). 
+ _update_job_progress(self.job.pk, stage="collect", progress_percentage=0.5, complete_state=JobState.SUCCESS) + + self.job.refresh_from_db() + self.assertEqual(self.job.status, JobState.REVOKED.value) From 4586fccb63e489516b8824552fc3e50ea8015fd2 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 17 Jun 2026 16:16:31 -0700 Subject: [PATCH 2/4] fix(jobs): only advance progress.summary.status when the job actually transitions When a completing batch arrived for a job that another worker, the stale-job reaper, or a cancel had already moved to a terminal state (REVOKED/FAILURE) or CANCELING, `_update_job_progress` still set `progress.summary.status` to the terminal `complete_state` (e.g. SUCCESS) and persisted it via the progress save(). The guarded UPDATE correctly left the top-level `status` column alone, but the UI reads `progress.summary.status` from the JSONB, so it showed SUCCESS while the job was actually REVOKED. This also tripped the Job.save() status-mismatch warning, reintroducing the two-sources-of-truth disagreement this PR exists to prevent. Reorder so the guarded terminal UPDATE runs before the progress blob is saved, and only advance `progress.summary.status` to `complete_state` when that UPDATE actually fires (`transitioned == 1`). On the no-transition path the JSONB summary.status keeps whatever it was. The in-memory instance and the persisted JSONB now stay consistent with the status column. The single narrow `save(update_fields=["progress", "updated_at"])` still persists the stage-level progress changes either way; no extra write is added. Adds a regression test asserting a completing batch for an already-REVOKED job leaves both `job.status` and `job.progress.summary.status` at REVOKED. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/tasks.py | 51 ++++++++++++++++++++++-------------- ami/jobs/tests/test_tasks.py | 35 +++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 19 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 692f51539..77d64302d 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -633,26 +633,8 @@ def _update_job_progress( **state_params, ) became_complete = job.progress.is_complete() - if became_complete: - job.progress.summary.status = complete_state job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%") - # Narrow the write to the fields we actually mutated. Without this, a full - # save() would overwrite `logs` and any other field on the instance - # fetched at the top of this block — so a concurrent worker's append to - # `progress.errors` (via `_reconcile_lost_images`) or log line (via - # JobLogHandler) could be clobbered by a stale read-modify-write. - # `updated_at` is listed explicitly because Django skips `auto_now` bumps - # when `update_fields` is provided. See PR #1261 review feedback. - # - # NOTE: the overall `status` and `finished_at` columns are intentionally - # NOT written here. A blob-style `save(update_fields=[..., "status", ...])` - # writes whatever `status` this transaction read at the top of the block, - # so a slower/stale worker can regress an already-terminal status (set by a - # faster worker, the stale-job reaper, or a cancel) back to STARTED — which - # also makes the job claimable again via `/next`, starving newer work. The - # terminal transition is performed separately below as a guarded, atomic - # UPDATE. See issue #1337. - job.save(update_fields=["progress", "updated_at"]) + if became_complete: # Conditional terminal transition: flip to the terminal state only from # a pre-terminal status, as a single statement-scope UPDATE. This cannot @@ -662,6 +644,14 @@ def _update_job_progress( # contention. 
A job a concurrent worker, the reaper, or a cancel already # moved to a terminal or CANCELING state is left untouched (we do not # resurrect a revoked/cancelled job into SUCCESS). See issue #1337. + # + # The UPDATE runs BEFORE the progress blob is saved so the UI-facing + # `progress.summary.status` can be advanced to `complete_state` only + # when this transition actually fires. Otherwise an already-terminal + # job (REVOKED/FAILURE/CANCELING) would keep its correct top-level + # `status` column but have `summary.status` silently flipped to SUCCESS + # inside the JSONB — reintroducing the two-sources-of-truth disagreement + # this fix exists to prevent. See issue #1337. now = datetime.datetime.now() # Use naive datetime in local time transitioned = Job.objects.filter( pk=job_id, @@ -670,6 +660,29 @@ def _update_job_progress( if transitioned: job.status = complete_state job.finished_at = now + # Only now is it correct to advance the UI-facing summary status. + # On the no-transition path (already terminal/CANCELING) the blob's + # summary.status keeps whatever it was. + job.progress.summary.status = complete_state + + # Narrow the write to the fields we actually mutated. Without this, a full + # save() would overwrite `logs` and any other field on the instance + # fetched at the top of this block — so a concurrent worker's append to + # `progress.errors` (via `_reconcile_lost_images`) or log line (via + # JobLogHandler) could be clobbered by a stale read-modify-write. + # `updated_at` is listed explicitly because Django skips `auto_now` bumps + # when `update_fields` is provided. See PR #1261 review feedback. + # + # NOTE: the overall `status` and `finished_at` columns are intentionally + # NOT written by this save(). 
A blob-style `save(update_fields=[..., "status", ...])` + # writes whatever `status` this transaction read at the top of the block, + # so a slower/stale worker can regress an already-terminal status (set by a + # faster worker, the stale-job reaper, or a cancel) back to STARTED — which + # also makes the job claimable again via `/next`, starving newer work. The + # terminal transition above is performed separately as a guarded, atomic + # UPDATE. This save() persists the stage-level progress changes (and, when + # the guarded UPDATE fired, the matching summary.status). See issue #1337. + job.save(update_fields=["progress", "updated_at"]) try: _log_job_throughput(job, stage) except Exception as e: diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 22883dc94..b96acda2c 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -1301,6 +1301,41 @@ def test_completion_sets_success_from_active_state(self, _mock_cleanup): self.assertEqual(self.job.status, JobState.SUCCESS.value) self.assertIsNotNone(self.job.finished_at) + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_completion_does_not_flip_summary_status_of_revoked_job(self, _mock_cleanup): + """A completing batch for an already-REVOKED job must not flip the + UI-facing `progress.summary.status` to SUCCESS. + + Regression for the Copilot finding on PR #1338: even though the guarded + UPDATE correctly leaves the top-level `status` column at REVOKED, the + progress blob's `summary.status` was being advanced to `complete_state` + and persisted by the same save(), so the UI (which reads the JSONB) + showed SUCCESS while the job was actually REVOKED — the exact + two-sources-of-truth disagreement this PR exists to prevent. + """ + # Bring the job to the brink of completion (collect + process done). 
+ _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS) + _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + # The reaper (or a cancel) wins the race and marks the job terminal. + # Use update_status() — the real revoke path — so BOTH the status column + # and the JSONB summary.status are set to REVOKED, exactly the state a + # cancelled/revoked job is in when a late batch arrives. + self.job.refresh_from_db() + self.job.update_status(JobState.REVOKED) + + # A late results batch lands and computes is_complete()=True. + _update_job_progress(self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + self.job.refresh_from_db() + # Both sources of truth must agree on the terminal state: the status + # column stays REVOKED AND the JSONB summary.status is NOT flipped to + # SUCCESS. + self.assertEqual(self.job.status, JobState.REVOKED.value) + self.assertEqual(self.job.progress.summary.status, JobState.REVOKED) + # The stage-level progress was still recorded as complete. + self.assertTrue(self.job.progress.is_complete()) + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") def test_incomplete_update_never_writes_overall_status(self, _mock_cleanup): """A non-terminal progress update must not touch the overall job.status.""" From c080cbd46970e42205184d625c8d2a23c9ec71cf Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 18 Jun 2026 17:23:17 -0700 Subject: [PATCH 3/4] test(jobs): reproduce the lost-update status race under real concurrency TestConditionalTerminalTransition covers the guarded transition sequentially. This adds TestConcurrentStatusRace, which forces the actual interleave with threads and a real database transaction: a slow worker reads the job while it is still STARTED, another writer commits a REVOKED status, and only then does the slow worker reach its save. 
The assertion is that the committed REVOKED status survives. On the pre-fix code this fails (the stale `save(update_fields=[..., "status", ...])` resurrects the job to SUCCESS); with the conditional terminal transition it passes, because the guarded UPDATE only fires from a pre-terminal status. This locks the race in as a regression guard for issue #1337. Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/tests/test_tasks.py | 123 +++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index b96acda2c..8dc99da29 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -7,6 +7,7 @@ import datetime import logging +import threading from concurrent.futures import ThreadPoolExecutor from unittest.mock import AsyncMock, MagicMock, patch @@ -1346,3 +1347,125 @@ def test_incomplete_update_never_writes_overall_status(self, _mock_cleanup): self.job.refresh_from_db() self.assertEqual(self.job.status, JobState.REVOKED.value) + + +class TestConcurrentStatusRace(TransactionTestCase): + """Reproduce the lost-update status race in ``_update_job_progress`` (issue #1337). + + The sequential ``TestConditionalTerminalTransition`` cases set the job + terminal *between* ``_update_job_progress`` transactions. This class forces + the actual concurrent interleave: a slow worker reads the job while it is + still ``STARTED``, another writer marks the job ``REVOKED`` and commits, and + only then does the slow worker reach its save. + + On ``main`` the slow worker's ``save(update_fields=[..., "status", ...])`` + writes the ``SUCCESS`` it computed from its stale snapshot, regressing the + already-committed ``REVOKED`` status (the job is resurrected). On the #1338 + fix branch the terminal transition is a guarded atomic UPDATE that only fires + from a pre-terminal status, so the committed ``REVOKED`` is preserved. 
+ + The assertion is written so that green means the bug is fixed and red means + the race reproduced. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Concurrent Race Project") + self.pipeline = Pipeline.objects.create(name="CR Pipeline", slug="cr-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="CR Collection", project=self.project) + self.job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name="Concurrent Race Job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=JobDispatchMode.ASYNC_API, + status=JobState.STARTED, + ) + + def tearDown(self): + cache.clear() + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_late_results_batch_does_not_resurrect_revoked_job(self, _mock_cleanup): + from ami.jobs.tasks import _update_job_progress + + # Bring the job to the brink of completion: collect + process done, but + # results still open, so the late results batch is the call that would + # complete the job and attempt the terminal transition. + _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS) + _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS) + + # Events to coordinate the interleave. The worker thread blocks at a seam + # that runs AFTER it has read the job (with a stale STARTED snapshot) but + # BEFORE it saves; the main thread then commits REVOKED and releases it. + worker_reached_seam = threading.Event() + main_committed_revoke = threading.Event() + + real_get_counts = _update_job_progress.__globals__["_get_current_counts_from_job_progress"] + + def blocking_get_counts(job, stage): + # This runs inside the worker's transaction, right after it has read + # the Job row, before it computes/saves the new status. 
Signal that + # the worker is holding its stale snapshot, then wait for the main + # thread to commit the REVOKE before letting the worker proceed. + counts = real_get_counts(job, stage) + worker_reached_seam.set() + if not main_committed_revoke.wait(timeout=10): + raise AssertionError("main thread did not commit REVOKE within timeout") + return counts + + worker_error: list[BaseException] = [] + + def worker(): + from django.db import connection + + try: + with patch( + "ami.jobs.tasks._get_current_counts_from_job_progress", + side_effect=blocking_get_counts, + ): + # This is the late results batch. It reads the job (STARTED), + # blocks at the seam, then — after the main thread revokes — + # finishes and saves, computing SUCCESS from its stale read. + _update_job_progress( + self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS + ) + except BaseException as exc: # noqa: BLE001 - surfaced to the main thread + worker_error.append(exc) + finally: + # Hygiene: this thread used its own DB connection; close it so the + # test's connection teardown does not trip over a dangling one. + connection.close() + + worker_thread = threading.Thread(target=worker, name="late-results-worker") + worker_thread.start() + + try: + # Wait until the worker is parked at the seam holding its stale read. + if not worker_reached_seam.wait(timeout=10): + raise AssertionError("worker thread never reached the seam (deadlock or early failure)") + + # Another writer (the reaper or a cancel) wins the race and commits a + # terminal status while the worker is still mid-flight. The worker + # holds no row lock (the select_for_update was removed in #1261), so + # this UPDATE commits immediately. 
+ Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED) + main_committed_revoke.set() + finally: + worker_thread.join(timeout=15) + + if worker_thread.is_alive(): + raise AssertionError("worker thread deadlocked and did not finish") + if worker_error: + raise worker_error[0] + + self.job.refresh_from_db() + # The committed REVOKED status must survive the late worker's save. On + # main this fails: the stale read-modify-write regresses it to SUCCESS. + self.assertEqual( + self.job.status, + JobState.REVOKED.value, + f"late results batch resurrected a REVOKED job to {self.job.status!r} (lost-update race)", + ) From f347ff536a6c4e0c5c9f14e0f25cb4a48dd57010 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 18 Jun 2026 17:44:44 -0700 Subject: [PATCH 4/4] refactor(jobs): extract JobState.finalizable_states() and trim guard comments The set of states a job may transition to a terminal status from ([CREATED, PENDING, STARTED, RETRY]) was inlined in the guarded UPDATE. Extract it as JobState.finalizable_states() so the rule (and the reason CANCELING and UNKNOWN are excluded) has a single home, ready to be reused by the other terminal-status writers. Behavior is unchanged. Also condense the explanatory comments in _update_job_progress: keep the load-bearing facts (statement-scope UPDATE, no row lock, don't resurrect a cancel, status deliberately kept out of the blob save) and drop the repetition. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- ami/jobs/models.py | 6 ++++++ ami/jobs/tasks.py | 53 +++++++++++++--------------------------------- 2 files changed, 21 insertions(+), 38 deletions(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 5e10ac1f4..784b1e8ab 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -94,6 +94,12 @@ def active_states(cls): """States where a job is actively processing and should serve tasks to workers.""" return [cls.STARTED, cls.RETRY] + @classmethod + def finalizable_states(cls): + # running_states() minus CANCELING (don't resurrect a cancel in progress) + # and UNKNOWN (never served; shouldn't auto-finalize). See #1337. + return [cls.CREATED, cls.PENDING, cls.STARTED, cls.RETRY] + def get_status_label(status: JobState, progress: float) -> str: """ diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 77d64302d..181b45301 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -636,52 +636,29 @@ def _update_job_progress( job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%") if became_complete: - # Conditional terminal transition: flip to the terminal state only from - # a pre-terminal status, as a single statement-scope UPDATE. This cannot - # be clobbered by a slower worker's stale read-modify-write, and — unlike - # the `select_for_update` removed in #1261 — it holds no - # transaction-length row lock, so it does not reintroduce that - # contention. A job a concurrent worker, the reaper, or a cancel already - # moved to a terminal or CANCELING state is left untouched (we do not - # resurrect a revoked/cancelled job into SUCCESS). See issue #1337. - # - # The UPDATE runs BEFORE the progress blob is saved so the UI-facing - # `progress.summary.status` can be advanced to `complete_state` only - # when this transition actually fires. 
Otherwise an already-terminal - # job (REVOKED/FAILURE/CANCELING) would keep its correct top-level - # `status` column but have `summary.status` silently flipped to SUCCESS - # inside the JSONB — reintroducing the two-sources-of-truth disagreement - # this fix exists to prevent. See issue #1337. - now = datetime.datetime.now() # Use naive datetime in local time + # Flip to terminal only from a non-terminal status, as one + # statement-scope UPDATE: a stale read-modify-write can't clobber it, + # and unlike the select_for_update removed in #1261 it holds no row + # lock. A job already moved to terminal/CANCELING (by another worker, + # the reaper, or a cancel) is left alone — we don't resurrect a + # revoked job into SUCCESS. See #1337. + now = datetime.datetime.now() # naive, local time transitioned = Job.objects.filter( pk=job_id, - status__in=[JobState.CREATED, JobState.PENDING, JobState.STARTED, JobState.RETRY], + status__in=JobState.finalizable_states(), ).update(status=complete_state, finished_at=now) if transitioned: job.status = complete_state job.finished_at = now - # Only now is it correct to advance the UI-facing summary status. - # On the no-transition path (already terminal/CANCELING) the blob's - # summary.status keeps whatever it was. + # Advance summary.status only when the transition fired, else an + # already-terminal job's JSONB would disagree with its column. job.progress.summary.status = complete_state - # Narrow the write to the fields we actually mutated. Without this, a full - # save() would overwrite `logs` and any other field on the instance - # fetched at the top of this block — so a concurrent worker's append to - # `progress.errors` (via `_reconcile_lost_images`) or log line (via - # JobLogHandler) could be clobbered by a stale read-modify-write. - # `updated_at` is listed explicitly because Django skips `auto_now` bumps - # when `update_fields` is provided. See PR #1261 review feedback. 
- # - # NOTE: the overall `status` and `finished_at` columns are intentionally - # NOT written by this save(). A blob-style `save(update_fields=[..., "status", ...])` - # writes whatever `status` this transaction read at the top of the block, - # so a slower/stale worker can regress an already-terminal status (set by a - # faster worker, the stale-job reaper, or a cancel) back to STARTED — which - # also makes the job claimable again via `/next`, starving newer work. The - # terminal transition above is performed separately as a guarded, atomic - # UPDATE. This save() persists the stage-level progress changes (and, when - # the guarded UPDATE fired, the matching summary.status). See issue #1337. + # status/finished_at are deliberately NOT in this save() — only the + # guarded UPDATE above writes them. Folding them back in reopens #1337. + # Narrow update_fields so a concurrent append to progress.errors/logs + # isn't clobbered; updated_at explicit because auto_now skips when + # update_fields is given. job.save(update_fields=["progress", "updated_at"]) try: _log_job_throughput(job, stage)