diff --git a/.github/workflows/build_and_push.yml b/.github/workflows/build_and_push.yml index 92b1888..bb3a81c 100644 --- a/.github/workflows/build_and_push.yml +++ b/.github/workflows/build_and_push.yml @@ -1,9 +1,9 @@ name: Build and Push Docker Images -on: - push: - branches: - - main +on: push +# push: +# branches: +# - main env: REGISTRY: ghcr.io diff --git a/job_creator/jobcreator/job_creator.py b/job_creator/jobcreator/job_creator.py index d8424d7..1cd0017 100644 --- a/job_creator/jobcreator/job_creator.py +++ b/job_creator/jobcreator/job_creator.py @@ -267,6 +267,11 @@ def spawn_job( # noqa: PLR0913 special_pvs: list[str], taints: list[dict[str, Any]], affinity: dict[str, Any] | None, + queue_host: str, + queue_user: str, + queue_password: str, + failure_queue_name: str, + filepath: str | None = None, ) -> None: """ Takes the meta_data from the message and uses that dictionary for generating the deployment of the pod. @@ -430,6 +435,10 @@ def spawn_job( # noqa: PLR0913 client.V1EnvVar(name="CONTAINER_NAME", value=job_name), client.V1EnvVar(name="JOB_NAME", value=job_name), client.V1EnvVar(name="POD_NAME", value=job_name), + client.V1EnvVar(name="QUEUE_HOST", value=queue_host), + client.V1EnvVar(name="QUEUE_USER", value=queue_user), + client.V1EnvVar(name="QUEUE_PASSWORD", value=queue_password), + client.V1EnvVar(name="FAILURE_QUEUE_NAME", value=failure_queue_name), ], ) @@ -467,6 +476,8 @@ def spawn_job( # noqa: PLR0913 "kubectl.kubernetes.io/default-container": main_container.name, }, ) + if filepath: + job_metadata.annotations["filepath"] = filepath job = client.V1Job( api_version="batch/v1", diff --git a/job_creator/jobcreator/main.py b/job_creator/jobcreator/main.py index caeca17..f36e619 100644 --- a/job_creator/jobcreator/main.py +++ b/job_creator/jobcreator/main.py @@ -44,6 +44,7 @@ QUEUE_NAME = os.environ.get("INGRESS_QUEUE_NAME", "") CONSUMER_USERNAME = os.environ.get("QUEUE_USER", "") CONSUMER_PASSWORD = os.environ.get("QUEUE_PASSWORD", "") +FAILURE_QUEUE_NAME = os.environ.get("FAILURE_QUEUE_NAME", "failed-watched-files") REDUCE_USER_ID = os.environ.get("REDUCE_USER_ID", "") JOB_NAMESPACE = os.environ.get("JOB_NAMESPACE", "fia") JOB_CREATOR = JobCreator(dev_mode=DEV_MODE, watcher_sha=WATCHER_SHA) @@ -165,6 +166,10 @@ def process_simple_message(message: dict[str, Any]) -> None: special_pvs=[], taints=taints, affinity=affinity, + queue_host=QUEUE_HOST, + queue_user=CONSUMER_USERNAME, + queue_password=CONSUMER_PASSWORD, + failure_queue_name=FAILURE_QUEUE_NAME, ) except Exception as exception: logger.exception(exception) @@ -209,6 +214,10 @@ def process_rerun_message(message: dict[str, Any]) -> None: special_pvs=special_pvs, taints=taints, affinity=affinity, + queue_host=QUEUE_HOST, + queue_user=CONSUMER_USERNAME, + queue_password=CONSUMER_PASSWORD, + failure_queue_name=FAILURE_QUEUE_NAME, ) except Exception as exception: logger.exception(exception) @@ -272,6 +281,11 @@ def process_autoreduction_message(message: dict[str, Any]) -> None: special_pvs=special_pvs, taints=taints, affinity=affinity, + queue_host=QUEUE_HOST, + queue_user=CONSUMER_USERNAME, + queue_password=CONSUMER_PASSWORD, + failure_queue_name=FAILURE_QUEUE_NAME, + filepath=message.get("filepath"), ) except Exception as exception: logger.exception(exception) diff --git a/job_creator/test/test_job_creator.py b/job_creator/test/test_job_creator.py index c5a9397..01b362b 100644 --- a/job_creator/test/test_job_creator.py +++ b/job_creator/test/test_job_creator.py @@ -255,6 +255,11 @@ def test_jobcreator_spawn_job_dev_mode_true( special_pvs = mock.MagicMock() taints = mock.MagicMock() affinity = {"key": "node-type", "operator": "In", "values": ["gpu-worker"]} + queue_host = mock.MagicMock() + queue_user = mock.MagicMock() + queue_password = mock.MagicMock() + failure_queue_name = mock.MagicMock() + filepath = mock.MagicMock() job_creator.spawn_job( job_name, @@ -275,6 +280,11 @@ def test_jobcreator_spawn_job_dev_mode_true( special_pvs, taints, affinity, + queue_host, + queue_user, + queue_password, + failure_queue_name, + filepath, ) assert client.BatchV1Api.return_value.create_namespaced_job.call_args.kwargs["namespace"] == job_namespace assert client.BatchV1Api.return_value.create_namespaced_job.call_args.kwargs["body"] == client.V1Job.return_value @@ -370,6 +380,10 @@ def test_jobcreator_spawn_job_dev_mode_true( client.V1EnvVar(name="CONTAINER_NAME", value=job_name), client.V1EnvVar(name="JOB_NAME", value=job_name), client.V1EnvVar(name="POD_NAME", value=job_name), + client.V1EnvVar(name="QUEUE_HOST", value=queue_host), + client.V1EnvVar(name="QUEUE_USER", value=queue_user), + client.V1EnvVar(name="QUEUE_PASSWORD", value=queue_password), + client.V1EnvVar(name="FAILURE_QUEUE_NAME", value=failure_queue_name), ], ) in client.V1Container.call_args_list @@ -437,6 +451,11 @@ def test_jobcreator_spawn_job_dev_mode_true_imat( special_pvs = ["imat"] taints = [{"key": "nvidia.com/gpu", "effect": "NoSchedule", "operator": "Exists"}] affinity = {"key": "node-type", "operator": "In", "values": ["gpu-worker"]} + queue_host = mock.MagicMock() + queue_user = mock.MagicMock() + queue_password = mock.MagicMock() + failure_queue_name = mock.MagicMock() + filepath = mock.MagicMock() job_creator.spawn_job( job_name, @@ -457,6 +476,11 @@ def test_jobcreator_spawn_job_dev_mode_true_imat( special_pvs, taints, affinity, + queue_host, + queue_user, + queue_password, + failure_queue_name, + filepath, ) assert client.BatchV1Api.return_value.create_namespaced_job.call_args.kwargs["namespace"] == job_namespace @@ -573,6 +597,10 @@ def test_jobcreator_spawn_job_dev_mode_true_imat( client.V1EnvVar(name="CONTAINER_NAME", value=job_name), client.V1EnvVar(name="JOB_NAME", value=job_name), client.V1EnvVar(name="POD_NAME", value=job_name), + client.V1EnvVar(name="QUEUE_HOST", value=queue_host), + client.V1EnvVar(name="QUEUE_USER", value=queue_user), + client.V1EnvVar(name="QUEUE_PASSWORD", value=queue_password), + client.V1EnvVar(name="FAILURE_QUEUE_NAME", value=failure_queue_name), ], ) in client.V1Container.call_args_list @@ -640,6 +668,12 @@ def test_jobcreator_spawn_job_dev_mode_false( special_pvs = mock.MagicMock() taints = mock.MagicMock() affinity = mock.MagicMock() + queue_host = mock.MagicMock() + queue_user = mock.MagicMock() + queue_password = mock.MagicMock() + failure_queue_name = mock.MagicMock() + filepath = mock.MagicMock() + job_creator.spawn_job( job_name, script, @@ -659,6 +693,11 @@ def test_jobcreator_spawn_job_dev_mode_false( special_pvs, taints, affinity, + queue_host, + queue_user, + queue_password, + failure_queue_name, + filepath, ) assert ( diff --git a/job_creator/test/test_queue_consumer.py b/job_creator/test/test_queue_consumer.py index fe772f9..5f22b59 100644 --- a/job_creator/test/test_queue_consumer.py +++ b/job_creator/test/test_queue_consumer.py @@ -12,7 +12,7 @@ QUEUE_NAME = mock.MagicMock() -@pytest.fixture(autouse=True, scope="module") +@pytest.fixture(autouse=True) def setup_queue_consumer(): with ( mock.patch("jobcreator.queue_consumer.ConnectionParameters") as connection_parameters, diff --git a/job_watcher/jobwatcher/job_watcher.py b/job_watcher/jobwatcher/job_watcher.py index 8ec601f..0557016 100644 --- a/job_watcher/jobwatcher/job_watcher.py +++ b/job_watcher/jobwatcher/job_watcher.py @@ -12,6 +12,7 @@ from time import sleep from typing import Any, Literal, cast +import pika # type: ignore[import-untyped] import requests from kubernetes import client # type: ignore[import-untyped] from kubernetes.client import V1ContainerStatus, V1Job, V1Pod # type: ignore[import-untyped] @@ -21,6 +22,10 @@ StateString = Literal["SUCCESSFUL", "UNSUCCESSFUL", "ERROR", "NOT_STARTED"] FIA_API_HOST = os.environ.get("FIA_API", "fia-api-service.fia.svc.cluster.local:80") FIA_API_API_KEY = os.environ.get("FIA_API_API_KEY") +QUEUE_HOST = os.environ.get("QUEUE_HOST", "localhost") +QUEUE_USER = os.environ.get("QUEUE_USER", "guest") +QUEUE_PASSWORD = os.environ.get("QUEUE_PASSWORD", "guest") +FAILURE_QUEUE_NAME = os.environ.get("FAILURE_QUEUE_NAME", "failed-watched-files") def clean_up_pvcs_for_job(job: V1Job, namespace: str) -> None: @@ -185,9 +190,35 @@ def check_for_changes(self) -> None: self.done_watching = True elif self.check_for_pod_stalled(): logger.info("Job has stalled out...") + self.resubmit_job() self.cleanup_job() self.done_watching = True + def resubmit_job(self) -> None: + """ + Resubmit the job by pushing its filepath onto the failure queue if it exists. + """ + if self.job is None: + raise AttributeError("Job must be set in the JobWatcher before calling this function.") + filepath = self.job.metadata.annotations.get("filepath") + if not filepath: + logger.info("No filepath annotation found, skipping resubmission.") + return + + logger.info("Resubmitting job via failure queue with filepath: %s", filepath) + credentials = pika.PlainCredentials(username=QUEUE_USER, password=QUEUE_PASSWORD) + connection_parameters = pika.ConnectionParameters(QUEUE_HOST, 5672, credentials=credentials) + try: + connection = pika.BlockingConnection(connection_parameters) + channel = connection.channel() + channel.queue_declare(FAILURE_QUEUE_NAME, durable=True, arguments={"x-queue-type": "quorum"}) + channel.basic_publish(exchange="", routing_key=FAILURE_QUEUE_NAME, body=filepath.encode()) + connection.close() + logger.info("Successfully published filepath to failure queue") + except Exception as exception: + logger.error("Failed to publish to failure queue: %s", str(exception)) + logger.exception(exception) + def get_container_status(self) -> V1ContainerStatus | None: """ Get and return the current container status, ignoring the job watcher's container @@ -225,7 +256,7 @@ def check_for_job_complete(self) -> bool: def check_for_pod_stalled(self) -> bool: """ The way this checks if a job is stalled is by checking if there has been no new - logs for the last 30 minutes, or if the job has taken over 6 hours to complete. + logs for the last 60 minutes, or if the job has taken over 6 hours to complete. Long term 6 hours may be too little so this is configurable using the environment variables. :return: bool, True if pod is stalled, False if pod is not stalled. @@ -233,21 +264,21 @@ def check_for_pod_stalled(self) -> bool: if self.pod is None: raise AttributeError("Pod must be set in the JobWatcher before calling this function.") v1_core = client.CoreV1Api() - seconds_in_30_minutes = 60 * 30 - # If pod is younger than 30 minutes it can't be stalled for 30 minutes, if older, then check. + seconds_in_60_minutes = 60 * 60 + # If pod is younger than 60 minutes it can't be stalled for 60 minutes, if older, then check. if (datetime.datetime.now(datetime.UTC) - self.pod.metadata.creation_timestamp) > datetime.timedelta( - seconds=seconds_in_30_minutes + seconds=seconds_in_60_minutes ): logs = v1_core.read_namespaced_pod_log( name=self.pod.metadata.name, namespace=self.pod.metadata.namespace, timestamps=True, tail_lines=1, - since_seconds=seconds_in_30_minutes, + since_seconds=seconds_in_60_minutes, container=self.container_name, ) if logs == "": - logger.info("No new logs for pod %s in %s seconds", self.pod.metadata.name, seconds_in_30_minutes) + logger.info("No new logs for pod %s in %s seconds", self.pod.metadata.name, seconds_in_60_minutes) return True if (datetime.datetime.now(datetime.UTC) - self.pod.metadata.creation_timestamp) > datetime.timedelta( seconds=self.max_time_to_complete diff --git a/job_watcher/pyproject.toml b/job_watcher/pyproject.toml index aab010a..aaba425 100644 --- a/job_watcher/pyproject.toml +++ b/job_watcher/pyproject.toml @@ -5,6 +5,7 @@ version = "0.0.1" requires-python = ">= 3.11" dependencies = [ "kubernetes==35.0.0", + "pika==1.3.2", ] [project.urls] diff --git a/job_watcher/test/test_job_watcher.py b/job_watcher/test/test_job_watcher.py index 42af87a..9911824 100644 --- a/job_watcher/test/test_job_watcher.py +++ b/job_watcher/test/test_job_watcher.py @@ -223,6 +223,7 @@ def test_check_for_changes_job_complete_and_stalled(job_watcher_maker): jw, _client, _find_pod_from_partial_name = job_watcher_maker jw.update_current_container_info = mock.MagicMock() jw.cleanup_job = mock.MagicMock() + jw.resubmit_job = mock.MagicMock() jw.check_for_job_complete = mock.MagicMock(return_value=True) jw.check_for_pod_stalled = mock.MagicMock(return_value=True) jw.done_watching = False @@ -231,6 +232,7 @@ def test_check_for_changes_job_complete_and_stalled(job_watcher_maker): jw.update_current_container_info.assert_called_once_with() jw.cleanup_job.assert_called_once_with() + jw.resubmit_job.assert_not_called() jw.check_for_job_complete.assert_called_once_with() jw.check_for_pod_stalled.assert_not_called() assert jw.done_watching is True @@ -241,6 +243,7 @@ def test_check_for_changes_job_incomplete_and_stalled(job_watcher_maker): jw, _client, _find_pod_from_partial_name = job_watcher_maker jw.update_current_container_info = mock.MagicMock() jw.cleanup_job = mock.MagicMock() + jw.resubmit_job = mock.MagicMock() jw.check_for_job_complete = mock.MagicMock(return_value=False) jw.check_for_pod_stalled = mock.MagicMock(return_value=True) jw.done_watching = False @@ -248,6 +251,7 @@ def test_check_for_changes_job_incomplete_and_stalled(job_watcher_maker): jw.check_for_changes() jw.update_current_container_info.assert_called_once_with() + jw.resubmit_job.assert_called_once_with() jw.cleanup_job.assert_called_once_with() jw.check_for_job_complete.assert_called_once_with() jw.check_for_pod_stalled.assert_called_once_with() @@ -347,7 +351,7 @@ def test_check_for_job_complete_container_status_running(job_watcher_maker): @pytest.mark.usefixtures("job_watcher_maker") -def test_check_for_pod_stalled_pod_is_younger_than_30_minutes(job_watcher_maker): +def test_check_for_pod_stalled_pod_is_younger_than_60_minutes(job_watcher_maker): jw, _client, _find_pod_from_partial_name = job_watcher_maker jw.pod.metadata.creation_timestamp = datetime.now(UTC) jw.max_time_to_complete = 99999999 @@ -365,9 +369,9 @@ def test_check_for_pod_stalled_pod_where_pod_is_none(job_watcher_maker): @pytest.mark.usefixtures("job_watcher_maker") -def test_check_for_pod_stalled_pod_is_stalled_for_30_minutes(job_watcher_maker): +def test_check_for_pod_stalled_pod_is_stalled_for_60_minutes(job_watcher_maker): jw, _, __ = job_watcher_maker - jw.pod.metadata.creation_timestamp = datetime.now(UTC) - timedelta(seconds=60 * 35) + jw.pod.metadata.creation_timestamp = datetime.now(UTC) - timedelta(seconds=60 * 65) jw.max_time_to_complete = 99999999 with mock.patch("jobwatcher.job_watcher.client") as client: @@ -379,7 +383,7 @@ def test_check_for_pod_stalled_pod_is_stalled_for_30_minutes(job_watcher_maker): namespace=jw.pod.metadata.namespace, timestamps=True, tail_lines=1, - since_seconds=60 * 30, + since_seconds=60 * 60, container=jw.container_name, ) @@ -850,3 +854,65 @@ def test_update_job_status_retry_on_request_exception(n_exceptions: int) -> None assert mock_sleep.call_count == n_exceptions assert mock_logger_warning.call_count == n_exceptions mock_sleep.assert_has_calls([call(5)] * n_exceptions) + + +@pytest.mark.usefixtures("job_watcher_maker") +def test_resubmit_job_success(job_watcher_maker): + jw, _, __ = job_watcher_maker + filepath = "/some/file/path" + jw.job.metadata.annotations = {"filepath": filepath} + with patch("jobwatcher.job_watcher.pika") as mock_pika: + jw.resubmit_job() + + mock_pika.PlainCredentials.assert_called_once() + mock_pika.ConnectionParameters.assert_called_once() + mock_pika.BlockingConnection.assert_called_once() + connection = mock_pika.BlockingConnection.return_value + connection.channel.assert_called_once() + channel = connection.channel.return_value + channel.queue_declare.assert_called_once_with( + jw.FAILURE_QUEUE_NAME if hasattr(jw, "FAILURE_QUEUE_NAME") else "failed-watched-files", + durable=True, + arguments={"x-queue-type": "quorum"}, + ) + channel.basic_publish.assert_called_once_with( + exchange="", + routing_key=jw.FAILURE_QUEUE_NAME if hasattr(jw, "FAILURE_QUEUE_NAME") else "failed-watched-files", + body=filepath.encode(), + ) + connection.close.assert_called_once() + + +@pytest.mark.usefixtures("job_watcher_maker") +def test_resubmit_job_no_filepath(job_watcher_maker): + jw, _, __ = job_watcher_maker + jw.job.metadata.annotations = {} + with patch("jobwatcher.job_watcher.pika") as mock_pika: + jw.resubmit_job() + + mock_pika.PlainCredentials.assert_not_called() + + +@pytest.mark.usefixtures("job_watcher_maker") +def test_resubmit_job_exception(job_watcher_maker): + jw, _, __ = job_watcher_maker + filepath = "/some/file/path" + jw.job.metadata.annotations = {"filepath": filepath} + with ( + patch("jobwatcher.job_watcher.pika") as mock_pika, + patch("jobwatcher.job_watcher.logger.error") as mock_logger_error, + patch("jobwatcher.job_watcher.logger.exception") as mock_logger_exception, + ): + mock_pika.BlockingConnection.side_effect = Exception("Test Exception") + jw.resubmit_job() + + mock_logger_error.assert_called_once() + mock_logger_exception.assert_called_once() + + +@pytest.mark.usefixtures("job_watcher_maker") +def test_resubmit_job_job_is_none(job_watcher_maker): + jw, _, __ = job_watcher_maker + jw.job = None + with pytest.raises(AttributeError, match=r"Job must be set in the JobWatcher before calling this function\."): + jw.resubmit_job()