diff --git a/ami/jobs/models.py b/ami/jobs/models.py
index 5e10ac1f4..784b1e8ab 100644
--- a/ami/jobs/models.py
+++ b/ami/jobs/models.py
@@ -94,6 +94,15 @@ def active_states(cls):
         """States where a job is actively processing and should serve tasks to workers."""
         return [cls.STARTED, cls.RETRY]
 
+    @classmethod
+    def finalizable_states(cls):
+        """States from which a completing job may be moved to its terminal state.
+
+        This is running_states() minus CANCELING (don't resurrect a cancel in
+        progress) and UNKNOWN (never served; shouldn't auto-finalize). See #1337.
+        """
+        return [cls.CREATED, cls.PENDING, cls.STARTED, cls.RETRY]
+
 
 def get_status_label(status: JobState, progress: float) -> str:
     """
diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py
index f3c996f86..181b45301 100644
--- a/ami/jobs/tasks.py
+++ b/ami/jobs/tasks.py
@@ -632,19 +632,37 @@ def _update_job_progress(
             progress=progress_percentage,
             **state_params,
         )
-        if job.progress.is_complete():
-            job.status = complete_state
-            job.progress.summary.status = complete_state
-            job.finished_at = datetime.datetime.now()  # Use naive datetime in local time
+        became_complete = job.progress.is_complete()
         job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
-        # Narrow the write to the fields we actually mutated. Without this, a full
-        # save() would overwrite `logs` and any other field on the instance
-        # fetched at the top of this block — so a concurrent worker's append to
-        # `progress.errors` (via `_reconcile_lost_images`) or log line (via
-        # JobLogHandler) could be clobbered by a stale read-modify-write.
-        # `updated_at` is listed explicitly because Django skips `auto_now` bumps
-        # when `update_fields` is provided. See PR #1261 review feedback.
-        job.save(update_fields=["progress", "status", "finished_at", "updated_at"])
+
+        if became_complete:
+            # Flip to terminal only from a non-terminal status, as one
+            # statement-scope UPDATE: a stale read-modify-write can't clobber it,
+            # and unlike the select_for_update removed in #1261 it holds no row
+            # lock. A job already moved to terminal/CANCELING (by another worker,
+            # the reaper, or a cancel) is left alone — we don't resurrect a
+            # revoked job into SUCCESS. See #1337.
+            now = datetime.datetime.now()  # naive, local time
+            transitioned = Job.objects.filter(
+                pk=job_id,
+                status__in=JobState.finalizable_states(),
+            ).update(status=complete_state, finished_at=now)
+            if transitioned:
+                job.status = complete_state
+                job.finished_at = now
+                # Advance summary.status only when the transition fired, else an
+                # already-terminal job's JSONB would disagree with its column.
+                job.progress.summary.status = complete_state
+
+        # NOTE(review): `progress` is still written from this worker's snapshot, so a
+        # summary.status changed concurrently (e.g. by a revoke) may be overwritten
+        # here — confirm whether it needs the same guard as the status column.
+        # status/finished_at are deliberately NOT in this save() — only the
+        # guarded UPDATE above writes them. Folding them back in reopens #1337.
+        # Narrow update_fields so a concurrent append to progress.errors/logs
+        # isn't clobbered; updated_at explicit because auto_now skips when
+        # update_fields is given.
+        job.save(update_fields=["progress", "updated_at"])
         try:
             _log_job_throughput(job, stage)
         except Exception as e:
diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py
index 89587f3f1..8dc99da29 100644
--- a/ami/jobs/tests/test_tasks.py
+++ b/ami/jobs/tests/test_tasks.py
@@ -7,6 +7,7 @@
 
 import datetime
 import logging
+import threading
 from concurrent.futures import ThreadPoolExecutor
 from unittest.mock import AsyncMock, MagicMock, patch
 
@@ -16,7 +17,7 @@
 
 from ami.base.serializers import reverse_with_params
 from ami.jobs.models import Job, JobDispatchMode, JobState, MLJob
-from ami.jobs.tasks import process_nats_pipeline_result
+from ami.jobs.tasks import _update_job_progress, process_nats_pipeline_result
 from ami.main.models import Detection, Project, SourceImage, SourceImageCollection
 from ami.ml.models import Algorithm, Pipeline
 from ami.ml.models.algorithm import AlgorithmTaskType
@@ -1229,3 +1230,240 @@ def test_jobs_health_check_runs_lost_images_before_stale_jobs(self, mock_manager
 
         job.refresh_from_db()
         self.assertEqual(job.status, JobState.SUCCESS.value)
+
+
+class TestConditionalTerminalTransition(TransactionTestCase):
+    """Regression tests for issue #1337.
+
+    `_update_job_progress` used to write the overall `job.status` as part of the
+    same `save(update_fields=[..., "status", ...])` that persisted the progress
+    blob. Because #1261 dropped the `select_for_update` that serialized these
+    writes, a slower/stale worker could regress an already-terminal status back
+    to STARTED (or overwrite a REVOKED/cancelled job with SUCCESS). A wrongly
+    non-terminal status also makes the job claimable again via `/next`, starving
+    newer work. The fix performs the terminal transition as a guarded atomic
+    UPDATE that only fires from a pre-terminal status.
+    """
+
+    def setUp(self):
+        cache.clear()
+        self.project = Project.objects.create(name="Terminal Transition Project")
+        self.pipeline = Pipeline.objects.create(name="TT Pipeline", slug="tt-pipeline")
+        self.pipeline.projects.add(self.project)
+        self.collection = SourceImageCollection.objects.create(name="TT Collection", project=self.project)
+        self.job = Job.objects.create(
+            job_type_key=MLJob.key,
+            project=self.project,
+            name="Terminal Transition Job",
+            pipeline=self.pipeline,
+            source_image_collection=self.collection,
+            dispatch_mode=JobDispatchMode.ASYNC_API,
+            status=JobState.STARTED,
+        )
+
+    def tearDown(self):
+        cache.clear()
+
+    def _complete_all_stages(self):
+        """Drive an MLJob's stages (collect, process, results) to 100% SUCCESS."""
+        for stage in ("collect", "process", "results"):
+            _update_job_progress(self.job.pk, stage=stage, progress_percentage=1.0, complete_state=JobState.SUCCESS)
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_completion_does_not_resurrect_revoked_job(self, _mock_cleanup):
+        """A completing worker must not overwrite a REVOKED job's status with SUCCESS."""
+        # Bring the job to the brink of completion (collect + process done, but not
+        # results), so the eventual results update is the call that completes it.
+        _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS)
+        _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS)
+
+        # The reaper (or a cancel) wins the race; this raw UPDATE sets only the status column.
+        Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED)
+
+        # A late results batch lands and computes is_complete()=True.
+        _update_job_progress(self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS)
+
+        self.job.refresh_from_db()
+        # Status stays REVOKED — the guarded UPDATE only fires from a pre-terminal
+        # state, so the job is not resurrected.
+        self.assertEqual(self.job.status, JobState.REVOKED.value)
+        # The progress blob was still written, so the work is recorded as complete
+        # even though the status was (correctly) left terminal.
+        self.assertTrue(self.job.progress.is_complete())
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_completion_sets_success_from_active_state(self, _mock_cleanup):
+        """A job still in an active state transitions to SUCCESS with finished_at set."""
+        self.assertEqual(self.job.status, JobState.STARTED.value)
+
+        self._complete_all_stages()
+
+        self.job.refresh_from_db()
+        self.assertEqual(self.job.status, JobState.SUCCESS.value)
+        self.assertIsNotNone(self.job.finished_at)
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_completion_does_not_flip_summary_status_of_revoked_job(self, _mock_cleanup):
+        """A completing batch for an already-REVOKED job must not flip the
+        UI-facing `progress.summary.status` to SUCCESS.
+
+        Regression for the Copilot finding on PR #1338: even though the guarded
+        UPDATE correctly leaves the top-level `status` column at REVOKED, the
+        progress blob's `summary.status` was being advanced to `complete_state`
+        and persisted by the same save(), so the UI (which reads the JSONB)
+        showed SUCCESS while the job was actually REVOKED — the exact
+        two-sources-of-truth disagreement this PR exists to prevent.
+        """
+        # Bring the job to the brink of completion (collect + process done).
+        _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS)
+        _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS)
+
+        # The reaper (or a cancel) wins the race and marks the job terminal.
+        # Use update_status() — the real revoke path — so BOTH the status column
+        # and the JSONB summary.status are set to REVOKED, exactly the state a
+        # cancelled/revoked job is in when a late batch arrives.
+        self.job.refresh_from_db()
+        self.job.update_status(JobState.REVOKED)
+
+        # A late results batch lands and computes is_complete()=True.
+        _update_job_progress(self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS)
+
+        self.job.refresh_from_db()
+        # Both sources of truth must agree on the terminal state: the status
+        # column stays REVOKED AND the JSONB summary.status is NOT flipped to
+        # SUCCESS.
+        self.assertEqual(self.job.status, JobState.REVOKED.value)
+        self.assertEqual(self.job.progress.summary.status, JobState.REVOKED)
+        # The stage-level progress was still recorded as complete.
+        self.assertTrue(self.job.progress.is_complete())
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_incomplete_update_never_writes_overall_status(self, _mock_cleanup):
+        """A non-terminal progress update must not touch the overall job.status."""
+        Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED)
+
+        # A single non-completing stage update (job has other stages still at 0%).
+        _update_job_progress(self.job.pk, stage="collect", progress_percentage=0.5, complete_state=JobState.SUCCESS)
+
+        self.job.refresh_from_db()
+        self.assertEqual(self.job.status, JobState.REVOKED.value)
+
+
+class TestConcurrentStatusRace(TransactionTestCase):
+    """Reproduce the lost-update status race in ``_update_job_progress`` (issue #1337).
+
+    The sequential ``TestConditionalTerminalTransition`` cases set the job
+    terminal *between* ``_update_job_progress`` transactions. This class forces
+    the actual concurrent interleave: a slow worker reads the job while it is
+    still ``STARTED``, another writer marks the job ``REVOKED`` and commits, and
+    only then does the slow worker reach its save.
+
+    On ``main`` the slow worker's ``save(update_fields=[..., "status", ...])``
+    writes the ``SUCCESS`` it computed from its stale snapshot, regressing the
+    already-committed ``REVOKED`` status (the job is resurrected). On the #1338
+    fix branch the terminal transition is a guarded atomic UPDATE that only fires
+    from a pre-terminal status, so the committed ``REVOKED`` is preserved.
+
+    The assertion is written so that green means the bug is fixed and red means
+    the race reproduced.
+    """
+
+    def setUp(self):
+        cache.clear()
+        self.project = Project.objects.create(name="Concurrent Race Project")
+        self.pipeline = Pipeline.objects.create(name="CR Pipeline", slug="cr-pipeline")
+        self.pipeline.projects.add(self.project)
+        self.collection = SourceImageCollection.objects.create(name="CR Collection", project=self.project)
+        self.job = Job.objects.create(
+            job_type_key=MLJob.key,
+            project=self.project,
+            name="Concurrent Race Job",
+            pipeline=self.pipeline,
+            source_image_collection=self.collection,
+            dispatch_mode=JobDispatchMode.ASYNC_API,
+            status=JobState.STARTED,
+        )
+
+    def tearDown(self):
+        cache.clear()
+
+    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
+    def test_late_results_batch_does_not_resurrect_revoked_job(self, _mock_cleanup):
+        # Bring the job to the brink of completion: collect + process done, but
+        # results still open, so the late results batch is the call that would
+        # complete the job and attempt the terminal transition.
+        _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS)
+        _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS)
+
+        # Events to coordinate the interleave. The worker thread blocks at a seam
+        # that runs AFTER it has read the job (with a stale STARTED snapshot) but
+        # BEFORE it saves; the main thread then commits REVOKED and releases it.
+        worker_reached_seam = threading.Event()
+        main_committed_revoke = threading.Event()
+
+        real_get_counts = _update_job_progress.__globals__["_get_current_counts_from_job_progress"]
+
+        def blocking_get_counts(job, stage):
+            # This runs inside the worker's transaction, right after it has read
+            # the Job row, before it computes/saves the new status. Signal that
+            # the worker is holding its stale snapshot, then wait for the main
+            # thread to commit the REVOKE before letting the worker proceed.
+            counts = real_get_counts(job, stage)
+            worker_reached_seam.set()
+            if not main_committed_revoke.wait(timeout=10):
+                raise AssertionError("main thread did not commit REVOKE within timeout")
+            return counts
+
+        worker_error: list[BaseException] = []
+
+        def worker():
+            from django.db import connection
+
+            try:
+                with patch(
+                    "ami.jobs.tasks._get_current_counts_from_job_progress",
+                    side_effect=blocking_get_counts,
+                ):
+                    # This is the late results batch. It reads the job (STARTED),
+                    # blocks at the seam, then — after the main thread revokes —
+                    # finishes and saves, computing SUCCESS from its stale read.
+                    _update_job_progress(
+                        self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS
+                    )
+            except BaseException as exc:  # noqa: BLE001 - surfaced to the main thread
+                worker_error.append(exc)
+            finally:
+                # Hygiene: this thread used its own DB connection; close it so the
+                # test's connection teardown does not trip over a dangling one.
+                connection.close()
+
+        worker_thread = threading.Thread(target=worker, name="late-results-worker")
+        worker_thread.start()
+
+        try:
+            # Wait until the worker is parked at the seam holding its stale read.
+            if not worker_reached_seam.wait(timeout=10):
+                raise AssertionError("worker thread never reached the seam (deadlock or early failure)")
+
+            # Another writer (the reaper or a cancel) wins the race and commits a
+            # terminal status while the worker is still mid-flight. The worker
+            # holds no row lock (the select_for_update was removed in #1261), so
+            # this UPDATE commits immediately.
+            Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED)
+            main_committed_revoke.set()
+        finally:
+            worker_thread.join(timeout=15)
+
+        if worker_thread.is_alive():
+            raise AssertionError("worker thread deadlocked and did not finish")
+        if worker_error:
+            raise worker_error[0]
+
+        self.job.refresh_from_db()
+        # The committed REVOKED status must survive the late worker's save. On
+        # main this fails: the stale read-modify-write regresses it to SUCCESS.
+        self.assertEqual(
+            self.job.status,
+            JobState.REVOKED.value,
+            f"late results batch resurrected a REVOKED job to {self.job.status!r} (lost-update race)",
+        )