From 476a71049104dcb02f33ecc063e9c4964acd451c Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 15 Apr 2026 11:14:37 -0700 Subject: [PATCH 1/4] fix(jobs): ack NATS after results-stage SREM, guard task_failure for in-flight async jobs Resolves two related causes of async_api jobs landing in unrecoverable states: * Bug A (stranded STARTED): process_nats_pipeline_result acked NATS before the results-stage Redis SREM. A worker crash between those two writes drained the message from NATS (no redelivery) while Redis kept the image id in pending_images:results, leaving the job at partial progress with no path to completion. ACK now happens last, after save_results, the results-stage SREM, and _update_job_progress are all durable. * Counter-inflation on replay: AsyncJobStateManager.update_state now returns the SREM integer result via JobStateProgress.newly_removed, and the result handler gates detections/classifications/captures accumulation on it being > 0. Replays (NATS redeliver or Celery retry) pass zeros so counters stay idempotent. * Bug C (task_failure collapsing in-flight jobs): the task_failure signal unconditionally marked ASYNC_API jobs FAILURE and tore down NATS/Redis state, even when images were still being processed. Added a guard mirroring the SUCCESS guard in task_postrun: defer terminal state to the async progress handler when dispatch_mode==ASYNC_API and progress is not complete. Also reduces NATS max_deliver from 5 to 2 (one retry covers a transient blip; more burns ADC + worker time on consistently-bad results) and hoists it to a top-of-file NATS_MAX_DELIVER constant configurable via settings. Adds diagnostic logging in _update_job_progress: warns when max() lifts the passed percentage to 1.0 (catches state-race symptoms otherwise invisible) and logs which stages satisfied is_complete() at cleanup time. Tests: - test_ack_deferred_until_after_results_stage_srem: patches update_state to fail on results stage; asserts ACK is not called. 
- test_results_counter_does_not_inflate_on_replay: double-delivers the same result; asserts captures stays at 1. Co-Authored-By: Claude --- ami/jobs/tasks.py | 102 ++++++++++--- ami/jobs/tests/test_tasks.py | 132 +++++++++++++++++ ami/ml/orchestration/async_job_state.py | 7 + ami/ml/orchestration/nats_queue.py | 10 +- docs/claude/debugging/chaos-scenarios.md | 167 ++++++++++++++++++++++ docs/claude/processing-lifecycle.md | 173 +++++++++++++++++++++++ 6 files changed, 571 insertions(+), 20 deletions(-) create mode 100644 docs/claude/debugging/chaos-scenarios.md create mode 100644 docs/claude/processing-lifecycle.md diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index de2700593..00ddc6fac 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -176,9 +176,12 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub classifications_count = sum(len(detection.classifications) for detection in pipeline_result.detections) captures_count = len(pipeline_result.source_images) - acked = _ack_task_via_nats(reply_subject, job.logger) - # Update job stage with calculated progress - + # Do NOT ack NATS yet. ACK must happen AFTER the results-stage SREM and + # _update_job_progress so that a worker crash between save_results and + # progress commit leaves the message redeliverable. Previously the ACK + # ran here (before SREM): on crash, NATS drained permanently while + # Redis pending_images:results kept the id, stranding the job at + # partial progress with no path to completion. See antenna#1232. try: progress_info = state_manager.update_state( processed_image_ids, @@ -187,12 +190,9 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub except RedisError as e: # Transient. save_results dedupes on re-run (get_or_create_detection) # and SREM is a no-op on already-removed ids, so a Celery retry is - # safe for the DB and Redis sets. 
The caveat is _update_job_progress - # accumulates detections/classifications/captures on the results - # stage (see _update_job_progress stage=="results" branch); if this - # retry runs a second time (or NATS redelivers to ADC because - # ack_wait elapsed before we got here), those counters will inflate - # cosmetically. Tracked in #1232. + # safe for the DB and Redis sets. Counter accumulation is gated on + # progress_info.newly_removed below, so replays will not inflate + # detections/classifications/captures (fixes antenna#1232 replay case). job.logger.warning( f"Transient Redis error updating job {job_id} state (stage=results); Celery will retry: {e}", exc_info=True, @@ -200,6 +200,10 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub raise if not progress_info: + # State keys genuinely missing (total-images key returned None). Ack + # first so NATS stops redelivering a message whose state is gone, + # then fail the job. Mirrors the stage=process missing-state path. + _ack_task_via_nats(reply_subject, job.logger) _fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)") return @@ -208,15 +212,40 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub if progress_info.total > 0 and (progress_info.failed / progress_info.total) > FAILURE_THRESHOLD: complete_state = JobState.FAILURE - _update_job_progress( - job_id, - "results", - progress_info.percentage, - complete_state=complete_state, - detections=detections_count, - classifications=classifications_count, - captures=captures_count, - ) + # Counter-inflation guard: only add detection/classification/capture counts + # when SREM actually removed ids (first processing of this result). On a + # replay (NATS redelivered the message or the Celery task retried past + # the SREM), newly_removed==0 and we skip accumulation to keep the + # counters idempotent. 
The percentage/status path still runs because + # _update_job_progress uses max() and preserves FAILURE regardless. + if progress_info.newly_removed > 0: + _update_job_progress( + job_id, + "results", + progress_info.percentage, + complete_state=complete_state, + detections=detections_count, + classifications=classifications_count, + captures=captures_count, + ) + else: + _update_job_progress( + job_id, + "results", + progress_info.percentage, + complete_state=complete_state, + detections=0, + classifications=0, + captures=0, + ) + + # Ack LAST — only after the results-stage SREM and progress commit are + # durable. If anything above crashes, NATS will redeliver the message + # and the full result path re-runs idempotently: save_results dedupes + # on (detection, source_image), SREM is a no-op on already-removed ids + # (newly_removed==0 gates counter accumulation), and the progress + # percentage is clamped by max() to never regress. + acked = _ack_task_via_nats(reply_subject, job.logger) except RedisError: # Logged above at the specific update_state call site; re-raise so @@ -337,8 +366,11 @@ def _update_job_progress( # Don't overwrite a stage with a stale progress value. # This guards against the race where a slower worker calls _update_job_progress # after a faster worker has already marked further progress. + passed_progress = progress_percentage + existing_progress: float | None = None try: existing_stage = job.progress.get_stage(stage) + existing_progress = existing_stage.progress progress_percentage = max(existing_stage.progress, progress_percentage) # Explicitly preserve FAILURE: once a stage is marked FAILURE it should # never regress to a non-failure state, regardless of enum ordering. @@ -347,6 +379,17 @@ def _update_job_progress( except (ValueError, AttributeError): pass # Stage doesn't exist yet; proceed normally + # Diagnostic: when max() lifts the percentage to 1.0 from a partial value + # this worker computed, surface it. 
A legitimate jump means another + # worker concurrently completed the stage; an unexpected jump (e.g. the + # premature-cleanup pattern, Bug B in docs/claude/processing-lifecycle.md) is otherwise invisible. + if existing_progress is not None and progress_percentage >= 1.0 and passed_progress < 1.0: + job.logger.warning( + f"Stage '{stage}' progress lifted to 100% by max() guard: " + f"this worker passed {passed_progress*100:.1f}%, DB had {existing_progress*100:.1f}%. " + f"If no other worker just legitimately finished this stage, this is a state-race symptom." + ) + # Determine the status to write: # - Stage complete (100%): use complete_state (SUCCESS or FAILURE) # - Stage incomplete but FAILURE already determined: keep FAILURE visible @@ -374,6 +417,11 @@ def _update_job_progress( # Clean up async resources for completed jobs that use NATS/Redis if job.progress.is_complete(): job = Job.objects.get(pk=job_id) # Re-fetch outside transaction + # Diagnostic: log which stages satisfied the complete condition. Without + # this, premature-cleanup bugs (cleanup fires while results are still + # mid-flight) are hard to trace back to a specific stage transition. + stages_summary = ", ".join(f"{s.key}={s.progress*100:.1f}% {s.status}" for s in job.progress.stages) + job.logger.info(f"is_complete()=True after stage='{stage}' update; firing cleanup. Stages: {stages_summary}") cleanup_async_job_if_needed(job) @@ -659,9 +707,25 @@ def update_job_status(sender, task_id, task, state: str, retval=None, **kwargs): @task_failure.connect(sender=run_job, retry=False) def update_job_failure(sender, task_id, exception, *args, **kwargs): - from ami.jobs.models import Job, JobState + from ami.jobs.models import Job, JobDispatchMode, JobState job = Job.objects.get(task_id=task_id) + + # For ASYNC_API jobs where images have been queued to NATS but the final + # stages have not completed, a run_job failure (e.g. 
a transient exception + # raised *after* queue_images_to_nats returned) would otherwise collapse an + # otherwise-healthy async job: NATS workers are still processing, results + # are still arriving, but this handler would mark FAILURE and cleanup would + # destroy the stream/consumer + Redis state mid-flight. Defer terminal + # state to the async result handler, which owns is_complete() transitions. + # Mirrors the SUCCESS guard in update_job_status (task_postrun). + if job.dispatch_mode == JobDispatchMode.ASYNC_API and not job.progress.is_complete(): + job.logger.warning( + f'Job #{job.pk} "{job.name}" run_job raised but async processing is in-flight; ' + f"deferring FAILURE to async progress handler: {exception}" + ) + return + job.update_status(JobState.FAILURE, save=False) job.logger.error(f'Job #{job.pk} "{job.name}" failed: {exception}') diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 0e8f476b4..8e05c7243 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -351,6 +351,138 @@ def test_genuinely_missing_state_acks_and_fails_job(self, mock_manager_class, mo args, _ = mock_fail.call_args self.assertIn("Job state keys not found in Redis", args[1]) + @patch("ami.jobs.tasks._ack_task_via_nats") + @patch("ami.jobs.tasks.TaskQueueManager") + def test_ack_deferred_until_after_results_stage_srem(self, mock_manager_class, mock_ack): + """ + Bug A regression: NATS ACK must NOT happen until after the results-stage + SREM is durable in Redis. A worker crash between save_results and the + results SREM would otherwise strand the image in pending_images:results + with NATS already drained (no redelivery) — the job's results stage + never reaches 100% and no code path reconciles it. + + This test simulates a crash on the results-stage SREM. 
Correct behavior: + - process-stage SREM succeeded (called first, no crash) + - save_results ran + - results-stage SREM raised RedisError → exception propagates to Celery + - ACK was NOT called (so NATS will redeliver after ack_wait) + + On buggy code (ACK before results SREM), mock_ack would be called before + the raise, leaving the id stranded in Redis. + """ + from redis.exceptions import RedisError + + self._setup_mock_nats(mock_manager_class) + + # save_results requires the pipeline to have at least one detection + # algorithm. Attach a minimal one so we exercise the full save_results + # path before hitting the results-stage SREM we're testing. + detection_algorithm = Algorithm.objects.create( + name="ack-ordering-detector", + key="ack-ordering-detector", + task_type=AlgorithmTaskType.LOCALIZATION, + ) + self.pipeline.algorithms.add(detection_algorithm) + + # Use a success result (not an error) so save_results path runs fully. + # An empty detections list keeps save_results cheap. + success_data = PipelineResultsResponse( + pipeline="test-pipeline", + algorithms={}, + total_time=1.0, + source_images=[SourceImageResponse(id=str(self.images[0].pk), url="http://example.com/test_image_0.jpg")], + detections=[], + errors=None, + ).dict() + + real_update_state = AsyncJobStateManager.update_state + + def fail_on_results_stage(self, processed_image_ids, stage, failed_image_ids=None): + if stage == "results": + raise RedisError("connection reset on results SREM") + return real_update_state(self, processed_image_ids, stage, failed_image_ids) + + with patch.object(AsyncJobStateManager, "update_state", fail_on_results_stage): + with self.assertRaises(RedisError): + process_nats_pipeline_result( + job_id=self.job.pk, + result_data=success_data, + reply_subject="reply.ack-ordering", + ) + + mock_ack.assert_not_called() + + # Process stage SREM ran and removed the id; results stage still holds it, + # waiting for a successful retry or NATS redelivery. 
+ process_progress = AsyncJobStateManager(self.job.pk).get_progress("process") + results_progress = AsyncJobStateManager(self.job.pk).get_progress("results") + self.assertEqual(process_progress.processed, 1) + self.assertEqual(results_progress.processed, 0) + + @patch("ami.jobs.tasks.TaskQueueManager") + def test_results_counter_does_not_inflate_on_replay(self, mock_manager_class): + """ + Bug A companion (antenna#1232): _update_job_progress("results") accumulates + detections/classifications/captures by reading existing values and adding + new ones — not idempotent. On a NATS redelivery or Celery retry, the same + batch can legitimately arrive twice. The fix gates accumulation on + update_state's newly_removed (SREM's integer return, 0 on replay). + + Scenario: deliver the same result twice. Counters should reflect one + batch, not two. + """ + self._setup_mock_nats(mock_manager_class) + + detection_algorithm = Algorithm.objects.create( + name="replay-detector", + key="replay-detector", + task_type=AlgorithmTaskType.LOCALIZATION, + ) + self.pipeline.algorithms.add(detection_algorithm) + + # Empty-detections success keeps save_results cheap; the counter + # accumulation still runs because captures_count = len(source_images) = 1. + success_data = PipelineResultsResponse( + pipeline="test-pipeline", + algorithms={}, + total_time=1.0, + source_images=[SourceImageResponse(id=str(self.images[0].pk), url="http://example.com/test_image_0.jpg")], + detections=[], + errors=None, + ).dict() + + # First delivery: counters should advance by 1 capture. 
+ process_nats_pipeline_result.apply( + kwargs={"job_id": self.job.pk, "result_data": success_data, "reply_subject": "reply.first"} + ) + + self.job.refresh_from_db() + results_stage = next(s for s in self.job.progress.stages if s.key == "results") + captures_after_first = next( + (p.value for p in results_stage.params if p.key == "captures"), + 0, + ) + self.assertEqual(captures_after_first, 1, "first delivery should count 1 capture") + + # Second delivery of the same result (NATS redeliver / Celery retry after + # the results SREM was already durable). SREM now returns 0 (id already + # gone). Counters must NOT double. + process_nats_pipeline_result.apply( + kwargs={"job_id": self.job.pk, "result_data": success_data, "reply_subject": "reply.replay"} + ) + + self.job.refresh_from_db() + results_stage = next(s for s in self.job.progress.stages if s.key == "results") + captures_after_replay = next( + (p.value for p in results_stage.params if p.key == "captures"), + 0, + ) + self.assertEqual( + captures_after_replay, + 1, + f"replay must not inflate captures counter (got {captures_after_replay}, expected 1)", + ) + @patch("ami.jobs.tasks.TaskQueueManager") def test_process_nats_pipeline_result_error_job_not_found(self, mock_manager_class): """ diff --git a/ami/ml/orchestration/async_job_state.py b/ami/ml/orchestration/async_job_state.py index 6459822d5..77efa1d25 100644 --- a/ami/ml/orchestration/async_job_state.py +++ b/ami/ml/orchestration/async_job_state.py @@ -56,6 +56,7 @@ class JobStateProgress: processed: int = 0 # source images completed (success + failed) percentage: float = 0.0 # processed / total failed: int = 0 # source images that returned an error from the processing service + newly_removed: int = 0 # number of IDs actually removed by this SREM call (0 on replay) class AsyncJobStateManager: @@ -156,6 +157,11 @@ def update_state( # regardless of whether SREM/SADD appear at the front. 
remaining, failed_count, total_raw = results[-3], results[-2], results[-1] + # SREM's integer return (number of members actually removed) is at results[0] + # when processed_image_ids is non-empty. Zero on a replay because the IDs are + # no longer in the set. Used by callers to gate idempotent counter accumulation. + newly_removed = results[0] if processed_image_ids else 0 + if total_raw is None: return None @@ -173,6 +179,7 @@ def update_state( processed=processed, percentage=percentage, failed=failed_count, + newly_removed=newly_removed, ) def get_progress(self, stage: str) -> "JobStateProgress | None": diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py index 767298af4..2270f7360 100644 --- a/ami/ml/orchestration/nats_queue.py +++ b/ami/ml/orchestration/nats_queue.py @@ -44,6 +44,14 @@ async def get_connection(nats_url: str) -> tuple[nats.NATS, JetStreamContext]: TASK_TTR = getattr(settings, "NATS_TASK_TTR", 30) # Visibility timeout in seconds (configurable) + +# Max delivery attempts per NATS message (1 original + N-1 retries). +# A processing service that consistently fails (e.g. returns results referencing +# an algorithm that the pipeline doesn't declare) will burn ADC + worker time on +# every retry; one retry covers a transient blip and is the right tradeoff. +# Defaults to 2; override via settings.NATS_MAX_DELIVER for per-environment tuning. 
+NATS_MAX_DELIVER = getattr(settings, "NATS_MAX_DELIVER", 2) + ADVISORY_STREAM_NAME = "advisories" # Shared stream for max delivery advisories across all jobs @@ -342,7 +350,7 @@ async def _ensure_consumer(self, job_id: int): durable_name=consumer_name, ack_policy=AckPolicy.EXPLICIT, ack_wait=TASK_TTR, # Visibility timeout (TTR) - max_deliver=5, # Max retry attempts + max_deliver=NATS_MAX_DELIVER, deliver_policy=DeliverPolicy.ALL, max_ack_pending=self.max_ack_pending, filter_subject=subject, diff --git a/docs/claude/debugging/chaos-scenarios.md b/docs/claude/debugging/chaos-scenarios.md new file mode 100644 index 000000000..73df7a0b6 --- /dev/null +++ b/docs/claude/debugging/chaos-scenarios.md @@ -0,0 +1,167 @@ +# Chaos scenarios for async_api jobs + +Manual fault-injection runbook for validating the `process_nats_pipeline_result` +retry path, the ACK/SREM ordering, and the terminal-vs-transient Redis error +boundary. Unit tests in `ami/jobs/tests/test_tasks.py` cover the task body, but +they do not exercise `autoretry_for`, real Celery backoff, or NATS redelivery — +this runbook does. + +Run against a live local stack. Do not run against production. + +## Prereqs + +- `docker compose ps` — all services healthy (django, celeryworker, redis, nats, rabbitmq). +- ADC worker running a pipeline registered on the target project. Current + verified combo: project 20, pipeline `mothbot_insect_orders_2025`. +- A fresh `SourceImageCollection` with enough images that process stage takes + >10s — otherwise there's no window to inject the fault mid-flight. +- `git status` clean. Fault-injection patches must be reverted before commit. + +## Fault-injection primitives + +### 1. `chaos_monkey` management command + +Wipes runtime state: + +``` +docker compose exec django python manage.py chaos_monkey flush redis +docker compose exec django python manage.py chaos_monkey flush nats +``` + +- `flush redis` → FLUSHDB on the default db. 
Every in-flight `update_state` + call will see `total_raw=None` and return `None` from that point on → the + caller takes the terminal "keys genuinely gone" path (ACK + `_fail_job`). +- `flush nats` → deletes every JetStream stream. Workers mid-pull see + `NotFoundError`. Existing Redis state is untouched. + +### 2. One-shot transient RedisError via sentinel file + +To simulate a connection reset or timeout without actually killing Redis: + +1. Patch `AsyncJobStateManager.update_state` at the top of the method body: + + ```python + import os + if os.path.exists("/tmp/inject-redis-fault"): + os.remove("/tmp/inject-redis-fault") + raise RedisError("injected transient fault") + ``` + +2. Arm: `docker compose exec celeryworker touch /tmp/inject-redis-fault`. +3. The file auto-removes on the first task hit, so exactly one invocation + sees the fault; Celery's `autoretry_for=(RedisError, ...)` retries and the + retry succeeds. +4. **Revert the patch and restart celeryworker** before committing. + +Do NOT use `redis-cli CLIENT KILL TYPE normal` for this — django-redis's +connection pool transparently reconnects and the error never reaches the +caller. + +## Scenarios + +Run these in order. Each one verifies a distinct path in Fix 1's reordered +ACK/SREM code. + +### Scenario A: happy path + +Baseline. Confirms the lifecycle doc's Section 1 matches reality. + +1. Dispatch a job: + ``` + docker compose exec -T django python manage.py test_ml_job_e2e \ + --project 20 --collection --pipeline --dispatch-mode async_api + ``` +2. Watch logs: + ``` + docker compose logs celeryworker --since 10s --follow 2>&1 | \ + grep --line-buffered -E \ + "Pending images from Redis|Updated job .* progress|Finalizing NATS consumer|ERROR|FAILURE" + ``` +3. Expected: all stages hit 100% SUCCESS; `Finalizing NATS consumer` appears + exactly once per worker; no ERROR or FAILURE lines. 
+ +### Scenario B: transient RedisError mid-flight (ack_wait holds the message) + +Verifies that `autoretry_for` is what retries — not Celery silently swallowing the error. + +1. Patch `update_state` with the sentinel-file block above. +2. Restart celeryworker: `docker compose restart celeryworker`. +3. Dispatch a job, wait for process stage to pass 10%. +4. Arm: `docker compose exec celeryworker touch /tmp/inject-redis-fault`. +5. Expected in logs: + - One `Transient Redis error updating job ... state (stage=...)` warning. + - Celery `retry: Retry in N.Ns` line. + - Next invocation succeeds; stage progress resumes. + - NO `Job state keys not found in Redis` (that is the terminal path). +6. Job completes SUCCESS. +7. Revert the patch; `docker compose restart celeryworker`; `git diff` clean. + +### Scenario C: genuine Redis state loss (FLUSHDB mid-flight) + +Verifies the terminal path: update_state returns None → ACK → `_fail_job`. + +1. Dispatch a job, wait for process stage >10%. +2. `docker compose exec django python manage.py chaos_monkey flush redis`. +3. Expected in logs: + - `Pending images from Redis for job X ...` stops emitting for this job. + - `Job state keys not found in Redis (likely cleaned up concurrently)` + appears. + - `Changing status of job ` to FAILURE. + - NATS consumer finalized once (not per-worker — the remaining workers + see no state to reconcile). +4. Job row: `status=FAILURE`, finished_at set. + +### Scenario D: ACK/SREM ordering (Bug A crash window) + +Verifies Fix 1's reorder: a crash after the results-stage work but before the final ACK +leaves the message redeliverable, not stranded. + +1. Patch `process_nats_pipeline_result` to `os._exit(1)` after + `_update_job_progress(job_id, "results", ...)` and before `_ack_task_via_nats(...)`. + (Pick a deterministic trigger — e.g., check for `/tmp/crash-after-srem`. Do not place the exit between the results-stage SREM and `_update_job_progress`: that replay sees `newly_removed=0`, so the batch's counters are never added.) +2. Restart celeryworker. +3. Dispatch a job. Arm with `touch /tmp/crash-after-srem`. +4. Expected: + - First worker to hit the trigger dies without ACKing. 
+ - NATS `ack_wait` (30s) elapses. + - Message redelivered to another worker. + - save_results dedupes, SREM is a no-op (`newly_removed=0`), + counter accumulation skipped. + - Job eventually completes SUCCESS; counters match image count exactly. +5. On pre-Fix-1 code, this scenario strands the image: ACK fires before the + planned crash point, so NATS has no record to redeliver; Redis keeps the + id in `pending_images:results` forever. +6. Revert the patch; restart; `git diff` clean. + +### Scenario E: Celery retries exhausted (max_retries=5) + +Documents what happens after budget exhaustion: the job is NOT flipped to FAILURE; it stays +STARTED at partial progress until a stale-job reaper handles it. + +1. Patch `update_state` to unconditionally raise `RedisError("persistent")`. +2. Restart celeryworker. +3. Dispatch a job. +4. Expected: + - 5× `Transient Redis error` warnings with exponential backoff + (1s, 2s, 4s, 8s, 15s capped by `retry_backoff_max`). + - `MaxRetriesExceededError` in Celery logs. + - `task_failure` fires for `process_nats_pipeline_result`, but the Bug C guard + lives in `update_job_failure`, which is connected with `sender=run_job` and + so does not run for this task; the job stays STARTED. A stale-job reaper (Fix 2, out of scope here) would + eventually revoke it. +5. Revert; restart; `git diff` clean. + +## Gotchas + +- **celeryworker startup noise**: first ~60s after restart, the + `check_processing_services_online` beat task monopolises + `ForkPoolWorker-16` retrying unreachable services. Wait for that to settle + before dispatching. +- **RabbitMQ stale connection**: if Django has been up >1 day, AMQP + connections go stale → `ConnectionResetError: [Errno 104]`. Fix: + `docker compose restart django` before dispatching. +- **Volume mount**: `ami/` is live-mounted. Patches take effect only after + `docker compose restart celeryworker`. +- **Uncommitted patch leak**: always `git diff` before committing. The + sentinel-file pattern is disruptive; losing the revert turns every + subsequent test into a fault-injection run. 
diff --git a/docs/claude/processing-lifecycle.md b/docs/claude/processing-lifecycle.md new file mode 100644 index 000000000..ae57920e9 --- /dev/null +++ b/docs/claude/processing-lifecycle.md @@ -0,0 +1,173 @@ +# ASYNC_API job processing lifecycle + +Covers `dispatch_mode=ASYNC_API` only. Sync/Celery-only dispatch has a different +shape and isn't documented here. + +This doc exists so the next triage session can answer *"where would this +hang?"* without re-deriving the architecture from `models.py` → +`tasks.py` → `orchestration/*.py` → `nats_queue.py`. Read Section 2 first +when triaging a stuck job — the invariants table points at the class of bug. + +## 1. Happy-path flow + +``` +[user] POST /api/v2/jobs/{id}/run?start_now=true + └─> Job.enqueue() ami/jobs/models.py:904 + └─> run_job.apply_async(...) [Celery task enqueued] + +[celeryworker] run_job(job_id) ami/jobs/tasks.py:30 + └─> MLJob.run(job) ami/jobs/models.py:432 + ├─> collect: STARTED → 0 Job.progress.stages[collect] + ├─> pipeline.collect_images(...) [iterable resolved] + ├─> collect: SUCCESS → 1.0 Job.progress.stages[collect] + └─> queue_images_to_nats(job, images) ami/ml/orchestration/jobs.py:75 + ├─> AsyncJobStateManager.initialize_job(image_ids) ami/ml/orchestration/async_job_state.py:85 + │ [Redis: SET job:X:pending_images_total N ex=7d] + │ [Redis: SADD job:X:pending_images:process *ids ex=7d] + │ [Redis: SADD job:X:pending_images:results *ids ex=7d] + └─> TaskQueueManager.publish_task(...) 
× N ami/ml/orchestration/nats_queue.py:359 + [NATS: stream ami-jobs-X, consumer job-X-consumer, N messages] + + [run_job returns — Celery task_postrun fires] + └─> update_job_status(state=SUCCESS) ami/jobs/tasks.py:627 + └─> guard: progress.is_complete() == False (process/results @ 0%) + → defers SUCCESS transition to the async progress handler + +[adc/gpu worker] pulls NATS message + └─> processes image, POSTs to /api/v2/jobs/{id}/result + └─> endpoint queues process_nats_pipeline_result(job_id, result_data, reply_subject) + +[celeryworker] process_nats_pipeline_result(...) ami/jobs/tasks.py:69 + ├─> state_manager.update_state(stage="process", ids) [Redis: SREM pending:process] + ├─> _update_job_progress("process", percentage, ...) [Job.progress.stages[process]] + ├─> pipeline.save_results(results, job_id) [DB: Detections + Classifications] + ├─> _ack_task_via_nats(reply_subject) ◄─── CURRENT POSITION ami/jobs/tasks.py:179 + ├─> state_manager.update_state(stage="results", ids) [Redis: SREM pending:results] + └─> _update_job_progress("results", percentage, ...) [Job.progress.stages[results]] + └─> if job.progress.is_complete(): + └─> cleanup_async_job_if_needed(job) ami/jobs/tasks.py:602 + └─> AsyncJobStateManager.cleanup() [Redis: DEL job:X:*] + └─> TaskQueueManager.cleanup_job_resources(...) [NATS: del consumer, del stream] +``` + +**The bug that Fix 1 addresses:** the ACK at `tasks.py:179` happens *before* +the results-stage SREM at `tasks.py:183`. A worker crash between those two +lines leaves NATS drained (message already acked, no redelivery) and Redis +`pending_images:results` permanently holding that image ID. The job can +never reach 100% on the results stage. No code path reconciles this — the +15-minute snapshot check logs state but does not transition the job. +Fix 1 moves the ACK to *after* the results-stage SREM + `_update_job_progress`. + +## 2. State invariants + +At any moment during a healthy async_api job, these must hold. 
One-line +checks for each — run them against a job_id that's suspected stuck. + +| Invariant | One-line check | +|---|---| +| If `Job.status==STARTED` and async_api, either NATS has work (num_pending+num_ack_pending>0) or Redis is empty + progress is 100% | `redis-cli -n 1 SCARD job:{id}:pending_images:results`; then see §5 for the NATS half | +| `Redis SCARD pending:results` ≤ `NATS delivered - ack_floor` at rest | if SCARD>0 but NATS shows everything acked, that's Bug A (Fix 1 territory) | +| `job.progress.stages` contains `collect`, `process`, `results` before `run_job` exits | stages are initialized in `Job.setup()`, not lazily — see `ami/jobs/models.py:944-955` | +| Cleanup only fires when `Job.status in final_states` OR `progress.is_complete() == True` | grep log for `Finalizing NATS consumer` — timestamp must be ≥ all `_update_job_progress` timestamps | +| `is_complete()` returns True iff every stage has `progress>=1.0 AND status in final_states` | `ami/jobs/models.py:245-267` — works off an exhaustive stage list | +| `_update_job_progress` counter-accumulator on `results` stage runs *only when this batch's SREM newly removed IDs* | after Fix 1: inspect the `progress_info.newly_removed > 0` gate. Before Fix 1: inflation on retry is possible (tracked in antenna#1232) | + +If any invariant is violated, the failure mode is probably below. + +## 3. Failure modes + +| Symptom | Likely cause | Diagnostic | Fix direction | +|---|---|---|---| +| Job STARTED forever; NATS drained; Redis `pending:results` > 0 | Worker crashed between ACK and results-stage SREM (Bug A) | `redis-cli -n 1 SCARD job:{id}:pending_images:results` > 0 AND NATS `num_pending+num_ack_pending == 0` | **Fix 1** (in-flight). Ship and backfill 15-min reconciler (Fix 2). 
| +| Job FAILURE within 30-60s of dispatch; cleanup fired mid-processing | Premature `cleanup_async_job_resources` — `is_complete()` momentarily True (Bug B, not yet reproduced) | grep log `Finalizing NATS consumer` for job, compare timestamp to `Finished job` (run_job exit) and first `Updated job ... progress` line | Separate issue (see drafts in ami-devops). Not in scope for Fix 1. | +| Transient `run_job` exception flips job to FAILURE even though 100+ images were successfully queued | `task_failure` signal missing ASYNC_API guard (Bug C) | grep log for `task_failure` on `run_job` + Job row status=FAILURE + Redis still has pending IDs | Fixed alongside Fix 1: `update_job_failure` now returns early when `dispatch_mode==ASYNC_API and not job.progress.is_complete()`, mirroring the guard `task_postrun` has at `tasks.py:647`. | +| `Job state keys not found in Redis` log line | Either genuine cleanup race, or transient Redis error being misreported | If paired with autoretry-backoff log lines, it's transient (normal); if one-shot, check if cleanup fired earlier for this job_id | Already fixed in #1219/#1231 (transient path now autoretries + logs distinctly) | +| Batch processing crashes with OOM on the GPU worker | DataLoader leak (unrelated to antenna) | `dmesg -T \| grep -i oom` on ADC host | Mitigated with `AMI_NUM_WORKERS=1` in ADC config | +| Broker "Connection reset by peer" hourly on celeryworker | TCP keepalive not applied in deployment | `cat /proc/sys/net/ipv4/tcp_keepalive_time` in the container | `apply_keepalive_fix.sh` in ami-devops | +| 15-min `NATS consumer status` log lines stop appearing mid-job | Consumer was deleted (cleanup already fired); snapshot silently no-ops on missing consumer | grep log for `Deleted NATS consumer` before the gap | Symptom of Bug A or Bug B above — find the root cause | + +## 4. Call-site reference + +Non-obvious places that touch lifecycle state. File:line shown; don't quote code. 
+ +**Cleanup triggers — when and what state it sees:** +- `_update_job_progress` at `ami/jobs/tasks.py:375` — fires `cleanup_async_job_if_needed(job)` when `is_complete()` returns True. Runs after the DB transaction commits. State seen: final stage progress in Job.progress. Bug B would be here if `is_complete()` returns True on a transient view of the stages. +- `update_job_status` (task_postrun) at `ami/jobs/tasks.py:656` — fires cleanup only on `state == REVOKED`. SUCCESS is deferred via the `is_complete()` guard at line 647. +- `update_job_failure` (task_failure) at `ami/jobs/tasks.py:672` — always calls `cleanup_async_job_if_needed` for ANY run_job failure. Bug C: this destroys NATS/Redis state even if 100+ images were successfully queued and are mid-flight. +- `check_stale_jobs` at `ami/jobs/tasks.py:461` — fires cleanup on every stale job after `FAILED_CUTOFF_HOURS=72` whether it was marked REVOKED or updated-from-celery. +- `_fail_job` at `ami/jobs/tasks.py:248` — fires cleanup after marking job FAILURE. +- `MLJob.run` at `ami/jobs/models.py:488` — fires cleanup on the zero-images path when is_complete is true immediately after `queue_images_to_nats`. + +**`_fail_job` call sites — when the terminal path is taken:** +- `process_nats_pipeline_result` at `tasks.py:125` — Redis `update_state` for process returned None (keys genuinely missing). +- `process_nats_pipeline_result` at `tasks.py:203` — same but for results stage. + +**`_update_job_progress` — stage params and accumulator:** +- `tasks.py:314` — `results` stage branch accumulates detections/classifications/captures by READING current Job.progress and ADDING. Not idempotent on replay. Fix 1 adds a `newly_processed` gate. +- `max()` guard at `tasks.py:342` — prevents progress regression when a slower worker lands after a faster one. + +**`is_complete()` — single source of truth at `models.py:245-267`:** +- Returns False if `self.stages` is empty (sanity check). 
+- Returns True only when EVERY stage has `progress>=1.0 AND status in final_states()`. +- `Job.setup()` at `models.py:923` initializes the full stage list before the run, so `is_complete()` at runtime sees an exhaustive list (not a partial one). This is what makes Bug B puzzling — the obvious "lazy stage" hypothesis doesn't fit. + +**`state_manager.update_state` — Redis pipeline structure at `async_job_state.py:111`:** +- Always returns a `JobStateProgress` dataclass or `None`. +- `None` means the total-images key is gone (job expired, cleaned up concurrently, or never initialized). This is a terminal signal. +- Raises `RedisError` on transient (connection reset, timeout). Callers must let autoretry_for handle this — swallowing it conflates transient with terminal (see #1219). +- After Fix 1: returns `newly_removed` (SREM's integer return) so `_update_job_progress("results")` can gate counter accumulation. + +**`state_manager.cleanup` at `async_job_state.py:208` — idempotent:** +- `DEL` on a non-existent key is a no-op. Safe to call multiple times for the same job. + +## 5. Full trace for a single job + +Copy-paste block. Replace `JOB_ID` and run inside the django container (or +swap `docker compose exec` for your deployment's equivalent). 
+ +```bash +JOB_ID=2411 + +# Job row state (status, progress, stages, task_id, timestamps) +docker compose exec django python manage.py shell -c " +from ami.jobs.models import Job +import json +j = Job.objects.get(pk=$JOB_ID) +print(f'status={j.status} task_id={j.task_id} dispatch={j.dispatch_mode}') +print(f'started_at={j.started_at} finished_at={j.finished_at} updated_at={j.updated_at}') +print(f'progress.summary={j.progress.summary}') +for s in j.progress.stages: + print(f' stage={s.key} progress={s.progress:.2%} status={s.status}') +print('errors:', j.progress.errors[-3:] if j.progress.errors else []) +" + +# Redis state for this job +docker compose exec redis redis-cli -n 1 keys "job:$JOB_ID:*" +docker compose exec redis redis-cli -n 1 get "job:$JOB_ID:pending_images_total" +docker compose exec redis redis-cli -n 1 scard "job:$JOB_ID:pending_images:process" +docker compose exec redis redis-cli -n 1 scard "job:$JOB_ID:pending_images:results" +docker compose exec redis redis-cli -n 1 scard "job:$JOB_ID:failed_images" + +# NATS stream + consumer state (reads through the antenna client for auth/URL) +docker compose exec django python manage.py shell -c " +from ami.ml.orchestration.nats_queue import TaskQueueManager +from asgiref.sync import async_to_sync +async def _(): + async with TaskQueueManager() as m: + await m.log_consumer_stats_snapshot($JOB_ID) +async_to_sync(_)() +" 2>&1 | tail -5 + +# Last 50 log lines referencing this job +docker compose logs celeryworker --tail=2000 2>&1 | grep -E "job[^a-z]+$JOB_ID|'$JOB_ID'|#$JOB_ID" | tail -50 +``` + +**Reading the output:** +- If `status=STARTED` and both SCARDs are 0 → job should be marked done. If NATS is also drained, the 15-min snapshot will notice but won't transition (Fix 2 territory). +- If `status=STARTED` and `pending_images:results > 0` but NATS shows `num_pending=0 num_ack_pending=0` → Bug A. Redis holds image IDs that NATS already acked. Stuck. 
+- If `status=FAILURE` and `finished_at - started_at < 60s` on a job with >50 images → Bug B (premature cleanup) or Bug C (task_failure without guard). Compare `Finalizing NATS consumer` timestamp to the dispatch timestamp to tell which. + +--- + +Last updated: 2026-04-15 (Fix 1 branch). When Fix 1 lands, update Section 1 +to reflect the new ACK position and delete the "CURRENT POSITION" annotation. From 976f37b8f25274ddb14df4fe6d9ecb3799ff0ca9 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 15 Apr 2026 11:34:39 -0700 Subject: [PATCH 2/4] docs(lifecycle): sync processing-lifecycle + chaos runbook with Fix 1 code Copilot and CodeRabbit flagged that processing-lifecycle.md still described pre-Fix-1 state: flow diagram showed ACK before the results-stage SREM, invariants table referenced a nonexistent 'newly_processed' gate, Bug C row said "not in scope" despite the guard being in this PR, and update_job_failure was described as "always" calling cleanup. Also reworded chaos-scenarios.md Scenario D so the prose matches the crash-injection point in the procedure. Also: - Remove 'antenna#????' placeholder from the max()-lift diagnostic comment in _update_job_progress; point at the lifecycle doc's "Bug B" section instead. - Reword the NATS_MAX_DELIVER comment in nats_queue.py so it does not read as a TODO when the settings override is already wired up on the next line. 
Co-Authored-By: Claude --- ami/jobs/tasks.py | 10 +++- ami/ml/orchestration/nats_queue.py | 3 +- docs/claude/debugging/chaos-scenarios.md | 7 ++- docs/claude/processing-lifecycle.md | 76 ++++++++++++++---------- 4 files changed, 59 insertions(+), 37 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 00ddc6fac..719918b85 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -44,8 +44,13 @@ def run_job(self, job_id: int) -> None: job.logger.error(f'Job #{job.pk} "{job.name}" failed: {e}') raise else: + from ami.jobs.models import JobDispatchMode + job.refresh_from_db() - job.logger.info(f"Finished job {job}") + if job.dispatch_mode == JobDispatchMode.ASYNC_API and not job.progress.is_complete(): + job.logger.info(f"run_job task exited for job {job}; async results still in-flight via NATS") + else: + job.logger.info(f"Finished job {job}") @celery_app.task( @@ -382,7 +387,8 @@ def _update_job_progress( # Diagnostic: when max() lifts the percentage to 1.0 from a partial value # this worker computed, surface it. A legitimate jump means another # worker concurrently completed the stage; an unexpected jump (e.g. the - # premature-cleanup pattern from antenna#????) is otherwise invisible. + # premature-cleanup pattern described in docs/claude/processing-lifecycle.md + # as "Bug B") is otherwise invisible. if existing_progress is not None and progress_percentage >= 1.0 and passed_progress < 1.0: job.logger.warning( f"Stage '{stage}' progress lifted to 100% by max() guard: " diff --git a/ami/ml/orchestration/nats_queue.py b/ami/ml/orchestration/nats_queue.py index 2270f7360..c5ec8705c 100644 --- a/ami/ml/orchestration/nats_queue.py +++ b/ami/ml/orchestration/nats_queue.py @@ -49,7 +49,8 @@ async def get_connection(nats_url: str) -> tuple[nats.NATS, JetStreamContext]: # A processing service that consistently fails (e.g. 
returns results referencing # an algorithm that the pipeline doesn't declare) will burn ADC + worker time on # every retry; one retry covers a transient blip and is the right tradeoff. -# Hoist to settings (NATS_MAX_DELIVER) when we need per-environment tuning. +# Override per environment via settings.NATS_MAX_DELIVER if that balance needs +# to change (e.g. a deployment with a flakier network may want a higher value). NATS_MAX_DELIVER = getattr(settings, "NATS_MAX_DELIVER", 2) ADVISORY_STREAM_NAME = "advisories" # Shared stream for max delivery advisories across all jobs diff --git a/docs/claude/debugging/chaos-scenarios.md b/docs/claude/debugging/chaos-scenarios.md index 73df7a0b6..d61775057 100644 --- a/docs/claude/debugging/chaos-scenarios.md +++ b/docs/claude/debugging/chaos-scenarios.md @@ -113,8 +113,11 @@ Verifies the terminal path: update_state returns None → ACK → `_fail_job`. ### Scenario D: ACK/SREM ordering (Bug A crash window) -Verifies Fix 1's reorder: a crash between save_results and results-SREM -leaves the message redeliverable, not stranded. +Verifies Fix 1's reorder: a crash anywhere between save_results and the +ACK leaves the NATS message redeliverable. This scenario places the +crash point at the narrowest window — between the results-stage SREM +(`state_manager.update_state(stage="results")`) and `_ack_task_via_nats` +— because a crash between those same two calls, then in ACK-first order, is what stranded jobs on pre-Fix-1 code. 1. Patch `process_nats_pipeline_result` to `os._exit(1)` between `state_manager.update_state(stage="results")` and `_ack_task_via_nats(...)`. diff --git a/docs/claude/processing-lifecycle.md b/docs/claude/processing-lifecycle.md index ae57920e9..746ac9c44 100644 --- a/docs/claude/processing-lifecycle.md +++ b/docs/claude/processing-lifecycle.md @@ -37,26 +37,37 @@ when triaging a stuck job — the invariants table points at the class of bug. 
└─> processes image, POSTs to /api/v2/jobs/{id}/result └─> endpoint queues process_nats_pipeline_result(job_id, result_data, reply_subject) -[celeryworker] process_nats_pipeline_result(...) ami/jobs/tasks.py:69 +[celeryworker] process_nats_pipeline_result(...) ami/jobs/tasks.py:76 ├─> state_manager.update_state(stage="process", ids) [Redis: SREM pending:process] ├─> _update_job_progress("process", percentage, ...) [Job.progress.stages[process]] ├─> pipeline.save_results(results, job_id) [DB: Detections + Classifications] - ├─> _ack_task_via_nats(reply_subject) ◄─── CURRENT POSITION ami/jobs/tasks.py:179 ├─> state_manager.update_state(stage="results", ids) [Redis: SREM pending:results] - └─> _update_job_progress("results", percentage, ...) [Job.progress.stages[results]] - └─> if job.progress.is_complete(): - └─> cleanup_async_job_if_needed(job) ami/jobs/tasks.py:602 - └─> AsyncJobStateManager.cleanup() [Redis: DEL job:X:*] - └─> TaskQueueManager.cleanup_job_resources(...) [NATS: del consumer, del stream] + ├─> _update_job_progress("results", percentage, ...) [Job.progress.stages[results]] + │ └─> if job.progress.is_complete(): + │ └─> cleanup_async_job_if_needed(job) ami/jobs/tasks.py:657 + │ └─> AsyncJobStateManager.cleanup() [Redis: DEL job:X:*] + │ └─> TaskQueueManager.cleanup_job_resources(...) [NATS: del consumer, del stream] + └─> _ack_task_via_nats(reply_subject) ami/jobs/tasks.py:255 ``` -**The bug that Fix 1 addresses:** the ACK at `tasks.py:179` happens *before* -the results-stage SREM at `tasks.py:183`. A worker crash between those two -lines leaves NATS drained (message already acked, no redelivery) and Redis -`pending_images:results` permanently holding that image ID. The job can -never reach 100% on the results stage. No code path reconciles this — the -15-minute snapshot check logs state but does not transition the job. -Fix 1 moves the ACK to *after* the results-stage SREM + `_update_job_progress`. 
+**Why ACK runs last:** the results-stage SREM and `_update_job_progress` +must be durable in Redis and Postgres before NATS is told the message is +done. If a worker crashes anywhere above the ACK, NATS does not see an +ack within `ack_wait` (30s) and redelivers. The replay re-enters the +same code path idempotently: + +- `save_results` dedupes on `(detection, source_image)`. +- SREM is a no-op on already-removed IDs; the `newly_removed` return + (SREM's integer result) is 0, which gates counter accumulation so + detections/classifications/captures do not inflate. +- `_update_job_progress` clamps the percentage with `max()` so progress + never regresses, and preserves FAILURE once set. + +Earlier revisions of this code ACKed *before* the results-stage SREM. A +worker crash between those two lines left NATS drained (message already +acked, no redelivery) while Redis `pending_images:results` permanently +held that image ID — the job would never reach 100% on the results stage, +and no code path reconciled it. ## 2. State invariants @@ -66,11 +77,11 @@ checks for each — run them against a job_id that's suspected stuck. 
| Invariant | One-line check | |---|---| | If `Job.status==STARTED` and async_api, either NATS has work (num_pending+num_ack_pending>0) or Redis is empty + progress is 100% | `redis-cli -n 1 SCARD job:{id}:pending_images:results`; then see §5 for the NATS half | -| `Redis SCARD pending:results` ≤ `NATS delivered - ack_floor` at rest | if SCARD>0 but NATS shows everything acked, that's Bug A (Fix 1 territory) | +| `Redis SCARD pending:results` ≤ `NATS delivered - ack_floor` at rest | if SCARD>0 but NATS shows everything acked, that's the pre-PR-#1234 Bug A signature — should not happen against current code | | `job.progress.stages` contains `collect`, `process`, `results` before `run_job` exits | stages are initialized in `Job.setup()`, not lazily — see `ami/jobs/models.py:944-955` | | Cleanup only fires when `Job.status in final_states` OR `progress.is_complete() == True` | grep log for `Finalizing NATS consumer` — timestamp must be ≥ all `_update_job_progress` timestamps | | `is_complete()` returns True iff every stage has `progress>=1.0 AND status in final_states` | `ami/jobs/models.py:245-267` — works off an exhaustive stage list | -| `_update_job_progress` counter-accumulator on `results` stage runs *only when this batch's SREM newly removed IDs* | after Fix 1: inspect `newly_processed` gate. Before Fix 1: inflation on retry is possible (tracked in antenna#1232) | +| `_update_job_progress` counter-accumulator on `results` stage runs *only when this batch's SREM newly removed IDs* | gated on `progress_info.newly_removed` (SREM's integer return) at `ami/jobs/tasks.py:228`. Pre-Fix-1 revisions inflated on replay (antenna#1232). | If any invariant is violated, the failure mode is probably below. @@ -78,9 +89,9 @@ If any invariant is violated, the failure mode is probably below. 
| Symptom | Likely cause | Diagnostic | Fix direction | |---|---|---|---| -| Job STARTED forever; NATS drained; Redis `pending:results` > 0 | Worker crashed between ACK and results-stage SREM (Bug A) | `redis-cli -n 1 SCARD job:{id}:pending_images:results` > 0 AND NATS `num_pending+num_ack_pending == 0` | **Fix 1** (in-flight). Ship and backfill 15-min reconciler (Fix 2). | +| Job STARTED forever; NATS drained; Redis `pending:results` > 0 | Worker crashed between ACK and results-stage SREM (Bug A) | `redis-cli -n 1 SCARD job:{id}:pending_images:results` > 0 AND NATS `num_pending+num_ack_pending == 0` | Addressed by PR #1234 (ACK now runs after the results SREM + progress commit; crashes leave the message redeliverable). The 15-min stale-job reaper is the safety net if this class of bug resurfaces. | | Job FAILURE within 30-60s of dispatch; cleanup fired mid-processing | Premature `cleanup_async_job_resources` — `is_complete()` momentarily True (Bug B, not yet reproduced) | grep log `Finalizing NATS consumer` for job, compare timestamp to `Finished job` (run_job exit) and first `Updated job ... progress` line | Separate issue (see drafts in ami-devops). Not in scope for Fix 1. | -| Transient `run_job` exception flips job to FAILURE even though 100+ images were successfully queued | `task_failure` signal missing ASYNC_API guard (Bug C) | grep log for `task_failure` on `run_job` + Job row status=FAILURE + Redis still has pending IDs | Separate issue. Add `not job.progress.is_complete() and dispatch_mode==ASYNC_API` guard like `task_postrun` has at `tasks.py:647`. 
| +| Transient `run_job` exception flips job to FAILURE even though 100+ images were successfully queued | `task_failure` signal missing ASYNC_API guard (Bug C) | grep log for `task_failure` on `run_job` + Job row status=FAILURE + Redis still has pending IDs | Guarded in `update_job_failure` at `tasks.py:729`: defers FAILURE for ASYNC_API jobs when `progress.is_complete()` is False, mirroring the `task_postrun` SUCCESS guard. Stale-job reaper eventually revokes if the job truly stays stuck. | | `Job state keys not found in Redis` log line | Either genuine cleanup race, or transient Redis error being misreported | If paired with autoretry-backoff log lines, it's transient (normal); if one-shot, check if cleanup fired earlier for this job_id | Already fixed in #1219/#1231 (transient path now autoretries + logs distinctly) | | Batch processing crashes with OOM on the GPU worker | DataLoader leak (unrelated to antenna) | `dmesg -T \| grep -i oom` on ADC host | Mitigated with `AMI_NUM_WORKERS=1` in ADC config | | Broker "Connection reset by peer" hourly on celeryworker | TCP keepalive not applied in deployment | `cat /proc/sys/net/ipv4/tcp_keepalive_time` in the container | `apply_keepalive_fix.sh` in ami-devops | @@ -91,33 +102,33 @@ If any invariant is violated, the failure mode is probably below. Non-obvious places that touch lifecycle state. File:line shown; don't quote code. **Cleanup triggers — when and what state it sees:** -- `_update_job_progress` at `ami/jobs/tasks.py:375` — fires `cleanup_async_job_if_needed(job)` when `is_complete()` returns True. Runs after the DB transaction commits. State seen: final stage progress in Job.progress. Bug B would be here if `is_complete()` returns True on a transient view of the stages. -- `update_job_status` (task_postrun) at `ami/jobs/tasks.py:656` — fires cleanup only on `state == REVOKED`. SUCCESS is deferred via the `is_complete()` guard at line 647. 
-- `update_job_failure` (task_failure) at `ami/jobs/tasks.py:672` — always calls `cleanup_async_job_if_needed` for ANY run_job failure. Bug C: this destroys NATS/Redis state even if 100+ images were successfully queued and are mid-flight. -- `check_stale_jobs` at `ami/jobs/tasks.py:461` — fires cleanup on every stale job after `FAILED_CUTOFF_HOURS=72` whether it was marked REVOKED or updated-from-celery. -- `_fail_job` at `ami/jobs/tasks.py:248` — fires cleanup after marking job FAILURE. +- `_update_job_progress` at `ami/jobs/tasks.py:432` — fires `cleanup_async_job_if_needed(job)` when `is_complete()` returns True. Runs after the DB transaction commits. State seen: final stage progress in Job.progress. Bug B would be here if `is_complete()` returns True on a transient view of the stages. +- `update_job_status` (task_postrun) at `ami/jobs/tasks.py:683` — fires cleanup only on `state == REVOKED`. SUCCESS is deferred via the `is_complete()` guard at line 702. +- `update_job_failure` (task_failure) at `ami/jobs/tasks.py:715` — for ASYNC_API jobs, defers FAILURE + cleanup when `progress.is_complete()` is False (the Bug C guard at line 729). For other dispatch modes, and for ASYNC_API jobs that have actually finished, still marks FAILURE and fires cleanup. The stale-job reaper is the safety net if the deferred path never converges. +- `check_stale_jobs` at `ami/jobs/tasks.py:435` — fires cleanup on every stale job after `FAILED_CUTOFF_HOURS=72` whether it was marked REVOKED or updated-from-celery. +- `_fail_job` at `ami/jobs/tasks.py:270` — fires cleanup after marking job FAILURE. - `MLJob.run` at `ami/jobs/models.py:488` — fires cleanup on the zero-images path when is_complete is true immediately after `queue_images_to_nats`. **`_fail_job` call sites — when the terminal path is taken:** -- `process_nats_pipeline_result` at `tasks.py:125` — Redis `update_state` for process returned None (keys genuinely missing). 
-- `process_nats_pipeline_result` at `tasks.py:203` — same but for results stage. +- `process_nats_pipeline_result` at `tasks.py:132` — Redis `update_state` for process returned None (keys genuinely missing). +- `process_nats_pipeline_result` at `tasks.py:214` — same but for results stage. **`_update_job_progress` — stage params and accumulator:** -- `tasks.py:314` — `results` stage branch accumulates detections/classifications/captures by READING current Job.progress and ADDING. Not idempotent on replay. Fix 1 adds a `newly_processed` gate. -- `max()` guard at `tasks.py:342` — prevents progress regression when a slower worker lands after a faster one. +- `tasks.py:359` — `results` stage branch accumulates detections/classifications/captures by READING current Job.progress and ADDING. The caller gates this accumulation on `progress_info.newly_removed > 0` (see `tasks.py:228`) so replays pass zero counts and leave the totals idempotent. +- `max()` guard at `tasks.py:381` — prevents progress regression when a slower worker lands after a faster one. **`is_complete()` — single source of truth at `models.py:245-267`:** - Returns False if `self.stages` is empty (sanity check). - Returns True only when EVERY stage has `progress>=1.0 AND status in final_states()`. - `Job.setup()` at `models.py:923` initializes the full stage list before the run, so `is_complete()` at runtime sees an exhaustive list (not a partial one). This is what makes Bug B puzzling — the obvious "lazy stage" hypothesis doesn't fit. -**`state_manager.update_state` — Redis pipeline structure at `async_job_state.py:111`:** +**`state_manager.update_state` — Redis pipeline structure at `async_job_state.py:112`:** - Always returns a `JobStateProgress` dataclass or `None`. - `None` means the total-images key is gone (job expired, cleaned up concurrently, or never initialized). This is a terminal signal. - Raises `RedisError` on transient (connection reset, timeout). 
Callers must let autoretry_for handle this — swallowing it conflates transient with terminal (see #1219). -- After Fix 1: returns `newly_removed` (SREM's integer return) so `_update_job_progress("results")` can gate counter accumulation. +- Returns `newly_removed` on `JobStateProgress` (SREM's integer return at `async_job_state.py:163`) so `process_nats_pipeline_result` can gate counter accumulation on the results stage. -**`state_manager.cleanup` at `async_job_state.py:208` — idempotent:** +**`state_manager.cleanup` at `async_job_state.py:215` — idempotent:** - `DEL` on a non-existent key is a no-op. Safe to call multiple times for the same job. ## 5. Full trace for a single job @@ -169,5 +180,6 @@ docker compose logs celeryworker --tail=2000 2>&1 | grep -E "job[^a-z]+$JOB_ID|' --- -Last updated: 2026-04-15 (Fix 1 branch). When Fix 1 lands, update Section 1 -to reflect the new ACK position and delete the "CURRENT POSITION" annotation. +Last updated: 2026-04-15. Reflects PR #1234 (ACK runs after results-stage +SREM + progress commit; `task_failure` guard for in-flight ASYNC_API jobs; +counter accumulation gated on `newly_removed`). From 2f2fca260b61c89ee68b2c5db299cdc87ecaf4f1 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 15 Apr 2026 11:52:04 -0700 Subject: [PATCH 3/4] test(jobs): cover task_failure guard + collapse duplicated results-stage progress call MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two small follow-ups surfaced in PR review: - Bug C (the task_failure guard added in 476a7104) had no dedicated regression test. Add two: one asserts the ASYNC_API in-flight path keeps the job STARTED, preserves Redis state, and emits the expected deferral warning; a companion asserts SYNC_API still takes the terminal FAILURE + cleanup path. Tests invoke the handler body directly — Celery signal wiring isn't the subject. - Collapse the two _update_job_progress("results", ...) 
branches in process_nats_pipeline_result into one call driven by a counts tuple that picks between real counts and zeros based on newly_removed. No behaviour change; easier to read. The inline comment still explains the idempotency contract. Co-Authored-By: Claude --- ami/jobs/tasks.py | 37 +++++------- ami/jobs/tests/test_tasks.py | 112 +++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 22 deletions(-) diff --git a/ami/jobs/tasks.py b/ami/jobs/tasks.py index 719918b85..fb61a87e2 100644 --- a/ami/jobs/tasks.py +++ b/ami/jobs/tasks.py @@ -220,29 +220,22 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub # Counter-inflation guard: only add detection/classification/capture counts # when SREM actually removed ids (first processing of this result). On a # replay (NATS redelivered the message or the Celery task retried past - # the SREM), newly_removed==0 and we skip accumulation to keep the - # counters idempotent. The percentage/status path still runs because + # the SREM), newly_removed==0 and we pass zeros to keep the counters + # idempotent. The percentage/status path still runs because # _update_job_progress uses max() and preserves FAILURE regardless. 
- if progress_info.newly_removed > 0: - _update_job_progress( - job_id, - "results", - progress_info.percentage, - complete_state=complete_state, - detections=detections_count, - classifications=classifications_count, - captures=captures_count, - ) - else: - _update_job_progress( - job_id, - "results", - progress_info.percentage, - complete_state=complete_state, - detections=0, - classifications=0, - captures=0, - ) + is_first_processing = progress_info.newly_removed > 0 + counts_to_apply = ( + (detections_count, classifications_count, captures_count) if is_first_processing else (0, 0, 0) + ) + _update_job_progress( + job_id, + "results", + progress_info.percentage, + complete_state=complete_state, + detections=counts_to_apply[0], + classifications=counts_to_apply[1], + captures=counts_to_apply[2], + ) # Ack LAST — only after the results-stage SREM and progress commit are # durable. If anything above crashes, NATS will redeliver the message diff --git a/ami/jobs/tests/test_tasks.py b/ami/jobs/tests/test_tasks.py index 8e05c7243..dc5fa8d44 100644 --- a/ami/jobs/tests/test_tasks.py +++ b/ami/jobs/tests/test_tasks.py @@ -513,6 +513,118 @@ def test_process_nats_pipeline_result_error_job_not_found(self, mock_manager_cla mock_manager.acknowledge_task.assert_called_once_with(reply_subject) +class TestTaskFailureGuard(TransactionTestCase): + """ + Bug C regression tests for the task_failure signal guard in update_job_failure. + + Pre-PR-#1234 behavior: any exception raised in run_job (even after the images + were successfully queued to NATS and ADC workers were processing them) flowed + through Celery's task_failure signal and collapsed the job: status → FAILURE + and NATS/Redis cleanup destroyed state the result handler depended on. + + Post-PR-#1234 behavior: for ASYNC_API jobs that aren't progress.is_complete() + yet, the guard defers terminal state to the async result handler. 
Non-ASYNC + dispatch modes (and ASYNC_API jobs that have actually completed) still take + the terminal path. + + Tests here call `update_job_failure` as a plain function with the keyword + arguments the task_failure signal would pass at runtime. The Celery signal + machinery itself is not the subject of the test — the signal handler body is. + """ + + def setUp(self): + cache.clear() + self.project = Project.objects.create(name="Bug C Guard Test Project") + self.pipeline = Pipeline.objects.create(name="Bug C Pipeline", slug="bug-c-pipeline") + self.pipeline.projects.add(self.project) + self.collection = SourceImageCollection.objects.create(name="Bug C Collection", project=self.project) + + def tearDown(self): + cache.clear() + + def _make_job(self, dispatch_mode: JobDispatchMode, task_id: str) -> Job: + job = Job.objects.create( + job_type_key=MLJob.key, + project=self.project, + name=f"{dispatch_mode} bug C test job", + pipeline=self.pipeline, + source_image_collection=self.collection, + dispatch_mode=dispatch_mode, + ) + job.task_id = task_id + # Initial status mirrors what run_job has already set via task_prerun by + # the time task_failure fires. + job.update_status(JobState.STARTED, save=True) + return job + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_task_failure_guard_defers_for_async_api_in_flight(self, mock_cleanup): + """ + Bug C: an exception in run_job post-queue on an ASYNC_API job must NOT + flip the job to FAILURE or fire cleanup — results are still arriving + via NATS, and tearing down stream/consumer/Redis state now would strand + the in-flight images. The guard at tasks.py:729 handles this. + """ + from ami.jobs.tasks import update_job_failure + + job = self._make_job(JobDispatchMode.ASYNC_API, task_id="bug-c-async-task") + # Initialize Redis state with pending images to model an in-flight job (the + # guard reads job.progress, which is incomplete either way). Also stand in for + # the ADC worker's view: it would still see state here and keep publishing results. 
+ image_ids = ["100", "101", "102"] + AsyncJobStateManager(job.pk).initialize_job(image_ids) + + with self.assertLogs("ami.jobs", level="WARNING") as captured: + update_job_failure( + sender=None, + task_id=job.task_id, + exception=RuntimeError("simulated post-queue crash"), + ) + + job.refresh_from_db() + + # Job status unchanged: the guard returned before update_status(FAILURE). + self.assertEqual( + job.status, + JobState.STARTED, + "ASYNC_API in-flight job should remain STARTED when run_job raises", + ) + # Cleanup deferred: state is still needed by the async result handler. + mock_cleanup.assert_not_called() + # Redis state untouched — the NATS worker can keep reporting against it. + surviving_progress = AsyncJobStateManager(job.pk).get_progress("results") + self.assertIsNotNone(surviving_progress) + self.assertEqual(surviving_progress.remaining, len(image_ids)) + # Warning log surfaces the deferred failure. Ops alerting on this phrase + # is how the visibility loss described in the PR body is compensated. + self.assertTrue( + any("deferring FAILURE to async progress handler" in line for line in captured.output), + f"expected deferral warning, got: {captured.output}", + ) + + @patch("ami.jobs.tasks.cleanup_async_job_if_needed") + def test_task_failure_marks_sync_api_job_failure_and_cleans_up(self, mock_cleanup): + """ + Contract pair for the ASYNC_API guard: SYNC_API (and INTERNAL) jobs have + no in-flight external processing to preserve, so task_failure must still + mark FAILURE and invoke cleanup as before. 
+ """ + from ami.jobs.tasks import update_job_failure + + job = self._make_job(JobDispatchMode.SYNC_API, task_id="bug-c-sync-task") + + update_job_failure( + sender=None, + task_id=job.task_id, + exception=RuntimeError("sync api crash"), + ) + + job.refresh_from_db() + + self.assertEqual(job.status, JobState.FAILURE) + mock_cleanup.assert_called_once() + + class TestResultEndpointWithError(APITestCase): """Integration test for the result API endpoint with error results.""" From 7f3b0e1922aa4bbc489b5b8d0437728721f12861 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 15 Apr 2026 12:06:44 -0700 Subject: [PATCH 4/4] test(cleanup): force complete progress before failure-path cleanup assertion The Bug C guard in update_job_failure defers FAILURE for ASYNC_API jobs that are still in-flight, so test_cleanup_on_job_failure's pre-existing setup (ASYNC_API + queued images + non-complete progress) now correctly takes the deferred path and cleanup does not run. Drive all stages to complete before firing task_failure so the guard falls through to the terminal cleanup path this test is meant to cover. Deferred-path behaviour is already covered by TestTaskFailureGuard in ami/jobs/tests/test_tasks.py. Co-Authored-By: Claude --- ami/ml/orchestration/tests/test_cleanup.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/ami/ml/orchestration/tests/test_cleanup.py b/ami/ml/orchestration/tests/test_cleanup.py index cb626348a..89a2ef7b3 100644 --- a/ami/ml/orchestration/tests/test_cleanup.py +++ b/ami/ml/orchestration/tests/test_cleanup.py @@ -165,13 +165,30 @@ def test_cleanup_on_job_completion(self): self._verify_resources_cleaned(job.pk) def test_cleanup_on_job_failure(self): - """Test that resources are cleaned up when job fails.""" + """Test that resources are cleaned up when job fails after progress is complete. 
+ + The Bug C guard in update_job_failure defers FAILURE for ASYNC_API jobs that + are still in-flight (progress.is_complete() == False). To exercise the + terminal cleanup path via task_failure for an ASYNC_API job, we first drive + all stages to complete, then fire the failure signal. + """ job = self._create_job_with_queued_images() # Set task_id so the failure handler can find the job job.task_id = "test-task-failure-123" job.save() + # Drive progress to complete so the Bug C guard in update_job_failure falls + # through to cleanup. We mutate the persisted JobProgress directly because + # calling _update_job_progress with complete_state=SUCCESS would itself + # trigger cleanup (via cleanup_async_job_if_needed inside the progress + # update path), defeating the test's intent. + for stage in job.progress.stages: + stage.progress = 1.0 + stage.status = JobState.SUCCESS + job.save() + job.refresh_from_db() + # Simulate job failure by calling the failure signal handler update_job_failure( sender=None,