Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/build_and_push.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: Build and Push Docker Images

on:
push:
branches:
- main
on: push
# push:
# branches:
# - main

env:
REGISTRY: ghcr.io
Expand Down
11 changes: 11 additions & 0 deletions job_creator/jobcreator/job_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ def spawn_job( # noqa: PLR0913
special_pvs: list[str],
taints: list[dict[str, Any]],
affinity: dict[str, Any] | None,
queue_host: str,
queue_user: str,
queue_password: str,
failure_queue_name: str,
filepath: str | None = None,
) -> None:
"""
Takes the meta_data from the message and uses that dictionary for generating the deployment of the pod.
Expand Down Expand Up @@ -430,6 +435,10 @@ def spawn_job( # noqa: PLR0913
client.V1EnvVar(name="CONTAINER_NAME", value=job_name),
client.V1EnvVar(name="JOB_NAME", value=job_name),
client.V1EnvVar(name="POD_NAME", value=job_name),
client.V1EnvVar(name="QUEUE_HOST", value=queue_host),
client.V1EnvVar(name="QUEUE_USER", value=queue_user),
client.V1EnvVar(name="QUEUE_PASSWORD", value=queue_password),
client.V1EnvVar(name="FAILURE_QUEUE_NAME", value=failure_queue_name),
],
)

Expand Down Expand Up @@ -467,6 +476,8 @@ def spawn_job( # noqa: PLR0913
"kubectl.kubernetes.io/default-container": main_container.name,
},
)
if filepath:
job_metadata.annotations["filepath"] = filepath

job = client.V1Job(
api_version="batch/v1",
Expand Down
14 changes: 14 additions & 0 deletions job_creator/jobcreator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
QUEUE_NAME = os.environ.get("INGRESS_QUEUE_NAME", "")
CONSUMER_USERNAME = os.environ.get("QUEUE_USER", "")
CONSUMER_PASSWORD = os.environ.get("QUEUE_PASSWORD", "")
FAILURE_QUEUE_NAME = os.environ.get("FAILURE_QUEUE_NAME", "failed-watched-files")
REDUCE_USER_ID = os.environ.get("REDUCE_USER_ID", "")
JOB_NAMESPACE = os.environ.get("JOB_NAMESPACE", "fia")
JOB_CREATOR = JobCreator(dev_mode=DEV_MODE, watcher_sha=WATCHER_SHA)
Expand Down Expand Up @@ -165,6 +166,10 @@ def process_simple_message(message: dict[str, Any]) -> None:
special_pvs=[],
taints=taints,
affinity=affinity,
queue_host=QUEUE_HOST,
queue_user=CONSUMER_USERNAME,
queue_password=CONSUMER_PASSWORD,
failure_queue_name=FAILURE_QUEUE_NAME,
)
except Exception as exception:
logger.exception(exception)
Expand Down Expand Up @@ -209,6 +214,10 @@ def process_rerun_message(message: dict[str, Any]) -> None:
special_pvs=special_pvs,
taints=taints,
affinity=affinity,
queue_host=QUEUE_HOST,
queue_user=CONSUMER_USERNAME,
queue_password=CONSUMER_PASSWORD,
failure_queue_name=FAILURE_QUEUE_NAME,
)
except Exception as exception:
logger.exception(exception)
Expand Down Expand Up @@ -272,6 +281,11 @@ def process_autoreduction_message(message: dict[str, Any]) -> None:
special_pvs=special_pvs,
taints=taints,
affinity=affinity,
queue_host=QUEUE_HOST,
queue_user=CONSUMER_USERNAME,
queue_password=CONSUMER_PASSWORD,
failure_queue_name=FAILURE_QUEUE_NAME,
filepath=message.get("filepath"),
)
except Exception as exception:
logger.exception(exception)
Expand Down
39 changes: 39 additions & 0 deletions job_creator/test/test_job_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ def test_jobcreator_spawn_job_dev_mode_true(
special_pvs = mock.MagicMock()
taints = mock.MagicMock()
affinity = {"key": "node-type", "operator": "In", "values": ["gpu-worker"]}
queue_host = mock.MagicMock()
queue_user = mock.MagicMock()
queue_password = mock.MagicMock()
failure_queue_name = mock.MagicMock()
filepath = mock.MagicMock()

job_creator.spawn_job(
job_name,
Expand All @@ -275,6 +280,11 @@ def test_jobcreator_spawn_job_dev_mode_true(
special_pvs,
taints,
affinity,
queue_host,
queue_user,
queue_password,
failure_queue_name,
filepath,
)
assert client.BatchV1Api.return_value.create_namespaced_job.call_args.kwargs["namespace"] == job_namespace
assert client.BatchV1Api.return_value.create_namespaced_job.call_args.kwargs["body"] == client.V1Job.return_value
Expand Down Expand Up @@ -370,6 +380,10 @@ def test_jobcreator_spawn_job_dev_mode_true(
client.V1EnvVar(name="CONTAINER_NAME", value=job_name),
client.V1EnvVar(name="JOB_NAME", value=job_name),
client.V1EnvVar(name="POD_NAME", value=job_name),
client.V1EnvVar(name="QUEUE_HOST", value=queue_host),
client.V1EnvVar(name="QUEUE_USER", value=queue_user),
client.V1EnvVar(name="QUEUE_PASSWORD", value=queue_password),
client.V1EnvVar(name="FAILURE_QUEUE_NAME", value=failure_queue_name),
],
)
in client.V1Container.call_args_list
Expand Down Expand Up @@ -437,6 +451,11 @@ def test_jobcreator_spawn_job_dev_mode_true_imat(
special_pvs = ["imat"]
taints = [{"key": "nvidia.com/gpu", "effect": "NoSchedule", "operator": "Exists"}]
affinity = {"key": "node-type", "operator": "In", "values": ["gpu-worker"]}
queue_host = mock.MagicMock()
queue_user = mock.MagicMock()
queue_password = mock.MagicMock()
failure_queue_name = mock.MagicMock()
filepath = mock.MagicMock()

job_creator.spawn_job(
job_name,
Expand All @@ -457,6 +476,11 @@ def test_jobcreator_spawn_job_dev_mode_true_imat(
special_pvs,
taints,
affinity,
queue_host,
queue_user,
queue_password,
failure_queue_name,
filepath,
)

assert client.BatchV1Api.return_value.create_namespaced_job.call_args.kwargs["namespace"] == job_namespace
Expand Down Expand Up @@ -573,6 +597,10 @@ def test_jobcreator_spawn_job_dev_mode_true_imat(
client.V1EnvVar(name="CONTAINER_NAME", value=job_name),
client.V1EnvVar(name="JOB_NAME", value=job_name),
client.V1EnvVar(name="POD_NAME", value=job_name),
client.V1EnvVar(name="QUEUE_HOST", value=queue_host),
client.V1EnvVar(name="QUEUE_USER", value=queue_user),
client.V1EnvVar(name="QUEUE_PASSWORD", value=queue_password),
client.V1EnvVar(name="FAILURE_QUEUE_NAME", value=failure_queue_name),
],
)
in client.V1Container.call_args_list
Expand Down Expand Up @@ -640,6 +668,12 @@ def test_jobcreator_spawn_job_dev_mode_false(
special_pvs = mock.MagicMock()
taints = mock.MagicMock()
affinity = mock.MagicMock()
queue_host = mock.MagicMock()
queue_user = mock.MagicMock()
queue_password = mock.MagicMock()
failure_queue_name = mock.MagicMock()
filepath = mock.MagicMock()

job_creator.spawn_job(
job_name,
script,
Expand All @@ -659,6 +693,11 @@ def test_jobcreator_spawn_job_dev_mode_false(
special_pvs,
taints,
affinity,
queue_host,
queue_user,
queue_password,
failure_queue_name,
filepath,
)

assert (
Expand Down
2 changes: 1 addition & 1 deletion job_creator/test/test_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
QUEUE_NAME = mock.MagicMock()


@pytest.fixture(autouse=True, scope="module")
@pytest.fixture(autouse=True)
def setup_queue_consumer():
with (
mock.patch("jobcreator.queue_consumer.ConnectionParameters") as connection_parameters,
Expand Down
43 changes: 37 additions & 6 deletions job_watcher/jobwatcher/job_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from time import sleep
from typing import Any, Literal, cast

import pika # type: ignore[import-untyped]
import requests
from kubernetes import client # type: ignore[import-untyped]
from kubernetes.client import V1ContainerStatus, V1Job, V1Pod # type: ignore[import-untyped]
Expand All @@ -21,6 +22,10 @@
StateString = Literal["SUCCESSFUL", "UNSUCCESSFUL", "ERROR", "NOT_STARTED"]
FIA_API_HOST = os.environ.get("FIA_API", "fia-api-service.fia.svc.cluster.local:80")
FIA_API_API_KEY = os.environ.get("FIA_API_API_KEY")
QUEUE_HOST = os.environ.get("QUEUE_HOST", "localhost")
QUEUE_USER = os.environ.get("QUEUE_USER", "guest")
QUEUE_PASSWORD = os.environ.get("QUEUE_PASSWORD", "guest")
FAILURE_QUEUE_NAME = os.environ.get("FAILURE_QUEUE_NAME", "failed-watched-files")


def clean_up_pvcs_for_job(job: V1Job, namespace: str) -> None:
Expand Down Expand Up @@ -185,9 +190,35 @@ def check_for_changes(self) -> None:
self.done_watching = True
elif self.check_for_pod_stalled():
logger.info("Job has stalled out...")
self.resubmit_job()
self.cleanup_job()
self.done_watching = True

def resubmit_job(self) -> None:
"""
Resubmit the job by pushing its filepath onto the failure queue if it exists.
"""
if self.job is None:
raise AttributeError("Job must be set in the JobWatcher before calling this function.")
filepath = self.job.metadata.annotations.get("filepath")
if not filepath:
logger.info("No filepath annotation found, skipping resubmission.")
return

logger.info("Resubmitting job via failure queue with filepath: %s", filepath)
credentials = pika.PlainCredentials(username=QUEUE_USER, password=QUEUE_PASSWORD)
connection_parameters = pika.ConnectionParameters(QUEUE_HOST, 5672, credentials=credentials)
try:
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()
channel.queue_declare(FAILURE_QUEUE_NAME, durable=True, arguments={"x-queue-type": "quorum"})
channel.basic_publish(exchange="", routing_key=FAILURE_QUEUE_NAME, body=filepath.encode())
connection.close()
logger.info("Successfully published filepath to failure queue")
except Exception as exception:
logger.error("Failed to publish to failure queue: %s", str(exception))
logger.exception(exception)

def get_container_status(self) -> V1ContainerStatus | None:
"""
Get and return the current container status, ignoring the job watcher's container
Expand Down Expand Up @@ -225,29 +256,29 @@ def check_for_job_complete(self) -> bool:
def check_for_pod_stalled(self) -> bool:
"""
The way this checks if a job is stalled is by checking if there has been no new
logs for the last 30 minutes, or if the job has taken over 6 hours to complete.
logs for the last 60 minutes, or if the job has taken over 6 hours to complete.
Long term 6 hours may be too little so this is configurable using the
environment variables.
:return: bool, True if pod is stalled, False if pod is not stalled.
"""
if self.pod is None:
raise AttributeError("Pod must be set in the JobWatcher before calling this function.")
v1_core = client.CoreV1Api()
seconds_in_30_minutes = 60 * 30
# If pod is younger than 30 minutes it can't be stalled for 30 minutes, if older, then check.
seconds_in_60_minutes = 60 * 60
# If pod is younger than 60 minutes it can't be stalled for 60 minutes, if older, then check.
if (datetime.datetime.now(datetime.UTC) - self.pod.metadata.creation_timestamp) > datetime.timedelta(
seconds=seconds_in_30_minutes
seconds=seconds_in_60_minutes
):
logs = v1_core.read_namespaced_pod_log(
name=self.pod.metadata.name,
namespace=self.pod.metadata.namespace,
timestamps=True,
tail_lines=1,
since_seconds=seconds_in_30_minutes,
since_seconds=seconds_in_60_minutes,
container=self.container_name,
)
if logs == "":
logger.info("No new logs for pod %s in %s seconds", self.pod.metadata.name, seconds_in_30_minutes)
logger.info("No new logs for pod %s in %s seconds", self.pod.metadata.name, seconds_in_60_minutes)
return True
if (datetime.datetime.now(datetime.UTC) - self.pod.metadata.creation_timestamp) > datetime.timedelta(
seconds=self.max_time_to_complete
Expand Down
1 change: 1 addition & 0 deletions job_watcher/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ version = "0.0.1"
requires-python = ">= 3.11"
dependencies = [
"kubernetes==35.0.0",
"pika==1.3.2",
]

[project.urls]
Expand Down
Loading
Loading