diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index c259e4aea..ac3d2cf3f 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -952,15 +952,6 @@ def save_results( "Algorithms and category maps must be registered before processing, using /info endpoint." ) - # Ensure all images have detections - # if not, add a NULL detection (empty bbox) to the results - null_detections = create_null_detections_for_undetected_images( - results=results, - detection_algorithm=detection_algorithm, - logger=job_logger, - ) - results.detections = results.detections + null_detections - detections = create_detections( detections=results.detections, algorithms_known=algorithms_known, @@ -981,6 +972,22 @@ def save_results( logger=job_logger, ) + # Mark images with no real detections as processed by creating null-bbox sentinels. + # Issue #1310: must run AFTER the real-detection / classification / occurrence steps + # so a failure earlier in the pipeline leaves the image unmarked (and therefore + # re-processed by filter_processed_images on the next run). Null DetectionResponses + # are kept out of the real-detection list so they bypass occurrence creation entirely. + null_detection_responses = create_null_detections_for_undetected_images( + results=results, + detection_algorithm=detection_algorithm, + logger=job_logger, + ) + create_detections( + detections=null_detection_responses, + algorithms_known=algorithms_known, + logger=job_logger, + ) + # Update precalculated counts on source images and events source_images = list(source_images) logger.info(f"Updating calculated fields for {len(source_images)} source images") diff --git a/ami/ml/tests.py b/ami/ml/tests.py index 0eaa0a237..171cfcd0c 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -13,6 +13,7 @@ Deployment, Detection, Event, + Occurrence, Project, SourceImage, SourceImageCollection, @@ -1024,6 +1025,78 @@ def test_null_detection_deduplication_same_pipeline(self): null_detections = image.detections.filter(bbox__isnull=True) self.assertEqual(null_detections.count(), 1, "Same pipeline should not create duplicate null detections") + def test_null_detection_does_not_create_phantom_occurrence(self): + """ + Issue #1310: a null detection (empty-bbox sentinel marking "image processed, + nothing found") must NOT spawn an Occurrence. Occurrences with no + determination and no real detections leak to the API as ghost rows. + """ + image = self.test_images[0] + results = self.fake_pipeline_results([image], self.pipeline) + results.detections = [] # pipeline found nothing + + save_results(results) + + null_dets = image.detections.filter(bbox__isnull=True) + self.assertEqual(null_dets.count(), 1, "Null marker should still be created") + self.assertIsNone( + null_dets.first().occurrence, + "Null detection must NOT be associated with an Occurrence", + ) + # No phantom Occurrence in DB tied to this image at all + phantom_occs = Occurrence.objects.filter(detections__source_image=image, determination__isnull=True) + self.assertEqual( + phantom_occs.count(), + 0, + "No Occurrence with NULL determination should exist for an image that had no detections", + ) + + def test_captures_not_marked_processed_after_failure(self): + """ + Issue #1310: null markers should only flag images as processed AFTER all + downstream save steps (classifications, occurrences) succeed. If any + downstream step raises, the image must remain unmarked so the next run + re-processes it. + + Reproduces the field bug where 400 images ended up with null markers but + no real detections — created when null-creation ran ahead of a later step + that failed. + """ + from unittest.mock import patch + + from ami.ml.models.pipeline import filter_processed_images + + # Mix: image_with_real has a detection in the response, image_without_real does not. + # The without-real image is the one that would get a null marker. + image_with_real, image_without_real = self.test_images + results = self.fake_pipeline_results(self.test_images, self.pipeline) + # Trim detections to only the first image so the second qualifies for null-marker creation + results.detections = [d for d in results.detections if str(d.source_image_id) == str(image_with_real.pk)] + + # Inject failure in a step that runs AFTER detection bulk_create + with patch( + "ami.ml.models.pipeline.create_classifications", + side_effect=RuntimeError("simulated classification failure"), + ): + with self.assertRaises(RuntimeError): + save_results(results) + + # The image with no real detection must NOT have a null marker — + # the run failed, so it should be re-tried. + null_dets = image_without_real.detections.filter(bbox__isnull=True) + self.assertEqual( + null_dets.count(), + 0, + "Image without real detections must not be marked processed when downstream step fails", + ) + # filter_processed_images should still yield it for the next run + retry_yield = list(filter_processed_images([image_without_real], self.pipeline)) + self.assertEqual( + retry_yield, + [image_without_real], + "Image with failed run must be re-yielded for processing", + ) + class TestAlgorithmCategoryMaps(TestCase): def setUp(self): diff --git a/docs/claude/reference/live-dev-testing-atomic-rollback.md b/docs/claude/reference/live-dev-testing-atomic-rollback.md new file mode 100644 index 000000000..40cc06905 --- /dev/null +++ b/docs/claude/reference/live-dev-testing-atomic-rollback.md @@ -0,0 +1,88 @@ +# Safe live testing on shared dev DB — `atomic()` + rollback sentinel + +Pattern for exercising mutating code paths (`save_results`, `create_detections`, etc.) against the real dev-server database without leaving state behind. Useful when: + +- The unit test suite can't reproduce a specific bug because it uses fakes/factories +- You need to exercise a code path with real upstream data (image rows, algorithm rows, etc.) that you don't want to fabricate +- You don't have permission (or shouldn't) to delete pre-existing rows on the shared DB + +## The pattern + +```python +from django.db import transaction + +class _Rollback(Exception): + pass + +try: + with transaction.atomic(): + # call the mutating code under test + save_results(results, job_id=None) + + # observe state INSIDE the atomic block (writes are visible here) + for s in images: + s.refresh_from_db() + print(s.detections.count(), s.detections.filter(NULL_FILTER).count()) + + # …assertions… + + raise _Rollback("intentional rollback to keep DB clean") +except _Rollback: + pass + +# after the block: DB restored to baseline +``` + +## Why a custom exception, not just `transaction.set_rollback(True)`? + +- `set_rollback(True)` is silent — easy to forget and accidentally let writes commit. Custom exception is loud. +- A bare `raise` would propagate. The `_Rollback` class lets us catch *only* the intentional rollback and let real errors bubble. +- Inside the `try/except`, exceptions from the code under test (e.g. a `RuntimeError` from `create_classifications`) still propagate normally — separate the failure-injection signal from the cleanup signal. + +## How to run a multi-line script in the Django container + +`docker compose exec -T django python -c "..."` does NOT work for Django code — Django's app registry isn't loaded for raw `python -c`. You get `AppRegistryNotReady: Apps aren't loaded yet.` + +Two options that DO work: + +```bash +# Option A: stdin into manage.py shell (good for one-shot scripts) +docker compose exec -T django python manage.py shell < /path/to/script.py + +# Option B: copy file in, then run (good if you want logs / args) +docker cp /tmp/script.py container-name:/tmp/script.py +docker compose exec -T django python manage.py shell -c "exec(open('/tmp/script.py').read())" +``` + +For testing on a remote dev box, the canonical flow is: + +```bash +scp /tmp/script.py antenna-dev-serbia:/tmp/script.py +ssh antenna-dev-serbia 'docker cp /tmp/script.py antenna-django-1:/tmp/script.py && \ + cd ~/antenna && docker compose exec -T django python manage.py shell < /tmp/script.py' +``` + +## Example: validating PR #1312 `save_results` fix + +Full scripts in `docs/claude/sessions/2026-05-19-pr-1312-premptive-processed-marker.md`. The pattern lets you: + +- Build a synthetic `PipelineResultsResponse` (mix of images with/without detections) +- Call `save_results` and observe `Detection.bbox`, `Detection.occurrence_id`, `Occurrence.determination_id` +- Inject a downstream failure via `unittest.mock.patch("ami.ml.models.pipeline.create_classifications", side_effect=RuntimeError(...))` +- Roll back everything, leaving the shared DB unchanged + +## Caveats + +- **Side effects outside the transaction will NOT roll back.** Examples on this codebase: + - `transaction.on_commit(lambda: task.delay())` Celery dispatches + - `update_calculated_fields` background tasks + - S3 / MinIO writes + - Cache invalidations via signals that don't honor transactions + + For `save_results`, `transaction.on_commit` callbacks won't fire on rollback, which is usually what you want (no spurious downstream work). But if the code under test writes to external systems, those writes persist. + +- **`refresh_from_db()` is required** to see in-block changes on objects you fetched before the mutation — Django's in-memory objects don't auto-sync. + +- **Don't use this pattern for irreversible operations** (deleted-then-recreated unique constraints, sequence advances). Postgres sequences advance even on rollback. + +- **Auto-mode classifier may still block destructive verbs** (`DELETE`, `.delete()`) even when wrapped in `atomic()`, because the classifier reads the statement intent, not the transaction surroundings. If blocked, either ask the user explicitly or restructure to avoid the destructive verb. diff --git a/docs/claude/reference/reprocess-flags.md b/docs/claude/reference/reprocess-flags.md new file mode 100644 index 000000000..1462128f6 --- /dev/null +++ b/docs/claude/reference/reprocess-flags.md @@ -0,0 +1,74 @@ +# Reprocess flags — how `reprocess_all_images` and `reprocess_existing_detections` interact + +Two flags govern whether an ML job re-runs over already-processed images. They look similar but control different stages and interact in non-obvious ways. Source of truth: `ami/ml/models/pipeline.py:140-253`. + +## The two flags + +| Flag | Where it lives | What it controls | +|---|---|---| +| `reprocess_all_images` | Per-job kwarg; also `Project.feature_flags.reprocess_all_images` | Whether `filter_processed_images` runs at all on the collected images | +| `reprocess_existing_detections` | `Project.feature_flags.reprocess_existing_detections` + optional `pipeline_config["reprocess_existing_detections"]` | Whether existing `Detection` rows are sent to the pipeline as `DetectionRequest`s (so the pipeline re-classifies them) instead of being recreated | + +## The collect step (`collect_images`, lines 134-176) + +- `reprocess_all_images=False` (default): `filter_processed_images(images, pipeline)` filters out images that have already been processed by the pipeline's algorithms. Only images that need work are sent. +- `reprocess_all_images=True`: filter is bypassed; every image in the collection goes downstream regardless of prior state. + +## The process step (`process_images`, lines 179-253) + +`process_images` re-runs the filter (line 212) unless `reprocess_all_images=True`. So the filter actually runs twice in the default path — once on collect, once before dispatch. + +Then a separate decision (lines 232-237): + +```python +reprocess_existing_detections = reprocess_all_images +if project and project.feature_flags.reprocess_existing_detections: + if pipeline_config.get("reprocess_existing_detections", True): + reprocess_existing_detections = True +``` + +When `reprocess_existing_detections` is true, the existing `Detection` rows for each image are bundled into the request as `DetectionRequest`s. The pipeline then re-classifies those same bboxes rather than redoing detection. + +## What you observe when only `reprocess_all_images=True` + +- All collected images are sent to the pipeline (filter bypassed). +- BUT `reprocess_existing_detections` becomes `True` too (line 232), so existing detection rows are sent with them. +- The pipeline returns those same detections (often with updated classifications). +- `create_detections` in `save_results` matches incoming detections against existing rows by `(source_image, bbox, detection_algorithm)` and **updates in place** — no new `Detection` rows. +- Net effect: detection count and PKs unchanged after the job. **Do not interpret this as "save_results didn't run."** It did; it just didn't need to create new rows. + +This is what was observed on serbia in PR #1312 e2e: project 9 had `reprocess_all_images=True`, job 162 reprocessed all 10 images, and the DB delta was zero because every detection was already there. + +## What you observe when only `reprocess_existing_detections=True` + +- `filter_processed_images` still skips fully-processed images. +- For images that DO need work (e.g. a new classifier was added), their existing real detections are sent to the pipeline as `DetectionRequest`s instead of being re-detected from scratch. + +## What you observe when BOTH are off (the default) + +- `filter_processed_images` skips fully-processed images. +- Surviving images go to the pipeline with NO existing detections in the request — the pipeline runs the detector from scratch and `create_detections` creates new rows. + +## Implications for testing + +- **For regression / behavior tests on `save_results`:** the unit tests already use synthetic `PipelineResultsResponse` objects, so flag state doesn't matter there. +- **For e2e tests on a dev box:** if you want to see `Detection` row count grow, you need `reprocess_all_images=False` AND images that are genuinely unprocessed by the target pipeline. Easiest way: pick a pipeline whose detector algorithm has not run on the images yet (different `detection_algorithm_id`), OR use a fresh `SourceImageCollection` whose images have never been processed. +- **For "I ran the job and nothing changed in the DB" debugging:** check the project's `feature_flags.reprocess_all_images` first. If it's on and `reprocess_existing_detections` is off, your job ran fine but performed updates in place. Use celeryworker logs (`Saved pipeline results to database with N detections`) for evidence of activity. + +## `test_ml_job_e2e` reported stats are pipeline-response counts, not DB delta + +`manage.py test_ml_job_e2e` prints lines like: + +``` +📊 Final Results: + Process: 100.0% (SUCCESS) + Processed: 10 + Results: 100.0% (SUCCESS) + Captures: 10 + Detections: 68 + Classifications: 121 +``` + +"Detections: 68" is the number of `DetectionResponse` objects returned by the pipeline across all batches in this job. It is NOT the number of new rows in the `Detection` table. If `reprocess_existing_detections` is true and the pipeline returned the same bboxes, you can see "Detections: 68" with zero net DB changes. + +For DB-delta verification, snapshot detection / occurrence PKs before and after via Django shell, not via job stats.