diff --git a/src/powerapi/cli/generator.py b/src/powerapi/cli/generator.py index a58b2877..27c87457 100644 --- a/src/powerapi/cli/generator.py +++ b/src/powerapi/cli/generator.py @@ -444,14 +444,17 @@ def _k8s_pre_processor_factory(processor_config: dict) -> ProcessorActor: :param processor_config: Pre-Processor configuration :return: Configured Kubernetes pre-processor actor """ - from powerapi.processor.pre.k8s.actor import K8sPreProcessorActor, K8sProcessorConfig - name = processor_config[ACTOR_NAME_KEY] + from powerapi.processor.pre.k8s.actor import K8sPreProcessorActor + from powerapi.processor.pre.k8s.monitor_agent import K8sMonitorConfig + api_mode = processor_config[K8S_API_MODE_KEY] api_host = processor_config.get(K8S_API_HOST_KEY, None) api_key = processor_config.get(K8S_API_KEY_KEY, None) + monitor_config = K8sMonitorConfig(api_mode, api_host, api_key) + + name = processor_config[ACTOR_NAME_KEY] level_logger = logging.DEBUG if processor_config[GENERAL_CONF_VERBOSE_KEY] else logging.INFO - config = K8sProcessorConfig(api_mode, api_host, api_key) - return K8sPreProcessorActor(name, config, level_logger) + return K8sPreProcessorActor(name, monitor_config, level_logger) @staticmethod def _openstack_pre_processor_factory(processor_config: dict) -> ProcessorActor: diff --git a/src/powerapi/processor/pre/k8s/actor.py b/src/powerapi/processor/pre/k8s/actor.py index 8bdf6355..4a866496 100644 --- a/src/powerapi/processor/pre/k8s/actor.py +++ b/src/powerapi/processor/pre/k8s/actor.py @@ -28,7 +28,6 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging -from dataclasses import dataclass from multiprocessing import Manager from powerapi.actor import Actor, State @@ -38,20 +37,7 @@ from .handlers import K8sPreProcessorActorHWPCReportHandler from .handlers import K8sPreProcessorActorStartMessageHandler, K8sPreProcessorActorPoisonPillMessageHandler from .metadata_cache_manager import K8sMetadataCacheManager -from .monitor_agent import K8sMonitorAgent - - -@dataclass -class K8sProcessorConfig: - """ - Kubernetes processor actor configuration. - :param api_mode: Kubernetes API mode (manual, local, cluster) - :param api_host: Kubernetes API host to connect to - :param api_key: Kubernetes API key (Bearer Token) to authenticate with - """ - api_mode: str | None = None - api_host: str | None = None - api_key: str | None = None +from .monitor_agent import K8sMonitorAgent, K8sMonitorConfig class K8sProcessorState(State): @@ -59,7 +45,7 @@ class K8sProcessorState(State): State of the Kubernetes processor actor. """ - def __init__(self, actor: Actor, config: K8sProcessorConfig): + def __init__(self, actor: Actor, monitor_config: K8sMonitorConfig): """ Initializes a Kubernetes pre-processor state. """ @@ -67,7 +53,7 @@ def __init__(self, actor: Actor, config: K8sProcessorConfig): self.manager = Manager() self.metadata_cache_manager = K8sMetadataCacheManager(self.manager) - self.monitor_agent = K8sMonitorAgent(self.metadata_cache_manager, config.api_mode, config.api_host, config.api_key) + self.monitor_agent = K8sMonitorAgent(self.metadata_cache_manager, monitor_config) class K8sPreProcessorActor(ProcessorActor): @@ -75,23 +61,23 @@ class K8sPreProcessorActor(ProcessorActor): Pre-Processor Actor that adds Kubernetes related metadata to reports. """ - def __init__(self, name: str, config: K8sProcessorConfig, level_logger: int = logging.WARNING, timeout: int = 5000): + def __init__(self, name: str, monitor_config: K8sMonitorConfig, level_logger: int = logging.WARNING, timeout: int = 5000): """ Initializes a Kubernetes pre-processor actor. :param name: The name of the actor - :param config: Configuration of the actor + :param monitor_config: Configuration of the monitoring agent :param level_logger: logging level of the actor :param timeout: timeout in seconds """ super().__init__(name, level_logger, timeout) - self.config = config + self.monitor_config = monitor_config def setup(self): """ Set up the Kubernetes pre-processor actor. """ - self.state = K8sProcessorState(self, self.config) + self.state = K8sProcessorState(self, self.monitor_config) self.add_handler(StartMessage, K8sPreProcessorActorStartMessageHandler(self.state)) self.add_handler(HWPCReport, K8sPreProcessorActorHWPCReportHandler(self.state)) diff --git a/src/powerapi/processor/pre/k8s/metadata_cache_manager.py b/src/powerapi/processor/pre/k8s/metadata_cache_manager.py index 5fb5be38..600b1c6a 100644 --- a/src/powerapi/processor/pre/k8s/metadata_cache_manager.py +++ b/src/powerapi/processor/pre/k8s/metadata_cache_manager.py @@ -27,8 +27,9 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from collections.abc import MutableMapping from dataclasses import dataclass -from multiprocessing import Manager +from multiprocessing.managers import SyncManager ADDED_EVENT = 'ADDED' DELETED_EVENT = 'DELETED' @@ -52,11 +53,11 @@ class K8sMetadataCacheManager: Kubernetes container metadata cache manager. """ - def __init__(self, manager: Manager): + def __init__(self, manager: SyncManager): """ :param manager: Manager of the shared metadata cache """ - self.metadata_cache: dict[str, K8sContainerMetadata] = manager.dict() + self.metadata_cache: MutableMapping[str, K8sContainerMetadata] = manager.dict() def update_container_metadata(self, event: str, container_metadata: K8sContainerMetadata): """ diff --git a/src/powerapi/processor/pre/k8s/monitor_agent.py b/src/powerapi/processor/pre/k8s/monitor_agent.py index ed9ca304..fbeadd91 100644 --- a/src/powerapi/processor/pre/k8s/monitor_agent.py +++ b/src/powerapi/processor/pre/k8s/monitor_agent.py @@ -29,68 +29,75 @@ import logging import sys -from multiprocessing import Process +from dataclasses import dataclass +from multiprocessing import Process, Event from signal import signal, SIGTERM, SIGINT +from time import sleep from kubernetes import client, config, watch from kubernetes.client import V1Pod, V1PodList, V1ContainerStatus -from kubernetes.client.configuration import Configuration from kubernetes.client.rest import ApiException from urllib3.exceptions import ProtocolError -from .metadata_cache_manager import ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT -from .metadata_cache_manager import K8sMetadataCacheManager, K8sContainerMetadata +from .metadata_cache_manager import K8sMetadataCacheManager, K8sContainerMetadata, ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT -LOCAL_CONFIG_MODE = "local" -MANUAL_CONFIG_MODE = "manual" -CLUSTER_CONFIG_MODE = "cluster" +K8S_MONITOR_RETRY_DELAY_SECONDS = 1.0 -def _setup_k8s_client_with_local_config() -> None: +@dataclass(frozen=True) +class K8sMonitorConfig: """ - Setup Kubernetes API client with a kube-config file. (from KUBECONFIG environment variable, or ~/.kube/config) + Kubernetes monitoring agent configuration. + :param api_mode: Kubernetes API mode (manual, local, cluster) + :param api_host: Kubernetes API host to connect to + :param api_key: Kubernetes API key (Bearer Token) to authenticate with """ - config.load_kube_config() + api_mode: str + api_host: str | None = None + api_key: str | None = None -def _setup_k8s_client_with_cluster_config() -> None: +def load_manual_k8s_config(configuration: client.Configuration, api_host: str | None, api_key: str | None) -> None: """ - Setup Kubernetes API client with the pod service account. (requires PowerAPI to be running in a Kubernetes cluster) + Setup Kubernetes API client configuration manually. + This method only supports authentication by Bearer Token. + :param configuration: Kubernetes API client configuration + :param api_host: The Kubernetes API host + :param api_key: The Kubernetes API key (Bearer Token) """ - config.load_incluster_config() + if not api_host: + raise ValueError('Kubernetes API host is not defined') + if not api_key: + raise ValueError('Kubernetes API key is not defined') -def _setup_k8s_client_with_manual_config(host: str, api_key: str) -> None: - """ - Setup Kubernetes API client with the user provided configuration. (Bearer Token) - :param host: Kubernetes API host url. - :param api_key: Kubernetes API token. - """ - configuration = client.Configuration() - - configuration.host = host or 'http://localhost' - configuration.api_key["authorization"] = api_key - - Configuration.set_default(configuration) + configuration.host = api_host + configuration.api_key['authorization'] = api_key + configuration.api_key_prefix['authorization'] = 'Bearer' -def load_k8s_api_client_configuration(api_mode: str, api_host: str, api_key: str) -> None: +def build_k8s_api_client_configuration(api_mode: str, api_host: str | None, api_key: str | None) -> client.Configuration: """ - Setup Kubernetes API client according to the selected mode. - :param api_mode: API mode (manual, local, cluster) - :param api_host: API host to connect to - :param api_key: API key (Bearer Token) to authenticate with + Build a Kubernetes API client configuration. + :param api_mode: The Kubernetes API mode (manual, local, cluster) + :param api_host: The Kubernetes API host + :param api_key: The Kubernetes API key (Bearer Token) + :return: Kubernetes API client configuration """ - if api_mode.casefold() == MANUAL_CONFIG_MODE: - _setup_k8s_client_with_manual_config(api_host, api_key) - return - - if api_mode.casefold() == CLUSTER_CONFIG_MODE: - _setup_k8s_client_with_cluster_config() - return + configuration = client.Configuration() + match api_mode.casefold(): + case 'local': + # Setup Kubernetes API client with a kube-config file. (from KUBECONFIG environment variable, or ~/.kube/config) + config.load_kube_config(client_configuration=configuration) + case 'cluster': + # Setup Kubernetes API client with the pod service account. (requires PowerAPI to be running in a pod) + config.load_incluster_config(client_configuration=configuration) + case 'manual': + load_manual_k8s_config(configuration, api_host, api_key) + case _: + raise ValueError(f'Invalid Kubernetes API mode: {api_mode}') - # load local configuration by default. - _setup_k8s_client_with_local_config() + return configuration class K8sMonitorAgent(Process): @@ -98,17 +105,14 @@ class K8sMonitorAgent(Process): Background monitoring agent that update the shared metadata cache from Kubernetes API events. """ - def __init__(self, cache_manager: K8sMetadataCacheManager, api_mode: str, api_host: str, api_key: str, level_logger: int = logging.WARNING): + def __init__(self, cache_manager: K8sMetadataCacheManager, conf: K8sMonitorConfig, level_logger: int = logging.WARNING): """ :param K8sMetadataCacheManager cache_manager: Metadata cache manager - :param str api_mode: The Kubernetes API mode (manual, local, cluster) - :param str api_host: The Kubernetes API host - :param str api_key: The Kubernetes API key (Bearer Token) + :param conf: Configuration of the k8s processor actor :param int level_logger: The logger level """ super().__init__(name='k8s-processor-monitor-agent') - #: (logging.Logger): Logger self.logger = logging.getLogger(self.name) self.logger.setLevel(level_logger) formatter = logging.Formatter('%(asctime)s || %(levelname)s || ' + '%(process)d %(processName)s || %(message)s') @@ -117,27 +121,25 @@ def __init__(self, cache_manager: K8sMetadataCacheManager, api_mode: str, api_ho self.metadata_cache_manager = cache_manager - self.k8s_api = self._setup_k8s_api_client(api_mode, api_host, api_key) - - self.stop_monitoring = False + self._api_config = build_k8s_api_client_configuration(conf.api_mode, conf.api_host, conf.api_key) + self._stop_monitoring = Event() @staticmethod - def _setup_k8s_api_client(api_mode: str, api_host: str, api_key: str) -> client.CoreV1Api: + def build_k8s_api_client(api_config: client.Configuration) -> client.CoreV1Api: """ - Setup Kubernetes API client. - :param api_mode: The Kubernetes API mode (manual, local, cluster) - :param api_host: The Kubernetes API host - :param api_key: The Kubernetes API key (Bearer Token) + Build a Kubernetes API client with the given configuration. + :param api_config: Kubernetes API configuration + :return: Kubernetes API client """ - load_k8s_api_client_configuration(api_mode, api_host, api_key) - return client.CoreV1Api() + api_client = client.ApiClient(configuration=api_config) + return client.CoreV1Api(api_client) def _setup_signal_handlers(self): """ Setup signal handlers for the current Process. """ def stop_monitor(_, __): - self.stop_monitoring = True + self._stop_monitoring.set() sys.exit(0) signal(SIGTERM, stop_monitor) @@ -149,13 +151,13 @@ def run(self): """ self._setup_signal_handlers() - # Clearing the metadata cache before starting prevents having orphaned entries - # that will never be deleted because they no longer exist in the Kubernetes API. - self.metadata_cache_manager.clear_metadata_cache() + self.metadata_cache_manager.clear_metadata_cache() # Prevents orphaned cache entries. - while not self.stop_monitoring: - resource_id = self.fetch_list_all_pod_for_all_namespaces() - self.watch_list_pod_for_all_namespaces(resource_id) + api_client = self.build_k8s_api_client(self._api_config) + while not self._stop_monitoring.is_set(): + resource_id = self.fetch_list_all_pod_for_all_namespaces(api_client) + self.watch_list_pod_for_all_namespaces(api_client, resource_id) + sleep(K8S_MONITOR_RETRY_DELAY_SECONDS) @staticmethod def get_containers_id_name_from_statuses(container_statuses: list[V1ContainerStatus]) -> dict[str, str]: @@ -184,14 +186,15 @@ def build_metadata_cache_entries_from_pod(self, pod: V1Pod) -> list[K8sContainer for container_id, container_name in self.get_containers_id_name_from_statuses(container_statuses).items() ] - def fetch_list_all_pod_for_all_namespaces(self) -> int | None: + def fetch_list_all_pod_for_all_namespaces(self, api_client: client.CoreV1Api) -> int | None: """ Fetch all pod for all namespaces and populate the metadata cache. + :param api_client: Kubernetes api client :return: Resource version of the last fetched entry """ resource_version = None try: - pods: V1PodList = self.k8s_api.list_pod_for_all_namespaces(watch=False) + pods: V1PodList = api_client.list_pod_for_all_namespaces(watch=False) resource_version = pods.metadata.resource_version for pod in pods.items: for entry in self.build_metadata_cache_entries_from_pod(pod): @@ -204,16 +207,16 @@ def fetch_list_all_pod_for_all_namespaces(self) -> int | None: return resource_version - def watch_list_pod_for_all_namespaces(self, resource_version: int | None = None): + def watch_list_pod_for_all_namespaces(self, api_client: client.CoreV1Api, resource_version: int | None = None): """ Watch k8s pods events for all namespaces and update the local metadata cache accordingly. + :param api_client: Kubernetes API client :param resource_version: Resource version from where the watcher begin """ try: w = watch.Watch() - for event in w.stream(self.k8s_api.list_pod_for_all_namespaces, resource_version=resource_version): + for event in w.stream(api_client.list_pod_for_all_namespaces, resource_version=resource_version): event_type = event["type"] - if event_type not in {ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT}: logging.warning('Unexpected pod event: %s', event_type) continue diff --git a/tests/unit/cli/test_generator_k8s.py b/tests/unit/cli/test_generator_k8s.py index ab86a98c..d4694e99 100644 --- a/tests/unit/cli/test_generator_k8s.py +++ b/tests/unit/cli/test_generator_k8s.py @@ -69,9 +69,9 @@ def test_preprocessor_generator_with_valid_k8s_config(k8s_processor_config): assert isinstance(preprocessor, K8sPreProcessorActor) expected_preprocessor_attributes = k8s_processor_config['pre-processor']['pytest-k8s-preprocessor'] - assert preprocessor.config.api_mode == expected_preprocessor_attributes['api-mode'] - assert preprocessor.config.api_key == expected_preprocessor_attributes['api-key'] - assert preprocessor.config.api_host == expected_preprocessor_attributes['api-host'] + assert preprocessor.monitor_config.api_mode == expected_preprocessor_attributes['api-mode'] + assert preprocessor.monitor_config.api_key == expected_preprocessor_attributes['api-key'] + assert preprocessor.monitor_config.api_host == expected_preprocessor_attributes['api-host'] @pytest.mark.parametrize('missing_arg', ['api-mode']) diff --git a/tests/unit/processor/pre/k8s/test_handlers.py b/tests/unit/processor/pre/k8s/test_handlers.py index 2bb9a04a..c5e60d97 100644 --- a/tests/unit/processor/pre/k8s/test_handlers.py +++ b/tests/unit/processor/pre/k8s/test_handlers.py @@ -34,7 +34,8 @@ pytest.importorskip('powerapi.processor.pre.k8s.actor') # The actor module requires external dependencies. -from powerapi.processor.pre.k8s.actor import K8sProcessorState, K8sProcessorConfig +from powerapi.processor.pre.k8s.actor import K8sProcessorState +from powerapi.processor.pre.k8s.monitor_agent import K8sMonitorConfig from powerapi.processor.pre.k8s.handlers import K8sPreProcessorActorHWPCReportHandler from powerapi.processor.pre.k8s.metadata_cache_manager import K8sContainerMetadata from powerapi.report import HWPCReport @@ -50,7 +51,8 @@ def _create_handler() -> tuple[K8sPreProcessorActorHWPCReportHandler, list[HWPCR actor = Mock(name='processor-actor') actor.target_actors = [Mock(name='target_actor_a'), Mock(name='target_actor_b')] - state = K8sProcessorState(actor, K8sProcessorConfig('manual')) + monitor_config = K8sMonitorConfig('manual', 'https://localhost:6443', 'pytest-token') + state = K8sProcessorState(actor, monitor_config) state.metadata_cache_manager = Mock(name='metadata_cache_manager') handler = K8sPreProcessorActorHWPCReportHandler(state) diff --git a/tests/unit/processor/pre/k8s/test_monitor_agent.py b/tests/unit/processor/pre/k8s/test_monitor_agent.py index 78ffff6c..8eb5c732 100644 --- a/tests/unit/processor/pre/k8s/test_monitor_agent.py +++ b/tests/unit/processor/pre/k8s/test_monitor_agent.py @@ -33,7 +33,7 @@ from kubernetes.client import V1Pod, V1ContainerStatus, Configuration, V1ObjectMeta, V1PodStatus -from powerapi.processor.pre.k8s.monitor_agent import K8sMonitorAgent +from powerapi.processor.pre.k8s.monitor_agent import K8sMonitorAgent, K8sMonitorConfig @pytest.fixture @@ -41,7 +41,8 @@ def initialized_monitor_agent(initialized_metadata_cache_manager): """ Returns an initialized monitor agent. """ - agent = K8sMonitorAgent(initialized_metadata_cache_manager, 'manual', '', '') + monitor_config = K8sMonitorConfig('manual', 'https://localhost:6443', 'pytest-token') + agent = K8sMonitorAgent(initialized_metadata_cache_manager, monitor_config) return agent @@ -109,7 +110,6 @@ def test_building_metadata_cache_entry_from_pod(initialized_monitor_agent): """ Test building metadata cache entries from a Kubernetes POD object. """ - pod_name = 'test-pod' pod_namespace = 'powerapi' pod_labels = {'executor': 'pytest'}