Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions ami/jobs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1067,25 +1067,39 @@ def retry(self, async_task=True):

def cancel(self):
"""
Cancel a job. For async_api jobs, clean up NATS/Redis resources
and transition through CANCELING → REVOKED. For other jobs,
revoke the Celery task.
Cancel a job.

For ASYNC_API jobs the long-running work is on remote ADC workers via
NATS, not in the local ``run_job`` celery task — by the time the user
clicks cancel, ``run_job`` has usually already finished
``queue_images_to_nats`` and returned. Tearing down the NATS stream +
Redis state (``cleanup_async_job_if_needed``) is what actually stops
further work: ADC stops being delivered tasks, and any in-flight
result handlers see no Redis state and fast-fail. Calling
``revoke(terminate=True)`` on the (likely-done) run_job would SIGTERM
the worker child if it happens to still be inside the bootstrap (e.g.
a slow ``filter_processed_images`` for a huge collection), which
prior to ``acks_late`` was an unrecoverable message loss. We revoke
without terminate so a not-yet-started copy is dropped without
killing in-flight bootstrap; the in-flight copy then notices
``status == CANCELING`` via the early-guard in ``run_job`` next time
it's invoked (e.g. on redelivery) and bails out cleanly.

For INTERNAL / SYNC_API jobs the celery task body owns the entire
job lifecycle, so terminating it remains the only way to stop
active work.
"""
self.status = JobState.CANCELING
self.save()

is_async_api = self.dispatch_mode == JobDispatchMode.ASYNC_API
if self.task_id:
task = run_job.AsyncResult(self.task_id)
if task:
task.revoke(terminate=True)
if self.dispatch_mode == JobDispatchMode.ASYNC_API:
# For async jobs we need to set the status to revoked here since the task already
# finished (it only queues the images).
self.status = JobState.REVOKED
self.save()
else:
self.status = JobState.REVOKED
self.save()
task.revoke(terminate=not is_async_api)

self.status = JobState.REVOKED
Comment on lines 1134 to +1138
self.save()

cleanup_async_job_if_needed(self)

Expand Down
53 changes: 38 additions & 15 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,30 +136,53 @@ def update_async_services_seen_for_project(project_id: int) -> None:
)


@celery_app.task(bind=True, soft_time_limit=default_soft_time_limit, time_limit=default_time_limit)
# acks_late + reject_on_worker_lost so a worker SIGKILL/OOM mid-task does not
# silently drop the job: the broker holds the message until the task body
# either completes successfully or raises, and redelivers if the worker dies.
# Pairs with the early-guard below — a redelivered run_job that finds the job
# already in a terminal state (or mid-cancellation) returns cleanly instead of
# re-running side effects. See RolnickLab/antenna#1323.
@celery_app.task(
bind=True,
soft_time_limit=default_soft_time_limit,
time_limit=default_time_limit,
acks_late=True,
reject_on_worker_lost=True,
)
def run_job(self, job_id: int) -> None:
from ami.jobs.models import Job
from ami.jobs.models import Job, JobState

try:
job = Job.objects.get(pk=job_id)
except Job.DoesNotExist as e:
raise e
# self.retry(exc=e, countdown=1, max_retries=1)

# Early-guard: under acks_late, the broker may redeliver this message after a
# worker SIGKILL/OOM, and Job.cancel() may also flip status to CANCELING /
# REVOKED while the message sits in the prefetch buffer. Don't re-run a job
# that's already settled or being torn down.
if job.status in JobState.final_states() or job.status == JobState.CANCELING:
job.logger.info(
f"Skipping run_job for job {job.pk}: already in status {job.status} "
f"(redelivery or cancellation in flight)"
)
return
Comment on lines +161 to +172

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Entry-only cancel guard is race-prone for ASYNC_API jobs.

Lines 165-170 guard only before job.run(). If cancel happens after that check, the task can still reach async dispatch and enqueue work under a canceled job because ASYNC_API cancel no longer terminates the worker process. Add a second DB refresh/status check immediately before async dispatch (e.g., right before queue_images_to_nats) and abort when status is CANCELING/terminal.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@ami/jobs/tasks.py` around lines 161 - 170, The pre-run guard using job.status
/ JobState.final_states() is insufficient for ASYNC_API jobs because
cancellation may occur after the initial check but before dispatch; to fix, add
a second status refresh and guard immediately before the async dispatch call
(right before queue_images_to_nats) by reloading the Job from the DB (e.g., call
the model refresh/get by PK) and aborting the task (return) if the reloaded
job.status is JobState.CANCELING or in JobState.final_states(), logging a
similar skip message; ensure you reference the same job PK/logger and perform
this check right before queue_images_to_nats to avoid enqueuing work for
canceled jobs.


job.logger.info(f"Running job {job}")
try:
job.run()
except Exception as e:
job.logger.error(f'Job #{job.pk} "{job.name}" failed: {e}')
raise
else:
job.logger.info(f"Running job {job}")
try:
job.run()
except Exception as e:
job.logger.error(f'Job #{job.pk} "{job.name}" failed: {e}')
raise
else:
from ami.jobs.models import JobDispatchMode
from ami.jobs.models import JobDispatchMode

job.refresh_from_db()
if job.dispatch_mode == JobDispatchMode.ASYNC_API and not job.progress.is_complete():
_log_worker_availability(job)
else:
job.logger.info(f"Finished job {job}")
job.refresh_from_db()
if job.dispatch_mode == JobDispatchMode.ASYNC_API and not job.progress.is_complete():
_log_worker_availability(job)
else:
job.logger.info(f"Finished job {job}")


def _log_worker_availability(job) -> None:
Expand Down
90 changes: 86 additions & 4 deletions ami/jobs/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,92 @@ def test_run_job_unauthenticated(self):
# Accept either 401 (TokenAuthentication) or 403 (SessionAuthentication with AnonymousUser)
self.assertIn(resp.status_code, [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN])

def test_cancel_job(self):
# This cannot be tested until we have a way to cancel jobs
# and a way to run async tasks in tests.
pass
def test_cancel_async_api_job_does_not_terminate_celery_task(self):
    """Cancelling an ASYNC_API job revokes its celery task with ``terminate=False``.

    Rationale: the real work runs on remote ADC workers fed through NATS, so
    killing the local ``run_job`` bootstrap would not stop them, while a
    SIGTERM on a still-bootstrapping child can lose the message when the
    broker acks early. The NATS/Redis cleanup is what halts further work,
    so the test also checks that the cleanup hook is invoked and that the
    job ends up REVOKED.
    """
    from unittest.mock import MagicMock, patch

    fake_task_id = "fake-async-task-id"
    job = Job.objects.create(
        project=self.project,
        name="Cancel async_api",
        task_id=fake_task_id,
        status=JobState.STARTED,
        dispatch_mode=JobDispatchMode.ASYNC_API,
    )
    async_result = MagicMock()

    with patch("ami.jobs.models.run_job") as run_job_mock:
        with patch("ami.jobs.models.cleanup_async_job_if_needed") as cleanup_mock:
            run_job_mock.AsyncResult.return_value = async_result
            job.cancel()

    # The mocks keep their call records after the patches are undone.
    run_job_mock.AsyncResult.assert_called_once_with(fake_task_id)
    async_result.revoke.assert_called_once_with(terminate=False)
    cleanup_mock.assert_called_once_with(job)

    job.refresh_from_db()
    self.assertEqual(job.status, JobState.REVOKED)

def test_cancel_sync_api_job_terminates_celery_task(self):
    """Cancelling a SYNC_API job still revokes its celery task with ``terminate=True``.

    For SYNC_API / INTERNAL jobs the celery task body owns the whole job
    lifecycle, so terminating the task is the only way to stop active work.
    The cleanup hook must still be called and the job must end up REVOKED.
    """
    from unittest.mock import MagicMock, patch

    job = Job.objects.create(
        project=self.project,
        name="Cancel sync_api",
        task_id="fake-sync-task-id",
        status=JobState.STARTED,
        dispatch_mode=JobDispatchMode.SYNC_API,
    )
    async_result = MagicMock()

    with patch("ami.jobs.models.run_job") as run_job_mock:
        with patch("ami.jobs.models.cleanup_async_job_if_needed") as cleanup_mock:
            run_job_mock.AsyncResult.return_value = async_result
            job.cancel()

    # The mocks keep their call records after the patches are undone.
    async_result.revoke.assert_called_once_with(terminate=True)
    cleanup_mock.assert_called_once_with(job)

    job.refresh_from_db()
    self.assertEqual(job.status, JobState.REVOKED)

def test_cancel_job_without_task_id_still_revokes(self):
    """A job with an empty ``task_id`` is still cancelled cleanly.

    No celery ``AsyncResult`` lookup may happen, the async-cleanup hook is
    still called (expected to be a no-op for non-ASYNC_API jobs), and the job
    ends up REVOKED.
    """
    from unittest.mock import patch

    job = Job.objects.create(
        project=self.project,
        name="Cancel never-enqueued",
        task_id="",
        status=JobState.PENDING,
        dispatch_mode=JobDispatchMode.INTERNAL,
    )

    with patch("ami.jobs.models.run_job") as run_job_mock:
        with patch("ami.jobs.models.cleanup_async_job_if_needed") as cleanup_mock:
            job.cancel()

    # The mocks keep their call records after the patches are undone.
    run_job_mock.AsyncResult.assert_not_called()
    cleanup_mock.assert_called_once_with(job)

    job.refresh_from_db()
    self.assertEqual(job.status, JobState.REVOKED)

def test_list_jobs_with_ids_only(self):
"""Test the ids_only parameter returns only job IDs."""
Expand Down
82 changes: 82 additions & 0 deletions ami/jobs/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,88 @@ def test_task_failure_marks_sync_api_job_failure_and_cleans_up(self, mock_cleanu
mock_cleanup.assert_called_once()


class TestRunJobEarlyGuard(TransactionTestCase):
    """
    Regression tests for the status guard at the top of ``run_job``.

    The task is declared with ``acks_late=True`` and
    ``reject_on_worker_lost=True``, so the broker redelivers a run_job message
    when a worker dies mid-task (SIGKILL, OOM, deploy roll). The guard returns
    early when the Job is already in a terminal state or is being cancelled,
    so a redelivery or a cancel-and-retry race does not re-run side effects.
    See RolnickLab/antenna#1323.
    """

    def setUp(self):
        cache.clear()
        project = Project.objects.create(name="run_job guard project")
        pipeline = Pipeline.objects.create(name="run_job guard pipeline", slug="run-job-guard-pipeline")
        pipeline.projects.add(project)
        collection = SourceImageCollection.objects.create(name="run_job guard collection", project=project)

        self.project = project
        self.pipeline = pipeline
        self.collection = collection

    def tearDown(self):
        cache.clear()

    def _make_job(self, status: JobState) -> Job:
        """Create an ML job for the fixture project, then store ``status`` on it."""
        job = Job.objects.create(
            job_type_key=MLJob.key,
            project=self.project,
            name=f"run_job guard {status}",
            pipeline=self.pipeline,
            source_image_collection=self.collection,
        )
        # The status is assigned after creation and persisted with a second save.
        job.status = status
        job.save()
        return job

    def _apply_run_job(self, status: JobState):
        """Run ``run_job`` eagerly for a new job in ``status`` with ``Job.run`` patched.

        Returns the eager task result and the mock that replaced ``Job.run``.
        """
        from ami.jobs.tasks import run_job

        job = self._make_job(status)
        with patch.object(Job, "run") as run_mock:
            outcome = run_job.apply(args=[job.pk])
        return outcome, run_mock

    def _assert_run_job_skipped(self, status: JobState) -> None:
        """Assert the task succeeds for a job in ``status`` without calling ``Job.run``."""
        outcome, run_mock = self._apply_run_job(status)

        self.assertTrue(outcome.successful(), msg=f"task should succeed, got {outcome.state}: {outcome.traceback}")
        run_mock.assert_not_called()

    def test_skips_when_job_already_revoked(self):
        """Redelivery after the user cancelled and the job settled to REVOKED."""
        self._assert_run_job_skipped(JobState.REVOKED)

    def test_skips_when_job_canceling(self):
        """Cancel arrived after the message was already in the prefetch buffer."""
        self._assert_run_job_skipped(JobState.CANCELING)

    def test_skips_when_job_already_success(self):
        """Redelivery after the job actually completed (e.g. ack lost in transit)."""
        self._assert_run_job_skipped(JobState.SUCCESS)

    def test_runs_when_job_pending(self):
        """Contract pair: a healthy first-delivery still calls Job.run()."""
        _, run_mock = self._apply_run_job(JobState.PENDING)

        run_mock.assert_called_once()


class TestResultEndpointWithError(APITestCase):
"""Integration test for the result API endpoint with error results."""

Expand Down
12 changes: 12 additions & 0 deletions config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,18 @@ def _celery_result_backend_url(redis_url):
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_ENABLE_PREFETCH_COUNT_REDUCTION = True

# Fair scheduling for the prefork pool.
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#worker-pool-optimization
# Under the default scheduler the master process pre-assigns prefetched
# messages to specific prefork children at delivery time, so a long task
# pinned to one child causes head-of-line blocking even when sibling
# children are idle. Fair mode holds prefetched messages in a shared buffer
# and hands one to a child only when that child is genuinely idle, which
# matters for queues that mix heterogeneous-duration tasks (notably ``jobs``,
# where ``run_job`` can sit inside ``filter_processed_images`` for minutes).
# See RolnickLab/antenna#1323.
# NOTE(review): Celery documents this strategy as the worker command-line
# option ``-O fair`` and, since 4.0, describes fair as the default strategy.
# Confirm that the installed Celery version actually reads a
# ``worker_pool_optimization`` setting (and that the docs anchor above
# exists); if it does not, this assignment is a silent no-op — TODO confirm.
CELERY_WORKER_POOL_OPTIMIZATION = "fair"

# Split Celery work across three queues so one class of task can't starve
# another. Staging/production/worker compose files each run a dedicated
# worker service per queue; local/CI use a single worker consuming all queues.
Expand Down
10 changes: 10 additions & 0 deletions docker-compose.worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ services:
command: /start-celeryworker
environment:
CELERY_QUEUES: "jobs"
# TODO(#1323): consider overriding CELERY_WORKER_CONCURRENCY to ~4 here.
# Counter-intuitively, a smaller pool can help here: per-container prefetch is
# concurrency × prefetch_multiplier (=1), so a 16-wide jobs container reserves up to
# 16 messages on a single broker channel. When one run_job sits inside
# filter_processed_images for minutes, the other 15 reserved slots block sibling jobs
# from being offered to an idle peer container. Lowering to 4 shrinks the reservation
# window so the broker spills to peers sooner. Acceptable for the jobs queue because
# run_job spends most of its time waiting on I/O (DB queries, NATS) rather than CPU.
# Other queues (ml_results, antenna) want the larger pool — they're DB/Redis-bound
# and benefit from oversubscription.
# CELERY_WORKER_CONCURRENCY: "4"
restart: always

celeryworker_ml:
Expand Down
Loading