Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,10 +632,9 @@ def _update_job_progress(
progress=progress_percentage,
**state_params,
)
if job.progress.is_complete():
job.status = complete_state
became_complete = job.progress.is_complete()
if became_complete:
job.progress.summary.status = complete_state
job.finished_at = datetime.datetime.now() # Use naive datetime in local time
job.logger.info(f"Updated job {job_id} progress in stage '{stage}' to {progress_percentage*100}%")
Comment thread
mihow marked this conversation as resolved.
# Narrow the write to the fields we actually mutated. Without this, a full
# save() would overwrite `logs` and any other field on the instance
Expand All @@ -644,7 +643,33 @@ def _update_job_progress(
# JobLogHandler) could be clobbered by a stale read-modify-write.
# `updated_at` is listed explicitly because Django skips `auto_now` bumps
# when `update_fields` is provided. See PR #1261 review feedback.
job.save(update_fields=["progress", "status", "finished_at", "updated_at"])
#
# NOTE: the overall `status` and `finished_at` columns are intentionally
# NOT written here. A blob-style `save(update_fields=[..., "status", ...])`
# writes whatever `status` this transaction read at the top of the block,
# so a slower/stale worker can regress an already-terminal status (set by a
# faster worker, the stale-job reaper, or a cancel) back to STARTED — which
# also makes the job claimable again via `/next`, starving newer work. The
# terminal transition is performed separately below as a guarded, atomic
# UPDATE. See issue #1337.
job.save(update_fields=["progress", "updated_at"])
if became_complete:
# Conditional terminal transition: flip to the terminal state only from
# a pre-terminal status, as a single statement-scope UPDATE. This cannot
# be clobbered by a slower worker's stale read-modify-write, and — unlike
# the `select_for_update` removed in #1261 — it holds no
# transaction-length row lock, so it does not reintroduce that
# contention. A job a concurrent worker, the reaper, or a cancel already
# moved to a terminal or CANCELING state is left untouched (we do not
# resurrect a revoked/cancelled job into SUCCESS). See issue #1337.
now = datetime.datetime.now() # Use naive datetime in local time
transitioned = Job.objects.filter(
pk=job_id,
status__in=[JobState.CREATED, JobState.PENDING, JobState.STARTED, JobState.RETRY],
).update(status=complete_state, finished_at=now)
if transitioned:
job.status = complete_state
job.finished_at = now
try:
_log_job_throughput(job, stage)
except Exception as e:
Expand Down
84 changes: 83 additions & 1 deletion ami/jobs/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from ami.base.serializers import reverse_with_params
from ami.jobs.models import Job, JobDispatchMode, JobState, MLJob
from ami.jobs.tasks import process_nats_pipeline_result
from ami.jobs.tasks import _update_job_progress, process_nats_pipeline_result
from ami.main.models import Detection, Project, SourceImage, SourceImageCollection
from ami.ml.models import Algorithm, Pipeline
from ami.ml.models.algorithm import AlgorithmTaskType
Expand Down Expand Up @@ -1229,3 +1229,85 @@ def test_jobs_health_check_runs_lost_images_before_stale_jobs(self, mock_manager

job.refresh_from_db()
self.assertEqual(job.status, JobState.SUCCESS.value)


class TestConditionalTerminalTransition(TransactionTestCase):
"""Regression tests for issue #1337.

`_update_job_progress` used to write the overall `job.status` as part of the
same `save(update_fields=[..., "status", ...])` that persisted the progress
blob. Because #1261 dropped the `select_for_update` that serialized these
writes, a slower/stale worker could regress an already-terminal status back
to STARTED (or overwrite a REVOKED/cancelled job with SUCCESS). A wrongly
non-terminal status also makes the job claimable again via `/next`, starving
newer work. The fix performs the terminal transition as a guarded atomic
UPDATE that only fires from a pre-terminal status.
"""

def setUp(self):
cache.clear()
self.project = Project.objects.create(name="Terminal Transition Project")
self.pipeline = Pipeline.objects.create(name="TT Pipeline", slug="tt-pipeline")
self.pipeline.projects.add(self.project)
self.collection = SourceImageCollection.objects.create(name="TT Collection", project=self.project)
self.job = Job.objects.create(
job_type_key=MLJob.key,
project=self.project,
name="Terminal Transition Job",
pipeline=self.pipeline,
source_image_collection=self.collection,
dispatch_mode=JobDispatchMode.ASYNC_API,
status=JobState.STARTED,
)

def tearDown(self):
cache.clear()

def _complete_all_stages(self):
"""Drive an MLJob's stages (collect, process, results) to 100% SUCCESS."""
for stage in ("collect", "process", "results"):
_update_job_progress(self.job.pk, stage=stage, progress_percentage=1.0, complete_state=JobState.SUCCESS)

@patch("ami.jobs.tasks.cleanup_async_job_if_needed")
def test_completion_does_not_resurrect_revoked_job(self, _mock_cleanup):
"""A completing worker must not flip a REVOKED job back to SUCCESS."""
# Bring the job to the brink of completion (collect + process done, but not
# results), so the eventual results update is the call that completes it.
_update_job_progress(self.job.pk, stage="collect", progress_percentage=1.0, complete_state=JobState.SUCCESS)
_update_job_progress(self.job.pk, stage="process", progress_percentage=1.0, complete_state=JobState.SUCCESS)

# The reaper (or a cancel) wins the race and marks the job terminal.
Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED)

# A late results batch lands and computes is_complete()=True.
_update_job_progress(self.job.pk, stage="results", progress_percentage=1.0, complete_state=JobState.SUCCESS)

self.job.refresh_from_db()
# Status stays REVOKED — the guarded UPDATE only fires from a pre-terminal
# state, so the job is not resurrected.
self.assertEqual(self.job.status, JobState.REVOKED.value)
# The progress blob was still written, so the work is recorded as complete
# even though the status was (correctly) left terminal.
self.assertTrue(self.job.progress.is_complete())

@patch("ami.jobs.tasks.cleanup_async_job_if_needed")
def test_completion_sets_success_from_active_state(self, _mock_cleanup):
"""A job still in an active state transitions to SUCCESS with finished_at set."""
self.assertEqual(self.job.status, JobState.STARTED.value)

self._complete_all_stages()

self.job.refresh_from_db()
self.assertEqual(self.job.status, JobState.SUCCESS.value)
self.assertIsNotNone(self.job.finished_at)

@patch("ami.jobs.tasks.cleanup_async_job_if_needed")
def test_incomplete_update_never_writes_overall_status(self, _mock_cleanup):
"""A non-terminal progress update must not touch the overall job.status."""
Job.objects.filter(pk=self.job.pk).update(status=JobState.REVOKED)

# A single non-completing stage update (job has other stages still at 0%).
_update_job_progress(self.job.pk, stage="collect", progress_percentage=0.5, complete_state=JobState.SUCCESS)

self.job.refresh_from_db()
self.assertEqual(self.job.status, JobState.REVOKED.value)
Loading