Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions ami/ml/models/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,15 +952,6 @@ def save_results(
"Algorithms and category maps must be registered before processing, using /info endpoint."
)

# Ensure all images have detections
# if not, add a NULL detection (empty bbox) to the results
null_detections = create_null_detections_for_undetected_images(
results=results,
detection_algorithm=detection_algorithm,
logger=job_logger,
)
results.detections = results.detections + null_detections

detections = create_detections(
detections=results.detections,
algorithms_known=algorithms_known,
Expand All @@ -981,6 +972,22 @@ def save_results(
logger=job_logger,
)

# Mark images with no real detections as processed by creating null-bbox sentinels.
# Issue #1310: must run AFTER the real-detection / classification / occurrence steps
# so a failure earlier in the pipeline leaves the image unmarked (and therefore
# re-processed by filter_processed_images on the next run). Null DetectionResponses
# are kept out of the real-detection list so they bypass occurrence creation entirely.
null_detection_responses = create_null_detections_for_undetected_images(
results=results,
detection_algorithm=detection_algorithm,
logger=job_logger,
)
create_detections(
detections=null_detection_responses,
algorithms_known=algorithms_known,
logger=job_logger,
)

Comment on lines 955 to +990
# Update precalculated counts on source images and events
source_images = list(source_images)
logger.info(f"Updating calculated fields for {len(source_images)} source images")
Expand Down
73 changes: 73 additions & 0 deletions ami/ml/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Deployment,
Detection,
Event,
Occurrence,
Project,
SourceImage,
SourceImageCollection,
Expand Down Expand Up @@ -1024,6 +1025,78 @@ def test_null_detection_deduplication_same_pipeline(self):
null_detections = image.detections.filter(bbox__isnull=True)
self.assertEqual(null_detections.count(), 1, "Same pipeline should not create duplicate null detections")

def test_null_detection_does_not_create_phantom_occurrence(self):
"""
Issue #1310: a null detection (empty-bbox sentinel marking "image processed,
nothing found") must NOT spawn an Occurrence. Occurrences with no
determination and no real detections leak to the API as ghost rows.
"""
image = self.test_images[0]
results = self.fake_pipeline_results([image], self.pipeline)
results.detections = [] # pipeline found nothing

save_results(results)

null_dets = image.detections.filter(bbox__isnull=True)
self.assertEqual(null_dets.count(), 1, "Null marker should still be created")
self.assertIsNone(
null_dets.first().occurrence,
"Null detection must NOT be associated with an Occurrence",
)
# No phantom Occurrence in DB tied to this image at all
phantom_occs = Occurrence.objects.filter(detections__source_image=image, determination__isnull=True)
self.assertEqual(
phantom_occs.count(),
0,
"No Occurrence with NULL determination should exist for an image that had no detections",
)

def test_captures_not_marked_processed_after_failure(self):
"""
Issue #1310: null markers should only flag images as processed AFTER all
downstream save steps (classifications, occurrences) succeed. If any
downstream step raises, the image must remain unmarked so the next run
re-processes it.

Reproduces the field bug where 400 images ended up with null markers but
no real detections — created when null-creation ran ahead of a later step
that failed.
"""
from unittest.mock import patch

from ami.ml.models.pipeline import filter_processed_images

# Mix: image_with_real has a detection in the response, image_without_real does not.
# The without-real image is the one that would get a null marker.
image_with_real, image_without_real = self.test_images
results = self.fake_pipeline_results(self.test_images, self.pipeline)
# Trim detections to only the first image so the second qualifies for null-marker creation
results.detections = [d for d in results.detections if str(d.source_image_id) == str(image_with_real.pk)]

# Inject failure in a step that runs AFTER detection bulk_create
with patch(
"ami.ml.models.pipeline.create_classifications",
side_effect=RuntimeError("simulated classification failure"),
):
with self.assertRaises(RuntimeError):
save_results(results)

# The image with no real detection must NOT have a null marker —
# the run failed, so it should be re-tried.
null_dets = image_without_real.detections.filter(bbox__isnull=True)
self.assertEqual(
null_dets.count(),
0,
"Image without real detections must not be marked processed when downstream step fails",
)
# filter_processed_images should still yield it for the next run
retry_yield = list(filter_processed_images([image_without_real], self.pipeline))
self.assertEqual(
retry_yield,
[image_without_real],
"Image with failed run must be re-yielded for processing",
)


class TestAlgorithmCategoryMaps(TestCase):
def setUp(self):
Expand Down
88 changes: 88 additions & 0 deletions docs/claude/reference/live-dev-testing-atomic-rollback.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Safe live testing on shared dev DB — `atomic()` + rollback sentinel

Pattern for exercising mutating code paths (`save_results`, `create_detections`, etc.) against the real dev-server database without leaving state behind. Useful when:

- The unit test suite can't reproduce a specific bug because it uses fakes/factories
- You need to exercise a code path with real upstream data (image rows, algorithm rows, etc.) that you don't want to fabricate
- You don't have permission (or shouldn't) to delete pre-existing rows on the shared DB

## The pattern

```python
from django.db import transaction

class _Rollback(Exception):
pass

try:
with transaction.atomic():
# call the mutating code under test
save_results(results, job_id=None)

# observe state INSIDE the atomic block (writes are visible here)
for s in images:
s.refresh_from_db()
print(s.detections.count(), s.detections.filter(NULL_FILTER).count())

# …assertions…

raise _Rollback("intentional rollback to keep DB clean")
except _Rollback:
pass

# after the block: DB restored to baseline
```

## Why a custom exception, not just `transaction.set_rollback(True)`?

- `set_rollback(True)` is silent — easy to forget and accidentally let writes commit. Custom exception is loud.
- A bare `raise` would propagate. The `_Rollback` class lets us catch *only* the intentional rollback and let real errors bubble.
- Inside the `try/except`, exceptions from the code under test (e.g. a `RuntimeError` from `create_classifications`) still propagate normally — separate the failure-injection signal from the cleanup signal.

## How to run a multi-line script in the Django container

`docker compose exec -T django python -c "..."` does NOT work for Django code — Django's app registry isn't loaded for raw `python -c`. You get `AppRegistryNotReady: Apps aren't loaded yet.`

Two options that DO work:

```bash
# Option A: stdin into manage.py shell (good for one-shot scripts)
docker compose exec -T django python manage.py shell < /path/to/script.py

# Option B: copy file in, then run (good if you want logs / args)
docker cp /tmp/script.py container-name:/tmp/script.py
docker compose exec -T django python manage.py shell -c "exec(open('/tmp/script.py').read())"
```

For testing on a remote dev box, the canonical flow is:

```bash
scp /tmp/script.py antenna-dev-serbia:/tmp/script.py
ssh antenna-dev-serbia 'docker cp /tmp/script.py antenna-django-1:/tmp/script.py && \
cd ~/antenna && docker compose exec -T django python manage.py shell < /tmp/script.py'
```

## Example: validating PR #1312 `save_results` fix

Full scripts in `docs/claude/sessions/2026-05-19-pr-1312-premptive-processed-marker.md`. The pattern lets you:

- Build a synthetic `PipelineResultsResponse` (mix of images with/without detections)
- Call `save_results` and observe `Detection.bbox`, `Detection.occurrence_id`, `Occurrence.determination_id`
- Inject a downstream failure via `unittest.mock.patch("ami.ml.models.pipeline.create_classifications", side_effect=RuntimeError(...))`
- Roll back everything, leaving the shared DB unchanged

## Caveats

- **Side effects outside the transaction will NOT roll back.** Examples on this codebase:
- `transaction.on_commit(lambda: task.delay())` Celery dispatches
- `update_calculated_fields` background tasks
- S3 / MinIO writes
- Cache invalidations via signals that don't honor transactions

For `save_results`, `transaction.on_commit` callbacks won't fire on rollback, which is usually what you want (no spurious downstream work). But if the code under test writes to external systems, those writes persist.

- **`refresh_from_db()` is required** to see in-block changes on objects you fetched before the mutation — Django's in-memory objects don't auto-sync.

- **Don't use this pattern for irreversible operations** (deleted-then-recreated unique constraints, sequence advances). Postgres sequences advance even on rollback.

- **Auto-mode classifier may still block destructive verbs** (`DELETE`, `.delete()`) even when wrapped in `atomic()`, because the classifier reads the statement intent, not the transaction surroundings. If blocked, either ask the user explicitly or restructure to avoid the destructive verb.
74 changes: 74 additions & 0 deletions docs/claude/reference/reprocess-flags.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Reprocess flags — how `reprocess_all_images` and `reprocess_existing_detections` interact

Two flags govern whether an ML job re-runs over already-processed images. They look similar but control different stages and interact in non-obvious ways. Source of truth: `ami/ml/models/pipeline.py:140-253`.

## The two flags

| Flag | Where it lives | What it controls |
|---|---|---|
| `reprocess_all_images` | Per-job kwarg; also `Project.feature_flags.reprocess_all_images` | Whether `filter_processed_images` runs at all on the collected images |
| `reprocess_existing_detections` | `Project.feature_flags.reprocess_existing_detections` + optional `pipeline_config["reprocess_existing_detections"]` | Whether existing `Detection` rows are sent to the pipeline as `DetectionRequest`s (so the pipeline re-classifies them) instead of being recreated |

## The collect step (`collect_images`, lines 134-176)

- `reprocess_all_images=False` (default): `filter_processed_images(images, pipeline)` filters out images that have already been processed by the pipeline's algorithms. Only images that need work are sent.
- `reprocess_all_images=True`: filter is bypassed; every image in the collection goes downstream regardless of prior state.

## The process step (`process_images`, lines 179-253)

`process_images` re-runs the filter (line 212) unless `reprocess_all_images=True`. So the filter actually runs twice in the default path — once on collect, once before dispatch.

Then a separate decision (lines 232-237):

```python
reprocess_existing_detections = reprocess_all_images
if project and project.feature_flags.reprocess_existing_detections:
if pipeline_config.get("reprocess_existing_detections", True):
reprocess_existing_detections = True
```

When `reprocess_existing_detections` is true, the existing `Detection` rows for each image are bundled into the request as `DetectionRequest`s. The pipeline then re-classifies those same bboxes rather than redoing detection.

## What you observe when only `reprocess_all_images=True`

- All collected images are sent to the pipeline (filter bypassed).
- BUT `reprocess_existing_detections` becomes `True` too (line 232), so existing detection rows are sent with them.
- The pipeline returns those same detections (often with updated classifications).
- `create_detections` in `save_results` matches incoming detections against existing rows by `(source_image, bbox, detection_algorithm)` and **updates in place** — no new `Detection` rows.
- Net effect: detection count and PKs unchanged after the job. **Do not interpret this as "save_results didn't run."** It did; it just didn't need to create new rows.
Comment on lines +34 to +38

This is what was observed on serbia in PR #1312 e2e: project 9 had `reprocess_all_images=True`, job 162 reprocessed all 10 images, and the DB delta was zero because every detection was already there.

## What you observe when only `reprocess_existing_detections=True`

- `filter_processed_images` still skips fully-processed images.
- For images that DO need work (e.g. a new classifier was added), their existing real detections are sent to the pipeline as `DetectionRequest`s instead of being re-detected from scratch.

## What you observe when BOTH are off (the default)

- `filter_processed_images` skips fully-processed images.
- Surviving images go to the pipeline with NO existing detections in the request — the pipeline runs the detector from scratch and `create_detections` creates new rows.

## Implications for testing

- **For regression / behavior tests on `save_results`:** the unit tests already use synthetic `PipelineResultsResponse` objects, so flag state doesn't matter there.
- **For e2e tests on a dev box:** if you want to see `Detection` row count grow, you need `reprocess_all_images=False` AND images that are genuinely unprocessed by the target pipeline. Easiest way: pick a pipeline whose detector algorithm has not run on the images yet (different `detection_algorithm_id`), OR use a fresh `SourceImageCollection` whose images have never been processed.
- **For "I ran the job and nothing changed in the DB" debugging:** check the project's `feature_flags.reprocess_all_images` first. If it's on and `reprocess_existing_detections` is off, your job ran fine but performed updates in place. Use celeryworker logs (`Saved pipeline results to database with N detections`) for evidence of activity.

## `test_ml_job_e2e` reported stats are pipeline-response counts, not DB delta

`manage.py test_ml_job_e2e` prints lines like:

```
📊 Final Results:
Process: 100.0% (SUCCESS)
Processed: 10
Results: 100.0% (SUCCESS)
Captures: 10
Detections: 68
Classifications: 121
```

"Detections: 68" is the number of `DetectionResponse` objects returned by the pipeline across all batches in this job. It is NOT the number of new rows in the `Detection` table. If `reprocess_existing_detections` is true and the pipeline returned the same bboxes, you can see "Detections: 68" with zero net DB changes.

For DB-delta verification, snapshot detection / occurrence PKs before and after via Django shell, not via job stats.
Loading