Improve the filter stage during job preparation, fix for large capture sets #1322

Merged
mihow merged 11 commits into main from fix/filter-processed-images-bulk-1321
May 29, 2026

Conversation

@mihow

@mihow mihow commented May 27, 2026

Collaborator

Operators running ML jobs on large Capture Sets (SourceImageCollection) were seeing the job sit in STARTED for ~20 minutes, no Collect-stage progress, no errors in the log, and then flip to REVOKED once the reaper noticed no forward progress. From their side the platform looked like it had silently dropped the job. The Celery worker was actually still running — stuck in the Collect → queue path doing per-image SQL — but no job.progress updates were being saved, so the reaper's 10-minute "no forward progress" heuristic fired.

Three independent O(N)-per-image sites in that path were each big enough on their own to push past the 10-minute reaper threshold on collections in the ~100k-image range. This PR fixes all three.

Collect → queue → NATS-stream-created on a ~100k-image job that previously hung for ~19 minutes and got reaped now completes in 42 seconds end-to-end (measured on the Serbia dev box on the same shape of collection, see numbers below). That's roughly a 20–25× drop in absolute wall time on the path that was getting killed; the per-image cost goes from ~7.7 ms (147k incident wall time ÷ image count) down to ~0.4 ms. Smaller jobs see the same shape but the absolute win is too small to notice.

Before (collecting images takes more than 15 minutes and gets revoked)
image

After (collecting images takes 30 seconds or so)
image

What this PR changes

  • Bulk-query filter_processed_images (ami/ml/models/pipeline.py). Was 3–5 ORM round-trips per image; now chunks via itertools.islice at batch_size=1000 and does at most 2 bulk queries per chunk. Semantics for all five processed/unprocessed branches preserved. 147k images → ~295 queries instead of ~500k. Measured: 9.4 s for 105k images, down from a reaper-killed run on 147k.
  • Prefetch deployment__data_source in collect_images (ami/ml/models/pipeline.py). The two queryset branches (collection-backed and deployment-backed) now .select_related("deployment__data_source") so the FK chain rides along on the initial fetch. Saves a query per image on the downstream image.url() call in queue_images_to_nats.
  • Cache image.url() once per iteration in queue_images_to_nats (ami/ml/orchestration/jobs.py). The pre-NATS prep loop called it twice — once in the truthy check, once for assignment — doubling the FK chain cost on every iteration. Now called once.
  • Fan out the NATS publish loop with asyncio.gather (ami/ml/orchestration/jobs.py, new constant NATS_PUBLISH_FANOUT_CHUNK_SIZE=200). Sequential await self.js.publish(...) stacked the per-message JetStream ack round-trip (~1.3 ms each, including TCP) linearly. Chunked gather lets the NATS client pipeline acks back to us. _ensure_stream / _ensure_consumer are also pre-warmed once before the loop: they were already cache-noop after first call, but pre-warming prevents 200 concurrent coroutines from all trying to create the stream on the first chunk. Measured: 139 s → 15 s on 105k images (~9× speedup on the publish loop alone).
  • Extract bbox_is_null() helper next to NULL_DETECTIONS_FILTER in ami/main/models.py so the SQL filter and the in-memory check share a single source of truth. Used inside the bulk filter.
  • Strip Detection's default Meta.ordering on the bulk select via .order_by(). The rows land in a dict — no point sorting them in Postgres.
  • Rename per-batch dicts in the bulk filter (images_with_pipeline_detection, real_pipeline_detections_per_image) so it's clear they only cover detections from this pipeline's algorithms.
  • Short-circuit the "no classifier" filter path (ami/ml/models/pipeline.py). When a pipeline has no classifier algorithms registered, the empty classifier set made set().issubset(observed) vacuously true, so images with existing detections were silently skipped — contradicting the "Will reprocess all images" warning. Now yields all images and returns. This is a pre-existing bug surfaced during review; fixed here because the PR rewrote that exact code path.
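
The batching shape from the first bullet can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the code in ami/ml/models/pipeline.py: `chunked`, `filter_unprocessed`, and the two `fetch_*` callables are hypothetical stand-ins for the bulk Detection and Classification queries, and the five-branch semantics are reduced to "yield the image unless every one of its detections is already classified".

```python
from itertools import islice
from typing import Callable, Iterable, Iterator


def chunked(items: Iterable[int], batch_size: int) -> Iterator[list[int]]:
    """Yield successive lists of at most batch_size items."""
    if batch_size <= 0:
        # Fail fast: islice(iterator, 0) would return an empty batch and
        # silently end the loop without yielding anything.
        raise ValueError("batch_size must be positive")
    iterator = iter(items)
    while batch := list(islice(iterator, batch_size)):
        yield batch


def filter_unprocessed(
    image_ids: Iterable[int],
    fetch_detections: Callable[[list[int]], dict[int, list[int]]],
    fetch_classified: Callable[[list[int]], set[int]],
    batch_size: int = 1000,
) -> Iterator[int]:
    """Yield image ids that still need work, with at most 2 lookups per chunk."""
    for batch in chunked(image_ids, batch_size):
        # Bulk lookup 1 (hypothetical): image id -> detection ids for the chunk.
        detections = fetch_detections(batch)
        detection_ids = [d for ids in detections.values() for d in ids]
        # Bulk lookup 2 (hypothetical): which of those detections are classified.
        # Skipped entirely when the chunk has no detections.
        classified = fetch_classified(detection_ids) if detection_ids else set()
        for image_id in batch:
            ids = detections.get(image_id, [])
            if not ids or any(d not in classified for d in ids):
                yield image_id
```

With 10 images and `batch_size=5` this sketch issues two detection lookups and at most two classification lookups, so the lookup count grows with the number of chunks rather than the number of images.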

Tests

  • Empty input → 1 query.
  • Mixed batch across all five filter branches → correct subset yielded.
  • Query-count assertion: 10 images at batch_size=5 → 4 queries (1 pipeline + 2 detection batches + 1 classification batch). Locks in the O(batches) shape.
  • No-classifier short-circuit: a pipeline with no classifier algorithms registered yields every image (matches the "Will reprocess all images" warning) — guards the empty-set issubset bug fixed in review.
  • collect_images returns SourceImage rows with deployment + deployment.data_source joined — accessing them triggers 0 additional queries.
  • NATS fanout (4 tests in ami/ml/orchestration/tests/test_jobs.py, mocking TaskQueueManager): warm-up runs exactly once, publish_task await count equals image count across a chunk boundary, partial-failure path returns False without raising, and a stream-setup exception short-circuits and yields zero publishes.
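
The empty-set behaviour guarded by the no-classifier test is standard Python set semantics and can be reproduced in isolation. The two function names here are hypothetical and the check is reduced to a per-image predicate; the PR itself yields all images and returns early rather than evaluating a predicate per image.

```python
def should_skip_prefix(pipeline_classifier_ids: set[int], observed_ids: set[int]) -> bool:
    """Pre-fix shape: skip an image when every pipeline classifier was observed."""
    # With an empty left-hand set this is vacuously True.
    return pipeline_classifier_ids.issubset(observed_ids)


def should_skip_fixed(pipeline_classifier_ids: set[int], observed_ids: set[int]) -> bool:
    """Post-fix shape: an empty classifier set never causes a skip."""
    if not pipeline_classifier_ids:
        return False
    return pipeline_classifier_ids.issubset(observed_ids)
```

The explicit emptiness check keeps the subset test from being evaluated at all when the pipeline has no classifiers registered.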

Ordering note for the fanout change

JetStream still assigns ack.seq atomically on the server side as messages arrive, so the stream is still strictly ordered by arrival. What changes is the order within a 200-message gather chunk — two images dispatched in the same chunk can land in the stream in either order. Consumers fetch via consumer.fetch which yields in seq order, and the ADC worker processes each image-task independently with no cross-task ordering dependency, so this is safe. Globally the chunked order is still preserved: chunk N's messages all land in the stream before chunk N+1's.
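
The ordering guarantee described above can be illustrated without a NATS server. In this sketch `fake_publish` is a hypothetical stand-in for an awaited JetStream publish, the `stream` list plays the role of the server-side stream, and the use of `return_exceptions=True` for failure counting is an assumption, not something confirmed by the PR.

```python
import asyncio

FANOUT_CHUNK_SIZE = 200  # same value as NATS_PUBLISH_FANOUT_CHUNK_SIZE in the PR


async def fake_publish(message: int, stream: list[int]) -> bool:
    """Hypothetical publish: yield to the event loop, then record arrival."""
    await asyncio.sleep(0)
    stream.append(message)
    return True


async def publish_all(
    messages: list[int], chunk_size: int = FANOUT_CHUNK_SIZE
) -> tuple[list[int], int]:
    """Publish in chunks; each chunk is awaited as a whole before the next starts."""
    stream: list[int] = []
    failures = 0
    for start in range(0, len(messages), chunk_size):
        chunk = messages[start : start + chunk_size]
        results = await asyncio.gather(
            *(fake_publish(m, stream) for m in chunk),
            return_exceptions=True,  # assumption: count failures instead of raising
        )
        failures += sum(1 for r in results if r is not True)
    return stream, failures
```

Because each `gather` call completes before the next chunk is dispatched, every message of chunk N is in `stream` before any message of chunk N+1, while the order inside a chunk is left to the event loop.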

Performance — what each fix bought

The original incident on a 147k-image collection ran for ~19 minutes before the reaper killed it. The job never reached the NATS publish loop — it stalled inside filter_processed_images doing per-image SQL — so we have direct evidence for one fix (the filter rewrite) and arithmetic / measurement for the rest. Per-fix attribution on a 105k-image workload:

| Site / fix | Before this PR | After this PR | Source of the "before" number |
|---|---|---|---|
| filter_processed_images | ~13 min (105k images at the 7.7 ms/img incident rate) | 9 s measured | 147k incident wall time ÷ image count, scaled |
| image.url() × 2 + uncached deployment.data_source FK chain in queue prep loop | ~9 ms × 2 × 105k ≈ 31 min if it had ever run | absorbed into the 15 s publish loop | Per-call cost measured on a real production SourceImage; never observed at scale because the filter stalled first |
| Sequential await self.js.publish(...) per image | 139 s on 105k (~15 min extrapolated on 700k) | 15 s measured | Measured directly in run 1 of the A/B below |
| Total job-prep wall time on ~100k-image collection | ~19 min → reaper kill | 42.1 s | Incident timeline (before); Serbia run 2 (after) |

So the 20× headline isn't a single fix — the filter rewrite alone gets it under the reaper threshold, but the url() cache and the fanout each remove a stage that would have hit the threshold once the previous one stopped being the bottleneck. With only the filter fixed, the same job-prep on 700k images would have stalled inside the publish loop at ~15 minutes wall. With all three fixed, the publish loop scales to 700k in ~100 s.

The A/B below is the direct measurement that backs row 3 (sequential vs fanout publish) and shows what each stage contributes to the final 42 s.

Measured end-to-end on the Serbia dev box (2026-05-27)

Re-ran the incident path against the closest-shape collection on hand: project 20, collection 165 ("All images"), 105,091 SourceImage rows, pipeline quebec_vermont_moths_2023, dispatch_mode=async_api. Stack was this branch on top of Postgres + Redis + JetStream as on a fresh deploy. Pipeline 3's algorithms have already classified ~6k of the 105k, so the filter does meaningful work (all 105,091 images returned as "to process" after dedup).

Headline comparison

| State | Wall time on ~100k–150k images | Outcome |
|---|---|---|
| Before this PR (production code, observed in #1321 on a 147k-image collection) | ~19 minutes (1140 s) | reaped at 19m18s, REVOKED |
| After this PR (measured on 105k-image collection) | 42.1 seconds | collect: SUCCESS, process queued, no reaper firing |

That's the ~20× headline win that closes #1321. The per-image cost on the job-prep path drops from ~7.7 ms (incident wall time ÷ image count) to ~0.4 ms.

Where the time went — A/B of the publish-loop change inside this PR

The performance work is a stack of 5 commits (4c089c2459d05619). The first four cover the filter rewrite, the select_related, and the url() caching; the fifth adds the asyncio.gather fanout. (Later commits in the PR add the collect-progress emission, the review fixes, and tests; they don't change the perf path.) To check that the fanout commit specifically pulled its weight, I ran the same job twice: once with the first four commits applied (sequential publish loop still), once with all five (fanout publish loop):

| Stage | Run 1: 4 commits, sequential publish (job 2544) | Run 2: 5 commits, fanout publish (job 2545) |
|---|---|---|
| Celery pickup + collect_images | 13.8 s | 14.2 s |
| filter_processed_images (105,091 images) | 9.4 s | 9.0 s |
| state_manager.initialize_job (Redis SADD) | 4.1 s | 3.6 s |
| queue_images_to_nats publish loop | 139 s (~1.3 ms/img) | ~15 s (~0.14 ms/img amortised) |
| Total run_job | 166.9 s (2.78 min) | 42.1 s |

Both runs are on the same code stack as this PR for the filter changes — the 9 s filter is post-fix in both. The "before" 19-minute baseline above was never re-measured directly because the pre-fix code would have stalled the worker the same way it did in the incident. The 9 s filter and 7.7 ms → 0.4 ms per-image numbers are derived from the same 105k vs 147k extrapolation, which is the closest 1:1 comparison the dev box could give without rolling back to a known-broken state.

Throughput on the publish loop: 105,091 / 139 ≈ 756 msg/s sequential vs 105,091 / 15 ≈ 7,000 msg/s with fanout. The fanout rate was confirmed against NATS /varz samples taken during the run (in_msgs delta of ~45k over a 9 s window mid-publish, ~60k over the following 10 s window).

Extrapolating linearly at the measured fanout rate: a 700k-image job (the largest collection currently on Serbia) would spend ~100 s in the publish loop and ~60 s in the filter, totaling well under 5 minutes. Sequential publish on the same 700k collection would have taken ~15 minutes for the publish alone, so the fanout change is load-bearing for the largest collections, not just nice-to-have.
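
The throughput and extrapolation figures can be re-derived from the measured inputs. The sketch assumes strictly linear scaling with image count, as the paragraph above does; the constant and function names are illustrative.

```python
IMAGES_MEASURED = 105_091      # collection 165 on the dev box
SEQUENTIAL_PUBLISH_S = 139.0   # run 1 publish loop
FANOUT_PUBLISH_S = 15.0        # run 2 publish loop
FILTER_S = 9.0                 # filter_processed_images, post-fix
LARGEST_COLLECTION = 700_000   # largest collection mentioned above


def rate(images: int, seconds: float) -> float:
    """Messages per second for a measured run."""
    return images / seconds


def extrapolate(seconds: float, target_images: int) -> float:
    """Scale a measured duration linearly to a different image count."""
    return seconds * target_images / IMAGES_MEASURED


sequential_rate = rate(IMAGES_MEASURED, SEQUENTIAL_PUBLISH_S)
fanout_rate = rate(IMAGES_MEASURED, FANOUT_PUBLISH_S)
sequential_700k_min = extrapolate(SEQUENTIAL_PUBLISH_S, LARGEST_COLLECTION) / 60
fanout_700k_s = extrapolate(FANOUT_PUBLISH_S, LARGEST_COLLECTION)
filter_700k_s = extrapolate(FILTER_S, LARGEST_COLLECTION)

print(f"sequential: {sequential_rate:.0f} msg/s, fanout: {fanout_rate:.0f} msg/s")
print(f"700k sequential publish: {sequential_700k_min:.1f} min")
print(f"700k fanout publish: {fanout_700k_s:.0f} s, filter: {filter_700k_s:.0f} s")
```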

What the e2e proves: the job-prep path (Collect → filter → Redis init → NATS publish) on a real ~100k-image production-shape collection completes in 42 seconds and leaves the job in a healthy STARTED state with collect: SUCCESS and process queued. The 10-minute reaper threshold isn't approached at any stage.

Downstream consumption confirmed (2026-05-28 re-run)

The 2026-05-27 runs above queued all 105,091 messages but could not confirm the ADC worker actually consumed them: the dev box's ADC worker was in HTTP-poll mode with a 401 auth issue, so the process stage stayed at 0% after queueing. That criterion was satisfied only by inference (queue-success log + Redis sets initialised + 105,091 JetStream messages in flight).

After the auth was fixed, a fresh job (2547, same project 20 / collection 165 / quebec_vermont_moths_2023) on the latest branch ran the full loop. collect reached SUCCESS (1.0), then process advanced past 0 (STARTED, climbing) with results also moving, and Job.updated_at bumped steadily throughout (observed 00:03→00:06 and on), so the reaper never approached its threshold. This closes the one gap from the earlier runs: the full Collect → filter → publish → consume → results path is now observed working end-to-end, not inferred. Absolute process-stage throughput is bounded by ADC inference speed on 105k images, which is orthogonal to this PR.

Collect-stage progress emission (also in this PR)

filter_processed_images now accepts optional job and total kwargs and emits a throttled fractional collect progress update at most once every 5 s of wall time (COLLECT_PROGRESS_SAVE_INTERVAL_SECONDS), capped at 0.99 (COLLECT_PROGRESS_MAX_FRACTION) so the caller's terminal status=SUCCESS, progress=1 flip still owns the final value. collect_images wires both through. Legacy callers without job stay silent — the throttle block is fully gated on both new kwargs.

This makes the Collect stage tick Job.save(update_fields=["progress", "updated_at"]) regularly, which keeps Job.updated_at fresh against the reaper at ami/jobs/tasks.py:929-944 (cutoff Job.STALLED_JOBS_MAX_MINUTES = 10). updated_at is listed explicitly in update_fields because Django's auto_now only fires its pre_save hook for fields named in update_fields; omitting it (as an earlier revision of this PR did) would update the progress JSONB but leave updated_at stale, defeating the reaper-shielding entirely. The structural fix for the reaper false-positive lands here rather than in a follow-up because the wiring is cheap and there's no good reason to leave the gap open.
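
The throttle-and-cap behaviour can be sketched independently of Django. This is a simplified illustration, not the PR's implementation: `with_throttled_progress` is a hypothetical name, it checks the clock per item rather than per chunk, and the `save` callable stands in for `Job.save(update_fields=["progress", "updated_at"])`.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")

SAVE_INTERVAL_SECONDS = 5.0  # COLLECT_PROGRESS_SAVE_INTERVAL_SECONDS in the PR
MAX_FRACTION = 0.99          # COLLECT_PROGRESS_MAX_FRACTION in the PR


def with_throttled_progress(
    items: Iterable[T],
    total: int,
    save: Callable[[float], None],
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[T]:
    """Pass items through, calling save() at most once per interval."""
    last_save = clock()
    for processed, item in enumerate(items, start=1):
        yield item
        now = clock()
        if total > 0 and now - last_save >= SAVE_INTERVAL_SECONDS:
            # Cap below 1.0 so the caller owns the final progress=1 transition.
            save(min(processed / total, MAX_FRACTION))
            last_save = now
```

Injecting `clock` as a parameter is a design choice made for this sketch so that the interval gate can be exercised deterministically without patching `time.monotonic`.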

Two new tests in ami/ml/tests.py:

  • test_filter_processed_images_emits_throttled_collect_progress — a counter-based clock stub verifies the ≥5 s gate fires exactly the expected number of saves across a multi-chunk run, all with update_fields=["progress", "updated_at"], and that the final emitted fraction is capped at 0.99.
  • test_filter_processed_images_skips_progress_emission_without_job — omitting job keeps the legacy code path silent (zero Job.save() calls).
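
A counter-based clock stub of the kind the first test describes might look like the following. Both factory names are hypothetical; the list-backed variant is included only to show how a finite iterator behaves when the code under test reads the clock more often than the list anticipated.

```python
from itertools import count
from typing import Callable, Iterable


def make_counter_clock(step: float = 1.0) -> Callable[[], float]:
    """Clock stub that advances by `step` on every call and never runs out."""
    ticks = count()

    def clock() -> float:
        return next(ticks) * step

    return clock


def make_list_clock(values: Iterable[float]) -> Callable[[], float]:
    """Clock stub backed by a finite list; raises StopIteration when exhausted."""
    iterator = iter(values)

    def clock() -> float:
        return next(iterator)

    return clock
```

The counter form keeps working if an extra clock read is added later, so a cadence change surfaces as an assertion mismatch rather than an unrelated exception.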

The earlier follow-up note about "celery_state=PENDING" was a misleading paraphrase: during the Collect stage Django Job.status is STARTED (set by MLJob.run() at ami/jobs/models.py:462) and Celery's AsyncResult(task_id).state is also STARTED; PENDING only applies between enqueue and pickup. The actual reaper trigger is (status ∈ running_states) AND (updated_at < cutoff). STARTED is in running_states(). The 5 s throttle directly addresses the real trigger; nothing PENDING-specific remains to chase.
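
The reaper trigger stated above reduces to a two-part predicate. The sketch below is an assumption-laden illustration rather than the code in ami/jobs/tasks.py: `is_stalled` is a hypothetical name, and `RUNNING_STATES` lists only STARTED because that is the only member the text confirms.

```python
from datetime import datetime, timedelta, timezone

STALLED_JOBS_MAX_MINUTES = 10
# Assumption: the real running_states() may contain more states than this.
RUNNING_STATES = frozenset({"STARTED"})


def is_stalled(status: str, updated_at: datetime, now: datetime) -> bool:
    """True when a job is in a running state and has not been saved recently."""
    cutoff = now - timedelta(minutes=STALLED_JOBS_MAX_MINUTES)
    return status in RUNNING_STATES and updated_at < cutoff


now = datetime(2026, 5, 27, 12, 0, tzinfo=timezone.utc)
# A job whose row was last saved 19 minutes ago is past the cutoff.
stale = is_stalled("STARTED", now - timedelta(minutes=19), now)
# A job saved 5 seconds ago (the throttle interval) is not.
fresh = is_stalled("STARTED", now - timedelta(seconds=5), now)
```

Under this predicate, any save that bumps `updated_at` at least once per cutoff window keeps a healthy job out of the reaper's reach.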

Not in this PR (follow-ups)

  • Queue-stage progress emission. queue_images_to_nats still has no incremental progress emission during the publish loop. The 42 s end-to-end wall time on 105k images measured above (and ~140 s extrapolated on 700k) is well under the 10-minute reaper cutoff, so this isn't load-bearing. Wiring it would require sync_to_async-bridged updates inside the asyncio.gather chunks and a careful pass on the Job.progress JSONB clobber race with the NATS result handler (_update_job_progress in ami/jobs/tasks.py).

Closes #1321.

Summary by CodeRabbit

  • Performance Improvements

    • Optimized image processing pipeline with batch database queries to enhance throughput during large-scale operations.
    • Improved task publishing efficiency with chunked batching to reduce latency during queue operations.
  • Improvements

    • Enhanced progress reporting during batch processing workflows.

mihow and others added 2 commits May 27, 2026 09:19
filter_processed_images() iterated images one at a time, issuing 3-5 ORM
round trips per image against Detection / Classification. On a ~147k-image
collection that produces ~500k single-row queries inside one Celery task,
which silences the AMQP heartbeat long enough that the reaper flips the
job from STARTED to REVOKED before any forward progress is reported.

Rewrite to chunk the input via itertools.islice and run at most two bulk
queries per chunk: one over Detection rows, one over Classification rows
for the real detections in that chunk. Semantics for all five branches
(no detections, null-only, unclassified real, partial classifier coverage,
fully classified) are preserved.

Adds tests covering empty input, mixed batches across all branches, an
assertion that query count scales with batch count rather than image
count, and ordering preservation across batch boundaries.

Fixes #1321.

Co-Authored-By: Claude <noreply@anthropic.com>
… dicts

Takeaway-review followups on the bulk filter_processed_images rewrite:

- Strip Detection's Meta.ordering on the bulk select via .order_by().
  The per-batch query was sorting all matching rows on (frame_num,
  timestamp) before returning, wasted work since results land in a dict.
- Extract bbox_is_null() helper in ami.main.models next to
  NULL_DETECTIONS_FILTER so the SQL filter and the in-memory check share
  a single source of truth.
- Rename images_with_any_detection -> images_with_pipeline_detection and
  real_detections_per_image -> real_pipeline_detections_per_image to make
  clear the dicts only cover detections from this pipeline's algorithms.

Co-Authored-By: Claude <noreply@anthropic.com>
@netlify

netlify Bot commented May 27, 2026

Deploy Preview for antenna-ssec canceled.

| Name | Link |
|---|---|
| 🔨 Latest commit | 373b16e |
| 🔍 Latest deploy log | https://app.netlify.com/projects/antenna-ssec/deploys/6a179c430e1b5f00083ef6d1 |

@netlify

netlify Bot commented May 27, 2026

Deploy Preview for antenna-preview canceled.

| Name | Link |
|---|---|
| 🔨 Latest commit | 373b16e |
| 🔍 Latest deploy log | https://app.netlify.com/projects/antenna-preview/deploys/6a179c43d3a40d00082e0218 |

@coderabbitai

coderabbitai Bot commented May 27, 2026

Contributor

Walkthrough

This PR addresses collection-preparation performance bottlenecks by replacing per-image ORM queries with batch-based DB queries in filter_processed_images(), adding NATS resource warm-up, and refactoring NATS image publishing to use chunked async operations instead of sequential loops. All changes are instrumented with comprehensive test coverage.

Changes

Collection Preparation and NATS Queueing Performance

| Layer / File(s) | Summary |
|---|---|
| Null-detection in-memory helper (ami/main/models.py) | New bbox_is_null() function treats fetched bounding boxes as null when None or empty list, mirroring existing NULL_DETECTIONS_FILTER semantics for in-memory decision-making. |
| Batch-based image filtering implementation and tests (ami/ml/models/pipeline.py, ami/ml/tests.py) | filter_processed_images() replaced with batch-query logic that preloads pipeline algorithms, bulk-fetches detections and classifications per batch, groups results in memory, and yields images missing pipeline classifications. Throttled progress updates via optional job and total parameters. collect_images() now uses select_related("deployment__data_source"). Test coverage: empty input, mixed batches, bounded query counts, throttled progress emission, and progress suppression without job. |
| NATS resource warm-up helper (ami/ml/orchestration/nats_queue.py) | New public ensure_job_resources() method wraps upfront initialization of per-job JetStream stream and durable consumer by delegating to existing internal _ensure_stream() and _ensure_consumer() methods. |
| Chunked async NATS publishing and tests (ami/ml/orchestration/jobs.py, ami/ml/orchestration/tests/test_jobs.py) | queue_images_to_nats() refactored to call ensure_job_resources() once upfront, replace sequential per-image publishing with chunked concurrent publishing via asyncio.gather(), and compute image_url once per image. Test coverage: stream/consumer warm-up invoked once, correct fanout boundary handling across chunk boundaries, partial publish failures counted without exception, and setup failures short-circuiting all publishing. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • RolnickLab/antenna#1093: Defines null-detection handling (NULL_DETECTIONS_FILTER, null-detection pipeline logic) that the main PR uses via new bbox_is_null() helper.
  • RolnickLab/antenna#1135: Modifies the same stream/consumer readiness helpers (_ensure_stream/_ensure_consumer) that the main PR's new ensure_job_resources() method wraps.
  • RolnickLab/antenna#987: Implements the NATS JetStream queueing architecture (queue_images_to_nats and TaskQueueManager) that the main PR refactors and extends.

Suggested labels

data-processing

Poem

🐰 Per-image loops begone—batches now reign!
Collections breathe easier, no more heartbeat pain.
NATS warms up once, chunks flow in parallel streams,
Job progress ascends—performance redeemed! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 60.00% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title 'Improve the filter stage during job preparation, fix for large capture sets' accurately summarizes the primary changes: bulk query optimization of filter_processed_images and associated performance improvements for large collections. |
| Linked Issues check | ✅ Passed | All primary objectives from issue #1321 are addressed: filter_processed_images bulk-queries in chunks (verified per-image cost reduction), progress emission prevents reaper false-positives, deployment apply across both collect_images call sites, and comprehensive measurements quantify the 20× wall-time improvement. |
| Out of Scope Changes check | ✅ Passed | All code changes (bbox_is_null helper, filter rewrite, select_related prefetch, url caching, NATS fanout, progress throttling) are directly scoped to fixing the Collect-stage performance issue identified in #1321. No unrelated refactoring or feature additions detected. |
| Description check | ✅ Passed | The PR description comprehensively covers all required template sections with detailed explanations, performance metrics, and test coverage. |

@mihow mihow changed the title from "perf(ml): bulk-query Collect-stage filter to unstall large-collection jobs" to "Improve the filter stage during job preparation, fix for large capture sets" May 27, 2026
mihow and others added 3 commits May 27, 2026 13:57
collect_images() returned a SourceImage queryset without select_related on
the deployment/data_source FK chain. Downstream queue_images_to_nats()
calls image.url() per image, which traverses self.deployment.data_source —
two FK lookups per call. On a ~97k-image collection that's ~4.6ms × 97k =
~7 minutes of pure ORM round-trips before NATS publishes start, well
inside the 10-minute reaper window (see issue #1321 follow-up comment).

Add .select_related("deployment__data_source") to both queryset branches
in collect_images so the joins ride along on the initial SourceImage
fetch and image.url() in the queue loop stays a pure-Python operation.

Test asserts that accessing .deployment.data_source on the returned rows
triggers zero additional queries.

Co-Authored-By: Claude <noreply@anthropic.com>
The pre-NATS prep loop in queue_images_to_nats() called image.url() twice
per iteration — once in the truthy conditional, once for assignment.
image.url() goes through SourceImage.public_url() which touches deployment
and data_source; doubling it doubles the FK chain cost. On ~97k-image
collections the redundant call alone adds ~7 minutes (see issue #1321
follow-up).

Cache the call in a local once and drop the now-redundant hasattr/url
double-check. The companion select_related in collect_images keeps the
single remaining call cheap.

Co-Authored-By: Claude <noreply@anthropic.com>
Each publish_task awaits a JetStream ack round-trip (~1.3ms measured on
Serbia 2026-05-27 for the 105k-image incident-shape collection). Sequential
awaits stack linearly, so 105k images took 139s and 700k would push past
the 10-min reaper threshold on its own.

Switch the inner loop to chunked asyncio.gather batches of 200 so the
client can pipeline ack roundtrips. _ensure_stream / _ensure_consumer are
hoisted out of the loop and pre-warmed once on entry — they were already
cache-skipped after first call (nats_queue.py:310,358) but each call still
serialised inside its publish coroutine.

JetStream still assigns ack.seq atomically server-side, so messages remain
ordered in the stream by arrival; only the order *within* a fanout chunk
is non-deterministic from the caller's POV. ADC consumers fetch in seq
order and treat each image-task independently — no per-image ordering
dependency.

Adds 5 unit tests in ami/ml/orchestration/tests/test_jobs.py mocking
TaskQueueManager: warm-up count, chunk-boundary publish count, partial
publish failure path, stream-setup failure short-circuit, and a sentinel
test locking the chunk constant.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow marked this pull request as ready for review May 27, 2026 23:22
Copilot AI review requested due to automatic review settings May 27, 2026 23:22

Copilot AI left a comment

Contributor

Pull request overview

This PR addresses a large-collection performance bottleneck in ML job preparation (Collect → filter → queue) that previously caused jobs to appear stalled and get reaped, by reducing per-image ORM work and improving NATS publish throughput.

Changes:

  • Rewrite filter_processed_images to bulk-query detections/classifications in batches instead of per-image ORM calls.
  • Ensure collect_images returns SourceImage rows with deployment__data_source joined to avoid N+1s during URL generation.
  • Speed up NATS queueing by caching image.url() per iteration and publishing JetStream messages concurrently in bounded chunks.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

| File | Description |
|---|---|
| ami/ml/models/pipeline.py | Batch/bulk implementation of filter_processed_images and select_related("deployment__data_source") in collect_images. |
| ami/ml/orchestration/jobs.py | Cache image.url() once per image and fan out JetStream publishes via chunked asyncio.gather. |
| ami/main/models.py | Add bbox_is_null() helper to share null-bbox semantics between ORM and in-memory logic. |
| ami/ml/tests.py | Add tests for bounded query counts, batch order preservation, and collect_images prefetch behavior. |
| ami/ml/orchestration/tests/test_jobs.py | Add unit tests for fanout chunking/warm-up behavior and failure handling in queue_images_to_nats. |

Comment thread ami/ml/models/pipeline.py
Comment thread ami/ml/models/pipeline.py Outdated
Comment thread ami/ml/orchestration/jobs.py Outdated
Comment thread ami/ml/models/pipeline.py
mihow and others added 4 commits May 27, 2026 16:34
…mages

filter_processed_images now accepts optional `job` and `total` kwargs and
emits a fractional `collect` progress update at most once per
COLLECT_PROGRESS_SAVE_INTERVAL_SECONDS (5s wall time), capped at
COLLECT_PROGRESS_MAX_FRACTION (0.99). collect_images wires both through.

Reaper at ami/jobs/tasks.py:929 triggers on (status in running_states) AND
(updated_at < cutoff, default 10min). Before this commit, the collect stage
held a worker for several seconds to ~minutes on large collections without
ever saving the Job row, so updated_at went stale and the reaper revoked
otherwise-healthy jobs. The throttled save now keeps updated_at fresh.

Cap at 0.99 so the caller's terminal status=SUCCESS, progress=1 transition
still owns the final flip. Legacy callers without `job` see zero
job.save() calls — the throttle block is fully gated on both new kwargs.

Two new tests in ami/ml/tests.py:
- mocked time.monotonic verifies the >=5s gate fires exactly the expected
  number of times across a multi-chunk run.
- omitting `job` keeps the legacy code path silent.

Co-Authored-By: Claude <noreply@anthropic.com>
Three small fixes flagged on PR #1322:
- Typo in the debug log message ("do yet have" → "do not yet have").
- Reword the `collect_images` source-selection comment so it accurately
  describes when the `deployment__data_source` join is applied — the
  `source_images` branch hands the caller's iterable through unchanged.
- Fail fast on non-positive `batch_size` instead of silently filtering
  out every image when an `islice` of 0/negative returns an empty batch.

Co-Authored-By: Claude <noreply@anthropic.com>
…Manager

Per Copilot review on PR #1322: calling _ensure_stream / _ensure_consumer
directly from the orchestrator coupled queue_images_to_nats to private
TaskQueueManager internals. Adds a thin public wrapper that calls both,
keeps the per-instance "already-warmed" caches behind the manager's API,
and gives future refactors a single seam to evolve.

Switches the publish-loop warm-up in queue_images_to_nats to the new
public method and updates the fanout unit tests accordingly.

Co-Authored-By: Claude <noreply@anthropic.com>
Follow-up to the takeaway review on PR #1322:

- Drop `test_filter_processed_images_preserves_input_order_across_batches`
  — `test_filter_processed_images_mixed_batch` already asserts the output
  order with `[unprocessed, unclassified]`. There is no shuffle/reorder
  path in the function for a cross-batch ordering test to exercise.
- Drop `test_default_chunk_size_is_explicit` — locking the constant value
  catches no behavior regression; the explanatory comment at
  `ami/ml/orchestration/jobs.py:14-19` is the real documentation.
- Replace the `iter([...])` + `next(...)` clock stub in
  `test_filter_processed_images_emits_throttled_collect_progress` with a
  counter-based stub. The previous pattern would crash with
  `StopIteration` if a future change in `filter_processed_images` added
  another `time.monotonic()` call; the counter form fails with a clear
  cadence-mismatch assertion instead.

No coverage lost.

Co-Authored-By: Claude <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 3

🧹 Nitpick comments (1)
ami/ml/orchestration/jobs.py (1)

103-107: 💤 Low value

Consider whether the hasattr check is necessary.

The hasattr(image, "url") guard is defensive, but SourceImage should always have a url() method since images is typed as list[SourceImage]. If there's a specific case where the method might be missing (e.g., a proxy object or a mock), the guard is justified; otherwise, simplifying to image_url = image.url() would be cleaner.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@ami/ml/orchestration/jobs.py` around lines 103 - 107, The hasattr guard
around image.url() is unnecessary because images is typed as list[SourceImage];
replace the conditional assignment with a direct call (image_url = image.url())
in the loop where image_url is defined, and remove the comment justifying the
guard; if there are tests or mocks that caused the guard to be added, instead
update those tests/mocks to implement url() rather than keeping the runtime
check (see the collect_images() path and the SourceImage type for where to
validate).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@ami/ml/models/pipeline.py`:
- Around line 112-114: The bug is that pipeline_classifier_ids being empty makes
pipeline_classifier_ids.issubset(classifier_ids_per_detection) vacuously true,
skipping images incorrectly; fix by short-circuiting: explicitly check if
pipeline_classifier_ids is empty (the set built from pipeline_algorithms and
detection_type_keys) and when empty immediately take the "reprocess all images"
branch (log via task_logger and skip the subset logic), and apply the same
explicit-empty check to the other similar blocks referencing
pipeline_classifier_ids and classifier_ids_per_detection around the areas you
noted (lines ~147-153 and ~170-182) so the subset test is only evaluated when
pipeline_classifier_ids is non-empty.
- Around line 203-208: The heartbeat save for the Job progress currently calls
job.save(update_fields=["progress"]) which prevents Django from auto-updating
Job.updated_at; modify the save to include "updated_at" (e.g.,
job.save(update_fields=["progress", "updated_at"])) or simply call job.save() so
updated_at is bumped; locate the block around the collect heartbeat (references:
job, processed_count, total, COLLECT_PROGRESS_SAVE_INTERVAL_SECONDS,
last_progress_save_monotonic) and update the save call accordingly.

In `@ami/ml/tests.py`:
- Line 569: The test currently uses a hardcoded secret literal secret_key="y"
which triggers Ruff S106; to fix, replace the inline literal with a generated
variable (e.g. import secrets and assign a runtime value like secret_key =
secrets.token_urlsafe(...) or derive a variable above the test) and pass that
variable into the fixture/call instead of the literal, or alternatively add an
explicit noqa comment (S106) next to the parameter with a short justification;
update the usage of the symbol secret_key in the test to reference the new
variable (and add the necessary import) so Ruff no longer flags the hardcoded
secret.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ca87a556-6035-4e51-9173-5714f641052c

📥 Commits

Reviewing files that changed from the base of the PR and between f585ddc and 4133e66.

📒 Files selected for processing (6)
  • ami/main/models.py
  • ami/ml/models/pipeline.py
  • ami/ml/orchestration/jobs.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_jobs.py
  • ami/ml/tests.py

mihow and others added 2 commits May 27, 2026 18:24

Three correctness fixes and one nit cleanup from the CodeRabbit review
on PR #1322:

1. **`Job.save` heartbeat now bumps `updated_at`.** The throttled progress
   save in `filter_processed_images` was calling
   `job.save(update_fields=["progress"])`. Django's `auto_now=True`
   pre_save hook only fires for fields listed in `update_fields`, so
   `Job.updated_at` was never refreshed. The reaper at
   `ami/jobs/tasks.py:929-944` keys off `Job.updated_at < cutoff`, so
   the heartbeat did not actually defeat the reaper as intended.
   Including `updated_at` in `update_fields` restores the auto_now
   behavior. Test assertion updated to enforce the contract.

2. **Short-circuit the "no classifier" path.** When a pipeline has zero
   classifier algorithms registered, `pipeline_classifier_ids` is the
   empty set and `set().issubset(anything)` is vacuously True — so the
   subset check below would skip every image with existing detections,
   directly contradicting the warning "Will reprocess all images."
   The function now runs `yield from images; return` right after the
   warning, so the warning's stated behavior actually holds.

3. **Drop the `hasattr(image, "url")` guard in `queue_images_to_nats`.**
   `images` is typed `list[SourceImage]` and `SourceImage` always defines
   `url()`. The guard masked nothing real and confused readers.

4. **`# noqa: S106` on the fixture `secret_key="y"` values** in two test
   files so future runs under Ruff's bandit ruleset stay clean. These
   are clearly fixture values, never real credentials.
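
The throttled heartbeat from item 1 can be sketched without Django as below. This is an illustration under stated assumptions: `FakeJob` is a hypothetical stand-in that only records `save()` calls, `save_heartbeat` and its parameters are invented for the example, and the 30-second interval is arbitrary. Only the `update_fields=["progress", "updated_at"]` call shape comes from the discussion above.

```python
import time


class FakeJob:
    """Hypothetical stand-in for the Job model; it only records save() calls."""

    def __init__(self):
        self.progress = 0.0
        self.save_calls = []

    def save(self, update_fields=None):
        self.save_calls.append(list(update_fields or []))


def save_heartbeat(job, processed, total, last_save, interval, now=None):
    """Persist progress at most once per `interval` seconds.

    Returns the timestamp of the most recent save so the caller can carry it
    into the next iteration.
    """
    now = time.monotonic() if now is None else now
    if now - last_save < interval:
        return last_save  # too soon, skip the write
    job.progress = processed / total if total else 1.0
    # Listing "updated_at" alongside "progress" is what allows an auto_now
    # field to be refreshed on a partial save.
    job.save(update_fields=["progress", "updated_at"])
    return now


job = FakeJob()
last = 0.0
# Simulated (processed_count, clock_reading) pairs; only the last one is
# at least 30 seconds after the previous save.
for processed, now in [(1000, 10.0), (2000, 20.0), (3000, 35.0)]:
    last = save_heartbeat(job, processed, total=3000, last_save=last, interval=30.0, now=now)
```

With the real model, a test would assert on the `update_fields` passed to `save`, which is the contract the updated test assertion is described as enforcing.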

Co-Authored-By: Claude <noreply@anthropic.com>

Add a behavioural test that pins the fix from 51a7fff: a pipeline with
no classifier algorithms registered must yield every input image,
matching the "Will reprocess all images" warning. Without the
short-circuit added in 51a7fff, the empty `pipeline_classifier_ids`
set makes the `set().issubset(observed)` check vacuously True and
images with detections get silently skipped.

Co-Authored-By: Claude <noreply@anthropic.com>
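
The vacuous-truth problem that this test pins can be shown in a few lines. This is a simplified sketch: `needs_processing` is a hypothetical helper that decides for a single detection, whereas the real code works over whole batches of images, and the ID values are made up.

```python
def needs_processing(pipeline_classifier_ids, classifier_ids_per_detection):
    """Hypothetical helper: True if the image should be (re)processed."""
    # Guard first: with no classifiers registered, the subset test below would
    # be vacuously True and the image would be wrongly treated as done.
    if not pipeline_classifier_ids:
        return True
    # Otherwise the image is done only if every pipeline classifier has
    # already been applied to the detection.
    return not pipeline_classifier_ids.issubset(classifier_ids_per_detection)


observed = {7, 9}  # made-up classifier IDs already recorded on a detection

# The empty set is a subset of every set, which is the root of the bug.
vacuous = set().issubset(observed)

empty_pipeline = needs_processing(set(), observed)     # reprocess
fully_covered = needs_processing({7}, observed)        # skip
partly_covered = needs_processing({7, 11}, observed)   # reprocess
```

Checking for the empty set before the subset test keeps the code's behavior in line with the "Will reprocess all images" warning.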
@mihow mihow merged commit 1dfb41f into main May 29, 2026
7 checks passed
@mihow mihow deleted the fix/filter-processed-images-bulk-1321 branch May 29, 2026 04:19
@mihow mihow added the PSv2 Async & distributed ML backend (PSv2): job state, NATS dispatch, result handling. Umbrella #515. label Jun 16, 2026
Development

Successfully merging this pull request may close these issues.

Preparing jobs takes too long preparing large collections

2 participants