Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ def active_states(cls):
"""States where a job is actively processing and should serve tasks to workers."""
return [cls.STARTED, cls.RETRY]

@classmethod
def finalizable_states(cls):
    """States from which a completing job may be flipped to a terminal status.

    Per the original note: this is running_states() minus CANCELING (don't
    resurrect a cancel in progress) and UNKNOWN (never served; shouldn't
    auto-finalize). See #1337.
    """
    not_yet_running = [cls.CREATED, cls.PENDING]
    actively_running = [cls.STARTED, cls.RETRY]
    return not_yet_running + actively_running


def get_status_label(status: JobState, progress: float) -> str:
"""
Expand Down
39 changes: 27 additions & 12 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,19 +632,34 @@ def _update_job_progress(
progress=progress_percentage,
**state_params,
)
if job.progress.is_complete():
job.status = complete_state
job.progress.summary.status = complete_state
job.finished_at = datetime.datetime.now() # Use naive datetime in local time
became_complete = job.progress.is_complete()
job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
Comment thread
mihow marked this conversation as resolved.
# Narrow the write to the fields we actually mutated. Without this, a full
# save() would overwrite `logs` and any other field on the instance
# fetched at the top of this block — so a concurrent worker's append to
# `progress.errors` (via `_reconcile_lost_images`) or log line (via
# JobLogHandler) could be clobbered by a stale read-modify-write.
# `updated_at` is listed explicitly because Django skips `auto_now` bumps
# when `update_fields` is provided. See PR #1261 review feedback.
job.save(update_fields=["progress", "status", "finished_at", "updated_at"])

if became_complete:
# Flip to terminal only from a non-terminal status, as one
# statement-scope UPDATE: a stale read-modify-write can't clobber it,
# and unlike the select_for_update removed in #1261 it holds no row
# lock. A job already moved to terminal/CANCELING (by another worker,
# the reaper, or a cancel) is left alone — we don't resurrect a
# revoked job into SUCCESS. See #1337.
now = datetime.datetime.now() # naive, local time
transitioned = Job.objects.filter(
pk=job_id,
status__in=JobState.finalizable_states(),
).update(status=complete_state, finished_at=now)
if transitioned:
job.status = complete_state
job.finished_at = now
# Advance summary.status only when the transition fired, else an
# already-terminal job's JSONB would disagree with its column.
job.progress.summary.status = complete_state

# status/finished_at are deliberately NOT in this save() — only the
# guarded UPDATE above writes them. Folding them back in reopens #1337.
# Narrow update_fields so a concurrent append to progress.errors/logs
# isn't clobbered; updated_at explicit because auto_now skips when
# update_fields is given.
job.save(update_fields=["progress", "updated_at"])
try:
_log_job_throughput(job, stage)
except Exception as e:
Expand Down
242 changes: 241 additions & 1 deletion ami/jobs/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import datetime
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import AsyncMock, MagicMock, patch

Expand All @@ -16,7 +17,7 @@

from ami.base.serializers import reverse_with_params
from ami.jobs.models import Job, JobDispatchMode, JobState, MLJob
from ami.jobs.tasks import process_nats_pipeline_result
from ami.jobs.tasks import _update_job_progress, process_nats_pipeline_result
from ami.main.models import Detection, Project, SourceImage, SourceImageCollection
from ami.ml.models import Algorithm, Pipeline
from ami.ml.models.algorithm import AlgorithmTaskType
Expand Down Expand Up @@ -1229,3 +1230,242 @@ def test_jobs_health_check_runs_lost_images_before_stale_jobs(self, mock_manager

job.refresh_from_db()
self.assertEqual(job.status, JobState.SUCCESS.value)


class TestConditionalTerminalTransition(TransactionTestCase):
    """Regression coverage for issue #1337 (guarded terminal transition).

    Historically `_update_job_progress` persisted the overall `job.status` in
    the same `save(update_fields=[..., "status", ...])` call that wrote the
    progress blob. Once #1261 removed the `select_for_update` that serialized
    those writes, a slow/stale worker could push an already-terminal status
    back to STARTED, or overwrite a REVOKED/cancelled job with SUCCESS. A
    wrongly non-terminal status also makes the job claimable again through
    `/next`, starving newer work. The fix does the terminal transition as a
    guarded atomic UPDATE that only fires from a pre-terminal status.
    """

    def setUp(self):
        cache.clear()
        project = Project.objects.create(name="Terminal Transition Project")
        pipeline = Pipeline.objects.create(name="TT Pipeline", slug="tt-pipeline")
        pipeline.projects.add(project)
        collection = SourceImageCollection.objects.create(name="TT Collection", project=project)

        self.project = project
        self.pipeline = pipeline
        self.collection = collection
        self.job = Job.objects.create(
            job_type_key=MLJob.key,
            project=project,
            name="Terminal Transition Job",
            pipeline=pipeline,
            source_image_collection=collection,
            dispatch_mode=JobDispatchMode.ASYNC_API,
            status=JobState.STARTED,
        )

    def tearDown(self):
        cache.clear()

    def _report_stage(self, stage, fraction=1.0):
        """Send one progress update for `stage`, always targeting SUCCESS on completion."""
        _update_job_progress(
            self.job.pk,
            stage=stage,
            progress_percentage=fraction,
            complete_state=JobState.SUCCESS,
        )

    def _complete_all_stages(self):
        """Push every MLJob stage (collect, process, results) to 100% SUCCESS."""
        for stage_name in ("collect", "process", "results"):
            self._report_stage(stage_name)

    def _complete_all_but_results(self):
        """Finish collect + process so that the next `results` update completes the job."""
        self._report_stage("collect")
        self._report_stage("process")

    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
    def test_completion_does_not_resurrect_revoked_job(self, _mock_cleanup):
        """A completing worker must leave a REVOKED job REVOKED, not SUCCESS."""
        # Only the results stage is left open, so the results update below is
        # the call that completes the job.
        self._complete_all_but_results()

        # The reaper (or a cancel) wins the race and marks the job terminal.
        Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED)

        # A late results batch lands and computes is_complete()=True.
        self._report_stage("results")

        self.job.refresh_from_db()
        # The guarded UPDATE only fires from a pre-terminal state, so the
        # status column is still REVOKED.
        self.assertEqual(self.job.status, JobState.REVOKED.value)
        # The progress blob was still persisted: the work is recorded as
        # complete even though the status was (correctly) left terminal.
        self.assertTrue(self.job.progress.is_complete())

    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
    def test_completion_sets_success_from_active_state(self, _mock_cleanup):
        """From an active state, completion yields SUCCESS and a finished_at timestamp."""
        self.assertEqual(self.job.status, JobState.STARTED.value)

        self._complete_all_stages()

        self.job.refresh_from_db()
        self.assertEqual(self.job.status, JobState.SUCCESS.value)
        self.assertIsNotNone(self.job.finished_at)

    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
    def test_completion_does_not_flip_summary_status_of_revoked_job(self, _mock_cleanup):
        """A completing batch must not flip a REVOKED job's `progress.summary.status`.

        Regression for the Copilot finding on PR #1338: the guarded UPDATE kept
        the top-level `status` column at REVOKED, but the progress blob's
        `summary.status` was still advanced to `complete_state` and persisted
        by the same save(). The UI reads the JSONB, so it showed SUCCESS while
        the job was actually REVOKED — the two-sources-of-truth disagreement
        this change exists to prevent.
        """
        self._complete_all_but_results()

        # Revoke through update_status() — the real revoke path — so BOTH the
        # status column and the JSONB summary.status hold REVOKED, which is the
        # state a cancelled/revoked job is in when a late batch arrives.
        self.job.refresh_from_db()
        self.job.update_status(JobState.REVOKED)

        # A late results batch lands and computes is_complete()=True.
        self._report_stage("results")

        self.job.refresh_from_db()
        # Both sources of truth must agree on the terminal state.
        self.assertEqual(self.job.status, JobState.REVOKED.value)
        self.assertEqual(self.job.progress.summary.status, JobState.REVOKED)
        # Stage-level progress was nevertheless recorded as complete.
        self.assertTrue(self.job.progress.is_complete())

    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
    def test_incomplete_update_never_writes_overall_status(self, _mock_cleanup):
        """A progress update that does not complete the job leaves job.status alone."""
        Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED)

        # One non-completing stage update (the other stages are still at 0%).
        self._report_stage("collect", fraction=0.5)

        self.job.refresh_from_db()
        self.assertEqual(self.job.status, JobState.REVOKED.value)


class TestConcurrentStatusRace(TransactionTestCase):
    """Reproduce the lost-update status race in ``_update_job_progress`` (issue #1337).

    The sequential ``TestConditionalTerminalTransition`` cases set the job
    terminal *between* ``_update_job_progress`` transactions. This class forces
    the actual concurrent interleave: a slow worker reads the job while it is
    still ``STARTED``, another writer marks the job ``REVOKED`` and commits, and
    only then does the slow worker reach its save.

    On ``main`` the slow worker's ``save(update_fields=[..., "status", ...])``
    writes the ``SUCCESS`` it computed from its stale snapshot, regressing the
    already-committed ``REVOKED`` status (the job is resurrected). On the #1338
    fix branch the terminal transition is a guarded atomic UPDATE that only fires
    from a pre-terminal status, so the committed ``REVOKED`` is preserved.

    The assertion is written so that green means the bug is fixed and red means
    the race reproduced.
    """

    def setUp(self):
        cache.clear()
        self.project = Project.objects.create(name="Concurrent Race Project")
        self.pipeline = Pipeline.objects.create(name="CR Pipeline", slug="cr-pipeline")
        self.pipeline.projects.add(self.project)
        self.collection = SourceImageCollection.objects.create(name="CR Collection", project=self.project)
        self.job = Job.objects.create(
            job_type_key=MLJob.key,
            project=self.project,
            name="Concurrent Race Job",
            pipeline=self.pipeline,
            source_image_collection=self.collection,
            dispatch_mode=JobDispatchMode.ASYNC_API,
            status=JobState.STARTED,
        )

    def tearDown(self):
        cache.clear()

    @patch("ami.jobs.tasks.cleanup_async_job_if_needed")
    def test_late_results_batch_does_not_resurrect_revoked_job(self, _mock_cleanup):
        """A results batch holding a stale STARTED read must not overwrite a committed REVOKED."""
        # Bring the job to the brink of completion: collect + process done, but
        # results still open, so the late results batch is the call that would
        # complete the job and attempt the terminal transition.
        _update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS)
        _update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS)

        # Events to coordinate the interleave. The worker thread blocks at a seam
        # that runs AFTER it has read the job (with a stale STARTED snapshot) but
        # BEFORE it saves; the main thread then commits REVOKED and releases it.
        worker_reached_seam = threading.Event()
        main_committed_revoke = threading.Event()

        # Capture the real helper before it is patched, so the seam can delegate to it.
        real_get_counts = _update_job_progress.__globals__["_get_current_counts_from_job_progress"]

        def blocking_get_counts(job, stage):
            # Signal that the worker is holding its stale snapshot, then wait
            # for the main thread to commit the REVOKE before letting the
            # worker proceed to compute/save the new status.
            counts = real_get_counts(job, stage)
            worker_reached_seam.set()
            if not main_committed_revoke.wait(timeout=10):
                raise AssertionError("main thread did not commit REVOKE within timeout")
            return counts

        worker_error: list[BaseException] = []

        def worker():
            from django.db import connection

            try:
                with patch(
                    "ami.jobs.tasks._get_current_counts_from_job_progress",
                    side_effect=blocking_get_counts,
                ):
                    # This is the late results batch. It reads the job (STARTED),
                    # blocks at the seam, then — after the main thread revokes —
                    # finishes and saves, computing SUCCESS from its stale read.
                    _update_job_progress(
                        self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS
                    )
            except BaseException as exc:  # noqa: BLE001 - surfaced to the main thread
                worker_error.append(exc)
                # Wake the main thread right away if we failed before the seam,
                # so it can report this exception instead of sitting out its
                # full wait and raising a generic timeout. The error is recorded
                # BEFORE the event is set, so the main thread always sees it.
                worker_reached_seam.set()
            finally:
                # Hygiene: this thread used its own DB connection; close it so the
                # test's connection teardown does not trip over a dangling one.
                connection.close()

        worker_thread = threading.Thread(target=worker, name="late-results-worker")
        worker_thread.start()

        try:
            # Wait until the worker is parked at the seam holding its stale read
            # (or has failed early and woken us up).
            reached_seam = worker_reached_seam.wait(timeout=10)

            # An early worker failure is the real diagnosis; surface it rather
            # than masking it behind the seam-timeout message below.
            if worker_error:
                raise worker_error[0]
            if not reached_seam:
                raise AssertionError("worker thread never reached the seam (deadlock or early failure)")

            # Another writer (the reaper or a cancel) wins the race and commits a
            # terminal status while the worker is still mid-flight. The worker
            # holds no row lock (the select_for_update was removed in #1261), so
            # this UPDATE commits immediately.
            Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED)
            main_committed_revoke.set()
        finally:
            worker_thread.join(timeout=15)

        if worker_thread.is_alive():
            raise AssertionError("worker thread deadlocked and did not finish")
        if worker_error:
            raise worker_error[0]

        self.job.refresh_from_db()
        # The committed REVOKED status must survive the late worker's save. On
        # main this fails: the stale read-modify-write regresses it to SUCCESS.
        self.assertEqual(
            self.job.status,
            JobState.REVOKED.value,
            f"late results batch resurrected a REVOKED job to {self.job.status!r} (lost-update race)",
        )
Loading