Skip to content

fix: revoke stale jobs to "revoked" status instead of "pending"#1169

Merged
mihow merged 7 commits into
mainfrom
fix/revoke-stale-jobs
Mar 3, 2026
Merged

fix: revoke stale jobs to "revoked" status instead of "pending"#1169
mihow merged 7 commits into
mainfrom
fix/revoke-stale-jobs

Conversation

@mihow

@mihow mihow commented Mar 3, 2026

Copy link
Copy Markdown
Collaborator

Summary

Manual cleanup command for stale jobs, needed until the periodic task in #1025 is deployed. Revokes jobs stuck in running states when Celery has no record of them, instead of cycling them back to PENDING.

Changes

  • update_stale_jobs management command now revokes jobs by default instead of setting PENDING
  • Core logic extracted into check_stale_jobs() in ami/jobs/tasks.py so the periodic task in Add periodic status check for incomplete jobs  #1025 can reuse it
  • Only terminal Celery states (READY_STATES: SUCCESS, FAILURE, REVOKED) are trusted for reconciliation; non-terminal states fall through to revocation
  • For async_api jobs, SUCCESS and FAILURE are only accepted when job.progress.is_complete() — NATS workers may still be delivering results after the Celery task finishes
  • Calls cleanup_async_job_if_needed to drain NATS streams and Redis state in both branches (terminal reconciliation and revocation)
  • select_for_update prevents concurrent runs from processing the same job
  • AsyncResult lookup is wrapped in try-except so a single broker failure doesn't abort the batch
  • --dry-run flag to preview changes before applying
  • 6 tests covering dry-run, revocation, terminal state reconciliation, async_api progress guard, broker error handling, and filtering

Usage

# Preview what would be revoked
python manage.py update_stale_jobs --dry-run

# Revoke stale jobs (default: 72 hours)
python manage.py update_stale_jobs

# Custom staleness threshold
python manage.py update_stale_jobs --hours 24

Context

AsyncResult.state returns PENDING for tasks Celery has no record of. Previously, stale jobs with unknown Celery state would be set to PENDING and re-found on the next run. Now: only trust Celery when it reports a terminal state (SUCCESS, FAILURE, REVOKED). Otherwise revoke the job, set finished_at, and clean up async resources.

Tested against local dev DB with 30 synthetic stale jobs across all running states (STARTED, PENDING, CREATED, RETRY, CANCELING, UNKNOWN), dispatch modes (internal, sync_api, async_api), and ages. Recent jobs and final-state jobs correctly skipped. NATS/Redis cleanup handled gracefully when streams were already deleted. Idempotent on re-run.

Related: #1025, #1162, #1168

update_stale_jobs previously checked Celery for task state, but
AsyncResult returns PENDING for tasks it has no record of. This
caused stale jobs to cycle through PENDING endlessly, and async_api
jobs kept serving tasks to workers via the /tasks/ endpoint.

Now: only trust Celery when it reports a known state (SUCCESS,
FAILURE, etc). Otherwise revoke the job and clean up NATS/Redis
resources. Also adds --dry-run flag.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@netlify

netlify Bot commented Mar 3, 2026

Copy link
Copy Markdown

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit 2c2d3f5
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/69a69b04c4871e00086a5c1a

@netlify

netlify Bot commented Mar 3, 2026

Copy link
Copy Markdown

Deploy Preview for antenna-ssec canceled.

Name Link
🔨 Latest commit 2c2d3f5
🔍 Latest deploy log https://app.netlify.com/projects/antenna-ssec/deploys/69a69b046bd9430008343444

@coderabbitai

coderabbitai Bot commented Mar 3, 2026

Copy link
Copy Markdown
Contributor
📝 Walkthrough

Walkthrough

Replaces in-command stale-job logic with a new check_stale_jobs(hours, dry_run) function in ami.jobs.tasks, refactors the management command to call it (with dry-run), and adds tests for revocation, status updates from Celery, and skipping non-stale jobs. check_stale_jobs appears defined twice in the file.

Changes

Cohort / File(s) Summary
Management Command
ami/jobs/management/commands/update_stale_jobs.py
Command now delegates to check_stale_jobs(hours, dry_run), adds --dry-run, removes direct DB/Celery mutation logic and JobState import, and prints structured results returned by the task function.
Task Logic (duplicate)
ami/jobs/tasks.py
Adds check_stale_jobs(hours: int | None = None, dry_run: bool = False) -> list[dict] that finds stale RUNNING jobs, consults Celery AsyncResult, either updates job state or revokes + calls cleanup_async_job_if_needed(). Note: the same function is defined twice (duplicate top-level definitions).
Tests
ami/jobs/tests/test_update_stale_jobs.py
New CheckStaleJobsTest test class with cases for dry-run, revocation, updating from Celery state, revoking SUCCESS with incomplete progress, and skipping recent/final-state jobs; uses mocks for AsyncResult and cleanup.

Sequence Diagram

sequenceDiagram
    participant Cmd as Management<br/>Command
    participant Tasks as check_stale_jobs()<br/>(ami.jobs.tasks)
    participant DB as Job DB
    participant Celery as Celery<br/>AsyncResult
    participant Cleanup as cleanup_async_<br/>job_if_needed()

    Cmd->>Tasks: check_stale_jobs(hours, dry_run)
    Tasks->>DB: Query RUNNING jobs older than cutoff
    DB-->>Tasks: List of stale jobs

    loop per stale job
        Tasks->>Celery: AsyncResult(task_id) -> get state
        Celery-->>Tasks: task state / info
        alt Valid terminal state (and complete)
            Tasks->>DB: Update job state, set finished_at
        else No valid state or incomplete on SUCCESS
            Tasks->>DB: Mark job REVOKED, set finished_at
            Tasks->>Cleanup: cleanup_async_job_if_needed(job)
            Cleanup-->>Tasks: cleanup done
        end
        Tasks-->>Tasks: Append result dict (action, job_id, prev_state, new_state...)
    end

    Tasks-->>Cmd: return list[dict] results
    Cmd->>Cmd: Print results (with dry-run prefix if enabled)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐇 In burrows of code I sniff and peep,

I find the tasks that stalled in sleep,
Some I nudge, some I gently sever —
Dry-run notes first, then clean forever,
Hopping on to the next job heap.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 58.82% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description check ✅ Passed The PR description is comprehensive and follows the template structure with all major sections present including Summary, Changes, Usage, and Context.
Title check ✅ Passed The title accurately describes the main change: refactoring stale job handling to revoke them to 'revoked' status instead of 'pending', which is the core behavioral fix across the updated management command, new task function, and tests.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/revoke-stale-jobs

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Move core stale-job logic from management command into
check_stale_jobs() in tasks.py. The management command is now a
thin wrapper. Add tests for the extracted function.

This prepares for #1025 which will call check_stale_jobs() from a
Celery Beat periodic task.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow marked this pull request as ready for review March 3, 2026 01:50
Copilot AI review requested due to automatic review settings March 3, 2026 01:50

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a reusable stale-job reconciliation helper and updates the existing manual management command to revoke “unknown to Celery” jobs (instead of cycling them back to PENDING), with a --dry-run mode and accompanying tests.

Changes:

  • Extracts stale-job logic into check_stale_jobs() for reuse by future periodic tasks.
  • Updates update_stale_jobs command to use the new helper, revoke by default, and support --dry-run.
  • Introduces new unit tests for the stale-job behavior.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.

File Description
ami/jobs/tasks.py Adds check_stale_jobs() that inspects Celery state and revokes jobs with no known Celery record, optionally cleaning async resources.
ami/jobs/management/commands/update_stale_jobs.py Switches the management command to call check_stale_jobs(), adds --dry-run, and adjusts output messaging.
ami/jobs/tests_update_stale_jobs.py Adds tests intended to validate dry-run behavior, revocation behavior, and skip logic.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread ami/jobs/tests/test_update_stale_jobs.py
Comment thread ami/jobs/tests/test_update_stale_jobs.py
Comment thread ami/jobs/tasks.py Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
ami/jobs/management/commands/update_stale_jobs.py (1)

23-23: Silence ARG002 by renaming the unused positional var.

Line 23 can avoid the lint warning by using _args.

Proposed fix
-    def handle(self, *args, **options):
+    def handle(self, *_args, **options):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/management/commands/update_stale_jobs.py` at line 23, The handle
function currently accepts an unused positional argument as *args which triggers
ARG002; rename it to *_args in the function signature (def handle(self, *_args,
**options):) so the linter knows the positional var is intentionally
unused—update the handle method definition and any internal references (there
should be none) accordingly.
ami/jobs/tests_update_stale_jobs.py (1)

30-66: Add a test for the "updated" branch (known Celery state).

Current tests only exercise revoke/skip paths. Please add coverage where AsyncResult(...).state is a trusted terminal state and assert status update + no cleanup call.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/tests_update_stale_jobs.py` around lines 30 - 66, Add a new test in
ami/jobs/tests_update_stale_jobs.py that covers the "updated" branch by creating
a stale STARTED job (use _create_job with hours_ago past the stale threshold),
patch ami.jobs.tasks.AsyncResult to return a trusted terminal state (e.g.,
return_value.state = JobState.SUCCESS.value), call check_stale_jobs(), and
assert the results show the "updated" action, the job.status equals
JobState.SUCCESS.value after refresh_from_db(), and that
cleanup_async_job_if_needed (patch the existing mock) was not called; reference
check_stale_jobs, AsyncResult, cleanup_async_job_if_needed, and JobState when
locating where to add the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 359-364: Capture job.status into a local variable before mutating
it so the returned previous_status is correct: store previous_status =
job.status immediately before calling job.update_status(JobState.REVOKED,
save=False) (and before setting finished_at/save/cleanup_async_job_if_needed),
then use that previous_status in the results.append({"job_id": job.pk, "action":
"revoked", "previous_status": previous_status}) call.
- Around line 339-358: The current reconciliation uses known_celery_states =
frozenset(states.ALL_STATES) - {states.PENDING} which includes non-terminal
states; change the check to only trust terminal Celery states (states.SUCCESS,
states.FAILURE, states.REVOKED) when deciding to update a Job (use those symbols
instead of ALL_STATES), and when you plan to revoke/update a job record capture
the job's original status into a local variable (e.g., previous_status =
job.status) before calling job.update_status(...) / job.save() so the
results.append entry records the original status rather than the mutated one;
keep references to AsyncResult(job.task_id).state, job.update_status, job.save
and results.append when applying these changes.

---

Nitpick comments:
In `@ami/jobs/management/commands/update_stale_jobs.py`:
- Line 23: The handle function currently accepts an unused positional argument
as *args which triggers ARG002; rename it to *_args in the function signature
(def handle(self, *_args, **options):) so the linter knows the positional var is
intentionally unused—update the handle method definition and any internal
references (there should be none) accordingly.

In `@ami/jobs/tests_update_stale_jobs.py`:
- Around line 30-66: Add a new test in ami/jobs/tests_update_stale_jobs.py that
covers the "updated" branch by creating a stale STARTED job (use _create_job
with hours_ago past the stale threshold), patch ami.jobs.tasks.AsyncResult to
return a trusted terminal state (e.g., return_value.state =
JobState.SUCCESS.value), call check_stale_jobs(), and assert the results show
the "updated" action, the job.status equals JobState.SUCCESS.value after
refresh_from_db(), and that cleanup_async_job_if_needed (patch the existing
mock) was not called; reference check_stale_jobs, AsyncResult,
cleanup_async_job_if_needed, and JobState when locating where to add the test.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 81a7864 and 281e931.

📒 Files selected for processing (3)
  • ami/jobs/management/commands/update_stale_jobs.py
  • ami/jobs/tasks.py
  • ami/jobs/tests_update_stale_jobs.py

Comment thread ami/jobs/tasks.py Outdated
Comment thread ami/jobs/tasks.py Outdated
@mihow

mihow commented Mar 3, 2026

Copy link
Copy Markdown
Collaborator Author

Code review

Found 4 issues:

  1. finished_at not set in the "updated from Celery state" branch — when Celery reports a terminal state (SUCCESS, FAILURE), the code calls update_status() and save() but does not set job.finished_at, leaving it None. Every other terminal-state transition in the codebase sets finished_at explicitly (e.g. _fail_job() at line 191, the revoke branch in this same function at line 361). This breaks Job.duration() for recovered jobs.

antenna/ami/jobs/tasks.py

Lines 353 to 358 in 281e931

if celery_state in known_celery_states:
if not dry_run:
job.update_status(celery_state, save=False)
job.save()
results.append({"job_id": job.pk, "action": "updated", "state": celery_state})
else:

  1. SUCCESS from Celery applied unconditionally, bypassing the job.progress.is_complete() guard — the update_job_status signal handler (line 415–418) explicitly defers SUCCESS for async_api jobs until job.progress.is_complete(), because NATS workers continue delivering results after the Celery task finishes. But check_stale_jobs() calls job.update_status(celery_state, save=False) + job.save() directly, which triggers a post_save signal rather than a task_postrun signal, bypassing that guard entirely and potentially marking an async_api job as SUCCESS while its NATS consumer is still active.

antenna/ami/jobs/tasks.py

Lines 353 to 358 in 281e931

if celery_state in known_celery_states:
if not dry_run:
job.update_status(celery_state, save=False)
job.save()
results.append({"job_id": job.pk, "action": "updated", "state": celery_state})
else:

  1. previous_status always logs REVOKED instead of the original status — job.update_status(JobState.REVOKED, save=False) mutates job.status in-place (see models.py:1014), so the results.append({"previous_status": job.status}) on line 364 reads the already-mutated value. The fix is to capture previous_status = job.status before the if not dry_run: block.

antenna/ami/jobs/tasks.py

Lines 358 to 365 in 281e931

else:
if not dry_run:
job.update_status(JobState.REVOKED, save=False)
job.finished_at = datetime.datetime.now()
job.save()
cleanup_async_job_if_needed(job)
results.append({"job_id": job.pk, "action": "revoked", "previous_status": job.status})

  1. known_celery_states includes non-terminal states — frozenset(states.ALL_STATES) - {states.PENDING} retains STARTED, RETRY, and RECEIVED. If Celery still reports one of these for a 72-hour-old job (e.g. result backend TTL > cutoff hours), lines 355–357 will update the job to that intermediate state without revoking it or cleaning up async resources, leaving it permanently stuck. states.READY_STATES (= {SUCCESS, FAILURE, REVOKED}) is the appropriate constant to use here.

antenna/ami/jobs/tasks.py

Lines 337 to 358 in 281e931

hours = Job.FAILED_CUTOFF_HOURS
known_celery_states = frozenset(states.ALL_STATES) - {states.PENDING}
cutoff = datetime.datetime.now() - datetime.timedelta(hours=hours)
stale_jobs = Job.objects.filter(
status__in=JobState.running_states(),
updated_at__lt=cutoff,
)
results = []
for job in stale_jobs:
celery_state = None
if job.task_id:
celery_state = AsyncResult(job.task_id).state
if celery_state in known_celery_states:
if not dry_run:
job.update_status(celery_state, save=False)
job.save()
results.append({"job_id": job.pk, "action": "updated", "state": celery_state})
else:

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

mihow and others added 2 commits March 2, 2026 23:51
Consolidate ami/jobs/tests.py, test_tasks.py, and tests_update_stale_jobs.py
into an ami/jobs/tests/ package with consistent test_ prefixes. All files are
now discovered by Django's default test runner pattern.

Co-Authored-By: Claude <noreply@anthropic.com>
Four issues fixed:

- Use states.READY_STATES instead of ALL_STATES - {PENDING} so non-terminal
  Celery states (STARTED, RETRY, RECEIVED) don't leave jobs stuck
- Guard SUCCESS: only accept it when job.progress.is_complete(), matching the
  existing check in update_job_status(); otherwise revoke the job
- Set finished_at in both the "updated" and "revoked" branches
- Capture previous_status before calling update_status() so the result dict
  reflects the original state rather than the post-mutation REVOKED value

Tests added for the updated-from-Celery-state and SUCCESS-with-incomplete-
progress paths.

Co-Authored-By: Claude <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
ami/jobs/tests/test_update_stale_jobs.py (1)

59-96: Add regression coverage for terminal FAILURE with incomplete progress and terminal-branch cleanup.

This suite covers SUCCESS+incomplete revocation well, but it doesn’t lock behavior for incomplete FAILURE or cleanup when Celery reports a terminal state and the code takes the “updated” branch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/tests/test_update_stale_jobs.py` around lines 59 - 96, Add a new
test (or extend existing ones) in ami/jobs/tests/test_update_stale_jobs.py to
cover the case where Celery reports a terminal FAILURE but
job.progress.is_complete() is False: mock celery.result.AsyncResult to return
states.FAILURE, create a job with status=JobState.STARTED and incomplete
progress, call check_stale_jobs(), assert the returned action is "updated" (or
whatever the terminal-branch should be), assert job.status becomes
JobState.FAILURE.value and finished_at is set, and crucially assert
cleanup_async_job_if_needed (the patched function) is called once for that job
when the code takes the terminal/updated branch; reference AsyncResult,
check_stale_jobs, cleanup_async_job_if_needed, JobState and
job.progress.is_complete() to locate where to add the assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 360-371: The terminal-state branch (when is_terminal is true)
updates and saves the job but never frees async resources; modify the terminal
path so that after calling job.update_status(...) and job.save() (and only when
not dry_run) you also call cleanup_async_job_if_needed(job) to reconcile
NATS/Redis resources for recovered final-state async jobs; keep the call
placement consistent with the non-terminal branch (i.e., after setting
finished_at and saving).
- Around line 355-363: The code currently only prevents marking SUCCESS as
terminal when progress is incomplete; update the terminal-guard logic in the
block using is_terminal/celery_state so FAILURE is treated the same: if
celery_state is either states.SUCCESS or states.FAILURE and
job.progress.is_complete() is False, set is_terminal = False (i.e., change the
existing check that tests only states.SUCCESS to include states.FAILURE or add
an explicit check for FAILURE), and only call job.update_status(...) and set
job.finished_at when is_terminal is true (as currently done); reference symbols:
is_terminal, celery_state, states.READY_STATES, states.FAILURE,
job.progress.is_complete(), and job.update_status.

---

Nitpick comments:
In `@ami/jobs/tests/test_update_stale_jobs.py`:
- Around line 59-96: Add a new test (or extend existing ones) in
ami/jobs/tests/test_update_stale_jobs.py to cover the case where Celery reports
a terminal FAILURE but job.progress.is_complete() is False: mock
celery.result.AsyncResult to return states.FAILURE, create a job with
status=JobState.STARTED and incomplete progress, call check_stale_jobs(), assert
the returned action is "updated" (or whatever the terminal-branch should be),
assert job.status becomes JobState.FAILURE.value and finished_at is set, and
crucially assert cleanup_async_job_if_needed (the patched function) is called
once for that job when the code takes the terminal/updated branch; reference
AsyncResult, check_stale_jobs, cleanup_async_job_if_needed, JobState and
job.progress.is_complete() to locate where to add the assertions.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 281e931 and 0bf2867.

📒 Files selected for processing (5)
  • ami/jobs/tasks.py
  • ami/jobs/tests/__init__.py
  • ami/jobs/tests/test_jobs.py
  • ami/jobs/tests/test_tasks.py
  • ami/jobs/tests/test_update_stale_jobs.py

Comment thread ami/jobs/tasks.py Outdated
Comment thread ami/jobs/tasks.py Outdated
…nal branch

Two issues from code review:

- Guard FAILURE the same as SUCCESS for async_api jobs: if Celery reports
  SUCCESS or FAILURE but progress is incomplete, treat as non-terminal and
  revoke instead, matching the AsyncJobStateManager convention
- Call cleanup_async_job_if_needed() in the is_terminal branch so NATS/Redis
  resources are freed for recovered jobs, not only for revoked ones

Co-Authored-By: Claude <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/jobs/tests/test_update_stale_jobs.py (1)

81-98: Add a sibling test for FAILURE with incomplete async progress.

Line 81 validates the SUCCESS branch, but Line 358 in ami/jobs/tasks.py now guards both SUCCESS and FAILURE. Add a parallel case for states.FAILURE + incomplete progress to lock this behavior.

Based on learnings, jobs using AsyncJobStateManager should only be set to SUCCESS or FAILURE once progress is complete.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/tests/test_update_stale_jobs.py` around lines 81 - 98, Add a new
test mirroring test_revokes_success_with_incomplete_progress that sets
mock_async_result.return_value.state = states.FAILURE and verifies
check_stale_jobs() returns a single result with action "revoked", that the Job
status becomes JobState.REVOKED.value, and mock_cleanup was called once;
reference the existing test name
(test_revokes_success_with_incomplete_progress), the check_stale_jobs() call,
JobState and JobDispatchMode updates, and mock_async_result/mock_cleanup
fixtures so the new test covers the FAILURE + incomplete progress branch guarded
in ami/jobs/tasks.py.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/jobs/tasks.py`:
- Around line 349-352: The AsyncResult lookup for Celery (the
AsyncResult(job.task_id).state call that sets celery_state) can raise
broker/backend exceptions; wrap this lookup in a try-except around the
AsyncResult(...) access inside the loop that iterates jobs, catching broad
connection/operational exceptions (e.g., redis/kombu errors), log a warning via
the existing logger with context (job.id and job.task_id), append an error entry
to the results list indicating the lookup failed for that job, set celery_state
to None or a sentinel, and continue processing the next job so one Celery
failure doesn't abort the entire cleanup run.

---

Nitpick comments:
In `@ami/jobs/tests/test_update_stale_jobs.py`:
- Around line 81-98: Add a new test mirroring
test_revokes_success_with_incomplete_progress that sets
mock_async_result.return_value.state = states.FAILURE and verifies
check_stale_jobs() returns a single result with action "revoked", that the Job
status becomes JobState.REVOKED.value, and mock_cleanup was called once;
reference the existing test name
(test_revokes_success_with_incomplete_progress), the check_stale_jobs() call,
JobState and JobDispatchMode updates, and mock_async_result/mock_cleanup
fixtures so the new test covers the FAILURE + incomplete progress branch guarded
in ami/jobs/tasks.py.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0bf2867 and 1a733b9.

📒 Files selected for processing (2)
  • ami/jobs/tasks.py
  • ami/jobs/tests/test_update_stale_jobs.py

Comment thread ami/jobs/tasks.py Outdated
mihow and others added 2 commits March 3, 2026 00:20
Fetch stale job PKs first, then re-acquire each inside transaction.atomic()
with select_for_update(). The re-fetch re-checks running state and cutoff so
a job handled by a concurrent run is skipped (DoesNotExist) rather than
double-processed. Async resource cleanup (NATS/Redis) runs outside the
transaction to avoid holding the row lock during network calls.

Matches the pattern used by _fail_job() and _update_job_progress().

Co-Authored-By: Claude <noreply@anthropic.com>
Wrap AsyncResult(task_id).state in try-except so a single broker/backend
failure doesn't abort the entire batch. Failed lookups are logged and
the job is revoked as if Celery state were unknown.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow changed the title fix: revoke stale jobs instead of cycling through PENDING fix: revoke stale jobs to "revoked" status instead of "pending" Mar 3, 2026
@mihow mihow merged commit 8c4dcc5 into main Mar 3, 2026
7 checks passed
@mihow mihow deleted the fix/revoke-stale-jobs branch March 3, 2026 08:44
@mihow mihow added the PSv2 Async & distributed ML backend (PSv2): job state, NATS dispatch, result handling. Umbrella #515. label Jun 16, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

PSv2 Async & distributed ML backend (PSv2): job state, NATS dispatch, result handling. Umbrella #515.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants