diff --git a/DMOps/file_invalidation_server/controllers/cronjob_integrity_process_queue.yaml b/DMOps/file_invalidation_server/controllers/cronjob_integrity_process_queue.yaml new file mode 100644 index 00000000..1caa1e0a --- /dev/null +++ b/DMOps/file_invalidation_server/controllers/cronjob_integrity_process_queue.yaml @@ -0,0 +1,69 @@ +# CronJob: integrity-queue-processor +# Picks up SUBMITTED requests and triggers K8s jobs when capacity allows. +# MAX_CONCURRENT_JOBS controls how many jobs run simultaneously. +# Runs every minute — fast enough for reasonable throughput. +# +# Apply: kubectl apply -f controllers/cronjob_integrity_process_queue.yaml + +apiVersion: batch/v1 +kind: CronJob +metadata: + name: integrity-queue-processor + namespace: file-invalidation-tool +spec: + schedule: "* * * * *" + concurrencyPolicy: Forbid # never run two queue processors simultaneously + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 1 + suspend: false + jobTemplate: + spec: + template: + spec: + restartPolicy: Never + serviceAccountName: job-runner + containers: + - name: queue-processor + image: registry.paas.cern.ch/file-invalidation-tool/file-invalidation-tool:test + imagePullPolicy: Always + command: + - python3 + - file_integrity_checker/process_queue.py + env: + - name: FIC_MAX_CONCURRENT_JOBS + value: "3" + - name: SECRET_KEY + valueFrom: + secretKeyRef: + name: django-secret-key + key: SECRET_KEY + - name: DB_NAME + valueFrom: + secretKeyRef: + name: django-db-name + key: DB_NAME + - name: DB_HOST + valueFrom: + secretKeyRef: + name: django-db-host + key: DB_HOST + - name: DB_PORT + valueFrom: + secretKeyRef: + name: django-db-port + key: DB_PORT + - name: DB_USER + valueFrom: + secretKeyRef: + name: django-db-user + key: DB_USER + - name: DB_PASSWORD + valueFrom: + secretKeyRef: + name: django-db-password + key: DB_PASSWORD + - name: JIRA_PAT + valueFrom: + secretKeyRef: + name: jira-pat + key: JIRA_PAT \ No newline at end of file diff --git 
a/DMOps/file_invalidation_server/file_integrity_checker/README.md b/DMOps/file_invalidation_server/file_integrity_checker/README.md index fec1f843..dbb0d4d7 100644 --- a/DMOps/file_invalidation_server/file_integrity_checker/README.md +++ b/DMOps/file_invalidation_server/file_integrity_checker/README.md @@ -10,16 +10,19 @@ A Django application integrated into the CMS file invalidation server that allow User submits LFNs via API │ ▼ -Django creates a FileIntegrityRequest + placeholder FileReplica rows +Django creates FileIntegrityRequest (status=SUBMITTED) ++ placeholder FileReplica rows +Returns request_id immediately — user does not wait │ ▼ -LFN list written to a PVC file +process_queue.py CronJob (every 1 min) +Checks: how many jobs IN_PROGRESS? +If below FIC_MAX_CONCURRENT_JOBS: triggers next SUBMITTED request +If at limit: skips — tries again next minute │ ▼ Kubernetes job created — runs the integrity check tool container - │ - ▼ -Tool copies each file from WLCG, validates checksum + decompression +Tool copies files from WLCG, validates checksum + decompression Prints JSON results to stdout │ ▼ @@ -60,6 +63,7 @@ file_invalidation_server/ │ ├── serializers.py — API input validation │ ├── views.py — all API views │ ├── tasks.py — job trigger logic + constants +│ ├── process_queue.py — CronJob: picks up SUBMITTED requests, triggers jobs │ ├── process_jobs.py — CronJob: polls K8s, parses results, updates DB │ ├── cleanup_jobs.py — CronJob: removes orphaned PVC files and K8s jobs │ ├── urls.py — URL routing @@ -68,6 +72,7 @@ file_invalidation_server/ │ └── controllers/ ├── job_integrity.yaml — Kubernetes Job template for the checker + ├── cronjob_integrity_process_queue.yaml — CronJob for process_queue.py ├── cronjob_integrity_process_jobs.yaml — CronJob for process_jobs.py └── cronjob_integrity_cleanup_jobs.yaml — CronJob for cleanup_jobs.py ``` @@ -289,7 +294,8 @@ All variables use the `FIC_` prefix (File Integrity Checker). 
All have sensible | Variable | Default | Description | |---|---|---| -| `FIC_MAX_LFNS_PER_REQUEST` | `20` | Maximum number of LFNs allowed per request | +| `FIC_MAX_LFNS_PER_REQUEST` | `20` | Maximum number of LFNs allowed per request. | +| `FIC_MAX_CONCURRENT_JOBS` | `3` | Maximum number of integrity check jobs running simultaneously. | | `FIC_PVC_MOUNT_PATH_HOST` | `/shared-data-integrity` | Where the Django pod mounts the integrity PVC. Must match the Django deployment yaml volumeMount. | | `FIC_PVC_MOUNT_PATH_CONTAINER` | `/input` | Where the job container mounts the same PVC. Must match `mountPath` in `job_integrity.yaml`. | | `FIC_JOB_WORKDIR` | `/tmp` | Writable directory inside the job container for temporary file copies. | diff --git a/DMOps/file_invalidation_server/file_integrity_checker/process_queue.py b/DMOps/file_invalidation_server/file_integrity_checker/process_queue.py new file mode 100644 index 00000000..0d8d7ad8 --- /dev/null +++ b/DMOps/file_invalidation_server/file_integrity_checker/process_queue.py @@ -0,0 +1,74 @@ +import os +import sys +import logging + +import django +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'file_invalidation_server.settings') +django.setup() + +from file_integrity_checker.models import FileIntegrityRequest +from file_integrity_checker.tasks import trigger_job, MAX_CONCURRENT_JOBS + +logging.basicConfig( + level=logging.INFO, + format='(%(asctime)s) [%(name)s] %(levelname)s: %(message)s' +) +logger = logging.getLogger(__name__) + + +def process_queue(): + """ + Picks up SUBMITTED requests and triggers their K8s jobs + when the number of IN_PROGRESS jobs is below the limit. + + Called by a CronJob every minute. Each run: + 1. Counts currently running jobs + 2. Fills available slots with oldest SUBMITTED requests + 3. 
Triggers one job per available slot + """ + in_progress = FileIntegrityRequest.objects.filter( + status=FileIntegrityRequest.Status.IN_PROGRESS + ).count() + + available_slots = MAX_CONCURRENT_JOBS - in_progress + + if available_slots <= 0: + logger.info( + f"No available slots — " + f"{in_progress}/{MAX_CONCURRENT_JOBS} jobs running" + ) + return + + logger.info( + f"{in_progress}/{MAX_CONCURRENT_JOBS} jobs running — " + f"{available_slots} slot(s) available" + ) + + # Pick oldest SUBMITTED requests — FIFO order + pending = FileIntegrityRequest.objects.filter( + status=FileIntegrityRequest.Status.SUBMITTED + ).order_by('created_at')[:available_slots] + + if not pending: + logger.info("No queued requests waiting") + return + + for integrity_request in pending: + logger.info( + f"Triggering job for request {integrity_request.request_id} " + f"by {integrity_request.requested_by} " + f"({integrity_request.replicas.count()} LFNs)" + ) + job_id, job_status = trigger_job(integrity_request) + integrity_request.job_id = job_id + integrity_request.status = job_status + integrity_request.save(update_fields=['job_id', 'status', 'updated_at']) + logger.info( + f"Request {integrity_request.request_id} → " + f"status={job_status}, job_id={job_id}" + ) + + +if __name__ == "__main__": + process_queue() \ No newline at end of file diff --git a/DMOps/file_invalidation_server/file_integrity_checker/tasks.py b/DMOps/file_invalidation_server/file_integrity_checker/tasks.py index e9cbbf6e..eda41e79 100644 --- a/DMOps/file_invalidation_server/file_integrity_checker/tasks.py +++ b/DMOps/file_invalidation_server/file_integrity_checker/tasks.py @@ -9,6 +9,7 @@ logger = logging.getLogger(__name__) MAX_LFNS_PER_REQUEST = decouple_config('FIC_MAX_LFNS_PER_REQUEST', default=20, cast=int) +MAX_CONCURRENT_JOBS = decouple_config('FIC_MAX_CONCURRENT_JOBS', default=3, cast=int) # --------------------------------------------------------------------------- # Path constants — change these if mount 
paths or filenames change @@ -45,8 +46,6 @@ def process_integrity_check(integrity_request, raw_lfns): 1. Validates the LFN list 2. Creates one FileReplica placeholder row per LFN (rse=None, status=pending) - 3. Triggers the Kubernetes job - 4. Updates the request with job_id and status """ if not raw_lfns: raise ValueError("No LFNs provided.") @@ -73,21 +72,15 @@ def process_integrity_check(integrity_request, raw_lfns): f"Created {len(raw_lfns)} FileReplica placeholder rows " f"for request {integrity_request.request_id}" ) - - job_id, job_status = trigger_job(integrity_request) - - # update_fields must include updated_at explicitly because auto_now=True - # fields are only auto-updated when update_fields is not specified - integrity_request.job_id = job_id - integrity_request.status = job_status - integrity_request.save(update_fields=['job_id', 'status', 'updated_at']) - + + # Leave status as SUBMITTED for now. + # The queue processor will pick it up. logger.info( - f"Request {integrity_request.request_id} → " - f"status={job_status}, job_id={job_id}" + f"Request {integrity_request.request_id} queued " + f"with {len(raw_lfns)} LFNs" ) - return job_id, job_status + return None, FileIntegrityRequest.Status.SUBMITTED def trigger_job(integrity_request): diff --git a/DMOps/file_invalidation_server/file_integrity_checker/tests.py b/DMOps/file_invalidation_server/file_integrity_checker/tests.py index 5d0811be..80c598b5 100644 --- a/DMOps/file_invalidation_server/file_integrity_checker/tests.py +++ b/DMOps/file_invalidation_server/file_integrity_checker/tests.py @@ -7,6 +7,7 @@ from .tasks import process_integrity_check, split_scope, FileIntegrityRequest from .views import derive_file_status, build_request_summary, _format_lfns_per_rse from file_integrity_checker.process_jobs import parse_tool_output, update_replicas +from file_integrity_checker.process_queue import process_queue, MAX_CONCURRENT_JOBS class SplitScopeTest(TestCase): @@ -59,24 +60,22 @@ def 
test_replica_initial_status_is_pending(self): replica = self.request.replicas.first() self.assertEqual(replica.status, 'pending') - def test_request_status_updated_after_trigger(self): - # Locally K8s is not available so trigger_job returns FAILED. - # What we test here is that the status is always updated to - # whatever trigger_job returns — never left as SUBMITTED. + def test_request_status_updated_to_submitted(self): + # process_integrity_check queues the request — job is triggered + # by process_queue.py. Status must be SUBMITTED. process_integrity_check(self.request, ['cms:/store/data/file.root']) self.request.refresh_from_db() - self.assertNotEqual( + self.assertEqual( self.request.status, FileIntegrityRequest.Status.SUBMITTED ) - def test_job_id_always_set(self): - # job_id is generated before K8s submission so it is always - # set regardless of whether the job creation succeeds or fails + def test_job_id_not_set_at_submission(self): + # job_id is set by process_queue.py when trigger_job runs, + # not at submission time. Must be None after process_integrity_check. 
process_integrity_check(self.request, ['cms:/store/data/file.root']) self.request.refresh_from_db() - self.assertIsNotNone(self.request.job_id) - self.assertEqual(len(self.request.job_id), 8) + self.assertIsNone(self.request.job_id) def test_too_many_lfns_raises(self): lfns = [f'/store/data/file{i}.root' for i in range(21)] @@ -88,47 +87,100 @@ def test_empty_lfns_raises(self): process_integrity_check(self.request, []) -class TriggerJobArgsTest(TestCase): +class QueueProcessorTest(TestCase): def setUp(self): - self.request_with_rse = FileIntegrityRequest.objects.create( + # A submitted request with placeholder replicas + # (as process_integrity_check would leave it) + self.request = FileIntegrityRequest.objects.create( requested_by='testuser', rse_expression='T2_CH_CERN', full_scan=False, status=FileIntegrityRequest.Status.SUBMITTED ) - self.request_full_scan = FileIntegrityRequest.objects.create( - requested_by='testuser', - rse_expression=None, - full_scan=True, - status=FileIntegrityRequest.Status.SUBMITTED + FileReplica.objects.create( + request=self.request, + scope='cms', + lfn='/store/data/file.root', + status='pending' ) - @patch('file_integrity_checker.tasks.trigger_job') - def test_rse_expression_stored_on_request(self, mock_trigger): + @patch('file_integrity_checker.process_queue.trigger_job') + def test_submitted_request_is_triggered(self, mock_trigger): mock_trigger.return_value = ('abc12345', FileIntegrityRequest.Status.IN_PROGRESS) - process_integrity_check(self.request_with_rse, ['cms:/store/data/file.root']) - self.request_with_rse.refresh_from_db() - self.assertEqual(self.request_with_rse.rse_expression, 'T2_CH_CERN') - - @patch('file_integrity_checker.tasks.trigger_job') - def test_status_is_in_progress_when_job_succeeds(self, mock_trigger): - # By mocking trigger_job we can test the IN_PROGRESS path - # that is unreachable locally without K8s + process_queue() + self.request.refresh_from_db() + self.assertEqual(self.request.status, 
FileIntegrityRequest.Status.IN_PROGRESS) + self.assertEqual(self.request.job_id, 'abc12345') + + @patch('file_integrity_checker.process_queue.trigger_job') + def test_job_id_set_after_queue_processing(self, mock_trigger): mock_trigger.return_value = ('abc12345', FileIntegrityRequest.Status.IN_PROGRESS) - process_integrity_check(self.request_with_rse, ['cms:/store/data/file.root']) - self.request_with_rse.refresh_from_db() - self.assertEqual( - self.request_with_rse.status, - FileIntegrityRequest.Status.IN_PROGRESS - ) + process_queue() + self.request.refresh_from_db() + self.assertIsNotNone(self.request.job_id) + self.assertEqual(len(self.request.job_id), 8) + + @patch('file_integrity_checker.process_queue.trigger_job') + def test_rse_expression_preserved_through_queue(self, mock_trigger): + mock_trigger.return_value = ('abc12345', FileIntegrityRequest.Status.IN_PROGRESS) + process_queue() + self.request.refresh_from_db() + self.assertEqual(self.request.rse_expression, 'T2_CH_CERN') - @patch('file_integrity_checker.tasks.trigger_job') - def test_full_scan_stored_on_request(self, mock_trigger): + @patch('file_integrity_checker.process_queue.trigger_job') + def test_respects_max_concurrent_jobs_limit(self, mock_trigger): mock_trigger.return_value = ('abc12345', FileIntegrityRequest.Status.IN_PROGRESS) - process_integrity_check(self.request_full_scan, ['cms:/store/data/file.root']) - self.request_full_scan.refresh_from_db() - self.assertTrue(self.request_full_scan.full_scan) + + # Fill up all slots with IN_PROGRESS requests + for i in range(MAX_CONCURRENT_JOBS): + FileIntegrityRequest.objects.create( + requested_by='testuser', + status=FileIntegrityRequest.Status.IN_PROGRESS, + job_id=f'job{i:04d}ab' + ) + + process_queue() + + # Our SUBMITTED request should not have been triggered + self.request.refresh_from_db() + self.assertEqual(self.request.status, FileIntegrityRequest.Status.SUBMITTED) + mock_trigger.assert_not_called() + + 
@patch('file_integrity_checker.process_queue.trigger_job') + def test_fifo_order_respected(self, mock_trigger): + mock_trigger.return_value = ('abc12345', FileIntegrityRequest.Status.IN_PROGRESS) + + # Create a newer request — self.request is older + newer = FileIntegrityRequest.objects.create( + requested_by='testuser2', + status=FileIntegrityRequest.Status.SUBMITTED + ) + FileReplica.objects.create( + request=newer, scope='cms', + lfn='/store/data/newer.root', status='pending' + ) + + # With MAX_CONCURRENT_JOBS=3 and 0 running, both should be triggered + # but self.request (older) should be triggered first + triggered_ids = [] + def capture_trigger(req): + triggered_ids.append(req.request_id) + return ('abc12345', FileIntegrityRequest.Status.IN_PROGRESS) + mock_trigger.side_effect = capture_trigger + + process_queue() + + self.assertEqual(triggered_ids[0], self.request.request_id) + + @patch('file_integrity_checker.process_queue.trigger_job') + def test_no_submitted_requests_does_nothing(self, mock_trigger): + # Mark our request as already completed + self.request.status = FileIntegrityRequest.Status.COMPLETED + self.request.save() + + process_queue() + mock_trigger.assert_not_called() class ParseToolOutputTest(TestCase):