Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions ami/jobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
# State keys genuinely missing (the total-images key returned None).
# Ack so NATS stops redelivering and fail the job — there's no state
# left to reconcile against.
_log_missing_state_context(job_id, "process")
_ack_task_via_nats(reply_subject, logger)
_fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)")
return
Expand Down Expand Up @@ -364,6 +365,7 @@ def process_nats_pipeline_result(self, job_id: int, result_data: dict, reply_sub
# State keys genuinely missing (total-images key returned None). Ack
# first so NATS stops redelivering a message whose state is gone,
# then fail the job. Mirrors the stage=process missing-state path.
_log_missing_state_context(job_id, "results")
_ack_task_via_nats(reply_subject, job.logger)
_fail_job(job_id, "Job state keys not found in Redis (likely cleaned up concurrently)")
return
Expand Down Expand Up @@ -434,6 +436,43 @@ def _fail_job(job_id: int, reason: str) -> None:
cleanup_async_job_resources(job_id)


def _log_missing_state_context(job_id: int, stage: str) -> None:
    """
    Log diagnostic context when a job's progress-state key is missing.

    The job's status and age tell the two cases apart:

    * a job in a final state means a late result arrived after the job
      finished (benign, e.g. a cancel already cleaned up the state);
    * a still-running job with no state is the case worth investigating
      (state never seeded, or wiped by a re-dispatched run).

    This only observes and logs; it changes nothing. The actual fix is
    planned in #1337.

    Args:
        job_id: Primary key of the job whose progress state is missing.
        stage: Name of the pipeline stage reporting the miss; included in
            every log line.
    """
    from django.utils import timezone

    from ami.jobs.models import Job, JobState  # avoid circular import

    try:
        # Fetch only the two columns needed for the diagnostic.
        job_row = Job.objects.values("status", "created_at").get(pk=job_id)
    except Job.DoesNotExist:
        logger.warning("Job %s: progress state missing and the job no longer exists (stage=%s).", job_id, stage)
        return

    status = job_row["status"]
    created_at = job_row["created_at"]

    # Age in seconds, rounded to one decimal; None when created_at is unset.
    if created_at:
        age_s = round((timezone.now() - created_at).total_seconds(), 1)
    else:
        age_s = None

    # Late result for an already-finished job: informational only.
    if status in JobState.final_states():
        logger.info(
            "Job %s: result arrived after the job already finished (status=%s, stage=%s); ignoring.",
            job_id,
            status,
            stage,
        )
        return

    # Job is not in a final state yet its state is gone: the suspicious case.
    logger.warning(
        "Job %s: progress state missing while the job is still running "
        "(status=%s, stage=%s, age=%ss); marking it failed.",
        job_id,
        status,
        stage,
        age_s,
    )
Comment thread
mihow marked this conversation as resolved.


def _ack_task_via_nats(reply_subject: str, job_logger: logging.Logger) -> bool:
"""
Acknowledge a NATS task. Returns True only when JetStream confirmed the ack.
Expand Down Expand Up @@ -653,6 +692,15 @@ def _update_job_progress(
# Advance summary.status only when the transition fired, else an
# already-terminal job's JSONB would disagree with its column.
job.progress.summary.status = complete_state
else:
# Work completed but the guard found the job already terminal/CANCELING,
# so the completion was not applied. Usually legitimate (a cancel or the
# reaper won the race); if frequent it points to a premature terminal
# verdict. Observation only; see #1337.
logger.warning(
"Job %s: work completed but the job was already in a terminal state; completion not applied.",
job_id,
)
Comment thread
mihow marked this conversation as resolved.

# status/finished_at are deliberately NOT in this save() — only the
# guarded UPDATE above writes them. Folding them back in reopens #1337.
Expand Down
Loading