Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# CronJob: integrity-queue-processor
# Picks up SUBMITTED requests and triggers K8s jobs when capacity allows.
# MAX_CONCURRENT_JOBS controls how many jobs run simultaneously.
# Runs every minute — fast enough for reasonable throughput.
#
# Apply: kubectl apply -f controllers/cronjob_queue_processor_integrity.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: integrity-queue-processor
  namespace: file-invalidation-tool
spec:
  schedule: "* * * * *"
  concurrencyPolicy: Forbid  # never run two queue processors simultaneously
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 1
  suspend: false
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          # Needs RBAC permissions to create integrity-check Jobs.
          serviceAccountName: job-runner
          containers:
            - name: queue-processor
              image: registry.paas.cern.ch/file-invalidation-tool/file-invalidation-tool:test
              imagePullPolicy: Always
              # NOTE(review): the README refers to this script as
              # process_queue.py — confirm the filename inside the image.
              command:
                - python3
                - file_integrity_checker/queue_processor.py
              env:
                # Upper bound on simultaneously running integrity-check jobs.
                - name: FIC_MAX_CONCURRENT_JOBS
                  value: "3"
                - name: SECRET_KEY
                  valueFrom:
                    secretKeyRef:
                      name: django-secret-key
                      key: SECRET_KEY
                - name: DB_NAME
                  valueFrom:
                    secretKeyRef:
                      name: django-db-name
                      key: DB_NAME
                - name: DB_HOST
                  valueFrom:
                    secretKeyRef:
                      name: django-db-host
                      key: DB_HOST
                - name: DB_PORT
                  valueFrom:
                    secretKeyRef:
                      name: django-db-port
                      key: DB_PORT
                - name: DB_USER
                  valueFrom:
                    secretKeyRef:
                      name: django-db-user
                      key: DB_USER
                - name: DB_PASSWORD
                  valueFrom:
                    secretKeyRef:
                      name: django-db-password
                      key: DB_PASSWORD
                - name: JIRA_PAT
                  valueFrom:
                    secretKeyRef:
                      name: jira-pat
                      key: JIRA_PAT
18 changes: 12 additions & 6 deletions DMOps/file_invalidation_server/file_integrity_checker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ A Django application integrated into the CMS file invalidation server that allow
User submits LFNs via API
Django creates a FileIntegrityRequest + placeholder FileReplica rows
Django creates FileIntegrityRequest (status=SUBMITTED)
+ placeholder FileReplica rows
Returns request_id immediately — user does not wait
LFN list written to a PVC file
queue_processor.py CronJob (every 1 min)
Checks: how many jobs IN_PROGRESS?
If below FIC_MAX_CONCURRENT_JOBS: triggers next SUBMITTED request
If at limit: skips — tries again next minute
Kubernetes job created — runs the integrity check tool container
Tool copies each file from WLCG, validates checksum + decompression
Tool copies files from WLCG, validates checksum + decompression
Prints JSON results to stdout
Expand Down Expand Up @@ -60,6 +63,7 @@ file_invalidation_server/
│ ├── serializers.py — API input validation
│ ├── views.py — all API views
│ ├── tasks.py — job trigger logic + constants
│ ├── process_queue.py — CronJob: picks up SUBMITTED requests, triggers jobs
│ ├── process_jobs.py — CronJob: polls K8s, parses results, updates DB
│ ├── cleanup_jobs.py — CronJob: removes orphaned PVC files and K8s jobs
│ ├── urls.py — URL routing
Expand All @@ -68,6 +72,7 @@ file_invalidation_server/
└── controllers/
├── job_integrity.yaml — Kubernetes Job template for the checker
├── cronjob_queue_processor_integrity.yaml — CronJob for process_queue.py
├── cronjob_integrity_process_jobs.yaml — CronJob for process_jobs.py
└── cronjob_integrity_cleanup_jobs.yaml — CronJob for cleanup_jobs.py
```
Expand Down Expand Up @@ -289,7 +294,8 @@ All variables use the `FIC_` prefix (File Integrity Checker). All have sensible

| Variable | Default | Description |
|---|---|---|
| `FIC_MAX_LFNS_PER_REQUEST` | `20` | Maximum number of LFNs allowed per request |
| `FIC_MAX_LFNS_PER_REQUEST` | `20` | Maximum number of LFNs allowed per request. |
| `FIC_MAX_CONCURRENT_JOBS` | `3` | Maximum number of integrity check jobs running simultaneously. |
| `FIC_PVC_MOUNT_PATH_HOST` | `/shared-data-integrity` | Where the Django pod mounts the integrity PVC. Must match the Django deployment yaml volumeMount. |
| `FIC_PVC_MOUNT_PATH_CONTAINER` | `/input` | Where the job container mounts the same PVC. Must match `mountPath` in `job_integrity.yaml`. |
| `FIC_JOB_WORKDIR` | `/tmp` | Writable directory inside the job container for temporary file copies. |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import os
import sys
import logging

import django
# Standalone-script bootstrap: this module is run directly by a CronJob
# container (not via manage.py), so the project root must be importable
# and Django must be configured before the model imports below.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'file_invalidation_server.settings')
django.setup()

# These imports require django.setup() to have run first.
from file_integrity_checker.models import FileIntegrityRequest
from file_integrity_checker.tasks import trigger_job, MAX_CONCURRENT_JOBS

logging.basicConfig(
    level=logging.INFO,
    format='(%(asctime)s) [%(name)s] %(levelname)s: %(message)s'
)
logger = logging.getLogger(__name__)

def process_queue():
    """
    Pick up SUBMITTED requests and trigger their K8s jobs when the
    number of IN_PROGRESS jobs is below the limit.

    Called by a CronJob every minute. Each run:
      1. Counts currently IN_PROGRESS jobs
      2. Fills available slots with the oldest SUBMITTED requests (FIFO)
      3. Triggers one job per available slot

    A failure while triggering one request is logged and skipped so the
    remaining requests in the run are still processed; the failed request
    stays SUBMITTED and is retried on the next run.
    """
    in_progress = FileIntegrityRequest.objects.filter(
        status=FileIntegrityRequest.Status.IN_PROGRESS
    ).count()

    available_slots = MAX_CONCURRENT_JOBS - in_progress

    if available_slots <= 0:
        logger.info(
            "No available slots — %s/%s jobs running",
            in_progress, MAX_CONCURRENT_JOBS
        )
        return

    logger.info(
        "%s/%s jobs running — %s slot(s) available",
        in_progress, MAX_CONCURRENT_JOBS, available_slots
    )

    # Pick oldest SUBMITTED requests — FIFO order
    pending = FileIntegrityRequest.objects.filter(
        status=FileIntegrityRequest.Status.SUBMITTED
    ).order_by('created_at')[:available_slots]

    if not pending:
        logger.info("No queued requests waiting")
        return

    for integrity_request in pending:
        logger.info(
            "Triggering job for request %s by %s (%s LFNs)",
            integrity_request.request_id,
            integrity_request.requested_by,
            integrity_request.replicas.count(),
        )
        try:
            job_id, job_status = trigger_job(integrity_request)
        except Exception:
            # One bad request must not abort the whole run: log it, leave
            # the request SUBMITTED, and let the next tick retry it.
            logger.exception(
                "Failed to trigger job for request %s",
                integrity_request.request_id,
            )
            continue
        integrity_request.job_id = job_id
        integrity_request.status = job_status
        # update_fields must include updated_at explicitly: auto_now fields
        # are only auto-updated when update_fields is not specified.
        integrity_request.save(update_fields=['job_id', 'status', 'updated_at'])
        logger.info(
            "Request %s → status=%s, job_id=%s",
            integrity_request.request_id, job_status, job_id,
        )


# Entry point when run directly as the CronJob container command.
if __name__ == "__main__":
    process_queue()
21 changes: 7 additions & 14 deletions DMOps/file_invalidation_server/file_integrity_checker/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
logger = logging.getLogger(__name__)

MAX_LFNS_PER_REQUEST = decouple_config('FIC_MAX_LFNS_PER_REQUEST', default=20, cast=int)
MAX_CONCURRENT_JOBS = decouple_config('FIC_MAX_CONCURRENT_JOBS', default=3, cast=int)

# ---------------------------------------------------------------------------
# Path constants — change these if mount paths or filenames change
Expand Down Expand Up @@ -45,8 +46,6 @@ def process_integrity_check(integrity_request, raw_lfns):

1. Validates the LFN list
2. Creates one FileReplica placeholder row per LFN (rse=None, status=pending)
3. Triggers the Kubernetes job
4. Updates the request with job_id and status
"""
if not raw_lfns:
raise ValueError("No LFNs provided.")
Expand All @@ -73,21 +72,15 @@ def process_integrity_check(integrity_request, raw_lfns):
f"Created {len(raw_lfns)} FileReplica placeholder rows "
f"for request {integrity_request.request_id}"
)

job_id, job_status = trigger_job(integrity_request)

# update_fields must include updated_at explicitly because auto_now=True
# fields are only auto-updated when update_fields is not specified
integrity_request.job_id = job_id
integrity_request.status = job_status
integrity_request.save(update_fields=['job_id', 'status', 'updated_at'])


# Leave status as SUBMITTED for now.
# The queue processor will pick it up.
logger.info(
f"Request {integrity_request.request_id} "
f"status={job_status}, job_id={job_id}"
f"Request {integrity_request.request_id} queued "
f"with {len(raw_lfns)} LFNs"
)

return job_id, job_status
return None, FileIntegrityRequest.Status.SUBMITTED


def trigger_job(integrity_request):
Expand Down
Loading