Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/powerapi/processor/pre/k8s/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@
from .handlers import K8sPreProcessorActorHWPCReportHandler
from .handlers import K8sPreProcessorActorStartMessageHandler, K8sPreProcessorActorPoisonPillMessageHandler
from .metadata_cache_manager import K8sMetadataCacheManager
from .monitor_agent import K8sMonitorAgent

Check failure

Code scanning / CodeQL

Module-level cyclic import Error

'K8sMonitorAgent' may not be defined if module
powerapi.processor.pre.k8s.monitor_agent
is imported before module
powerapi.processor.pre.k8s.actor
, as the
definition
of K8sMonitorAgent occurs after the cyclic
import
of powerapi.processor.pre.k8s.actor.


@dataclass
@dataclass(frozen=True)
class K8sProcessorConfig:
"""
Kubernetes processor actor configuration.
:param api_mode: Kubernetes API mode (manual, local, cluster)
:param api_host: Kubernetes API host to connect to
:param api_key: Kubernetes API key (Bearer Token) to authenticate with
"""
api_mode: str | None = None
api_mode: str
api_host: str | None = None
api_key: str | None = None

Expand All @@ -67,7 +67,7 @@

self.manager = Manager()
self.metadata_cache_manager = K8sMetadataCacheManager(self.manager)
self.monitor_agent = K8sMonitorAgent(self.metadata_cache_manager, config.api_mode, config.api_host, config.api_key)
self.monitor_agent = K8sMonitorAgent(self.metadata_cache_manager, config)


class K8sPreProcessorActor(ProcessorActor):
Expand Down
134 changes: 66 additions & 68 deletions src/powerapi/processor/pre/k8s/monitor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,117 +27,114 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from __future__ import annotations

import logging
import sys
from multiprocessing import Process
from multiprocessing import Process, Event
from signal import signal, SIGTERM, SIGINT
from time import sleep
from typing import TYPE_CHECKING

from kubernetes import client, config, watch
from kubernetes.client import V1Pod, V1PodList, V1ContainerStatus
from kubernetes.client.configuration import Configuration
from kubernetes.client.rest import ApiException
from urllib3.exceptions import ProtocolError

from .metadata_cache_manager import ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT
from .metadata_cache_manager import K8sMetadataCacheManager, K8sContainerMetadata

LOCAL_CONFIG_MODE = "local"
MANUAL_CONFIG_MODE = "manual"
CLUSTER_CONFIG_MODE = "cluster"

from .metadata_cache_manager import K8sContainerMetadata, ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT

def _setup_k8s_client_with_local_config() -> None:
"""
Setup Kubernetes API client with a kube-config file. (from KUBECONFIG environment variable, or ~/.kube/config)
"""
config.load_kube_config()
if TYPE_CHECKING:
from .actor import K8sProcessorConfig

Check failure

Code scanning / CodeQL

Module-level cyclic import Error

'K8sProcessorConfig' may not be defined if module
powerapi.processor.pre.k8s.actor
is imported before module
powerapi.processor.pre.k8s.monitor_agent
, as the
definition
of K8sProcessorConfig occurs after the cyclic
import
of powerapi.processor.pre.k8s.monitor_agent.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
from .metadata_cache_manager import K8sMetadataCacheManager


def _setup_k8s_client_with_cluster_config() -> None:
"""
Setup Kubernetes API client with the pod service account. (requires PowerAPI to be running in a Kubernetes cluster)
"""
config.load_incluster_config()
K8S_MONITOR_RETRY_DELAY_SECONDS = 1.0


def _setup_k8s_client_with_manual_config(host: str, api_key: str) -> None:
def load_manual_k8s_config(configuration: client.Configuration, api_host: str | None, api_key: str | None) -> None:
"""
Setup Kubernetes API client with the user provided configuration. (Bearer Token)
:param host: Kubernetes API host url.
:param api_key: Kubernetes API token.
Setup Kubernetes API client configuration manually.
This method only supports authentication by Bearer Token.
:param configuration: Kubernetes API client configuration
:param api_host: The Kubernetes API host
:param api_key: The Kubernetes API key (Bearer Token)
"""
configuration = client.Configuration()
if not api_host:
raise ValueError('Kubernetes API host is not defined')

configuration.host = host or 'http://localhost'
configuration.api_key["authorization"] = api_key
if not api_key:
raise ValueError('Kubernetes API key is not defined')

Configuration.set_default(configuration)
configuration.host = api_host
configuration.api_key['authorization'] = api_key
configuration.api_key_prefix['authorization'] = 'Bearer'


def load_k8s_api_client_configuration(api_mode: str, api_host: str, api_key: str) -> None:
def build_k8s_api_client_configuration(api_mode: str, api_host: str | None, api_key: str | None) -> client.Configuration:
"""
Setup Kubernetes API client according to the selected mode.
:param api_mode: API mode (manual, local, cluster)
:param api_host: API host to connect to
:param api_key: API key (Bearer Token) to authenticate with
Build a Kubernetes API client configuration.
:param api_mode: The Kubernetes API mode (manual, local, cluster)
:param api_host: The Kubernetes API host
:param api_key: The Kubernetes API key (Bearer Token)
:return: Kubernetes API client configuration
"""
if api_mode.casefold() == MANUAL_CONFIG_MODE:
_setup_k8s_client_with_manual_config(api_host, api_key)
return

if api_mode.casefold() == CLUSTER_CONFIG_MODE:
_setup_k8s_client_with_cluster_config()
return
configuration = client.Configuration()
match api_mode.casefold():
case 'local':
# Setup Kubernetes API client with a kube-config file. (from KUBECONFIG environment variable, or ~/.kube/config)
config.load_kube_config(client_configuration=configuration)
case 'cluster':
# Setup Kubernetes API client with the pod service account. (requires PowerAPI to be running in a pod)
config.load_incluster_config(client_configuration=configuration)
case 'manual':
load_manual_k8s_config(configuration, api_host, api_key)
case _:
raise ValueError(f'Invalid Kubernetes API mode: {api_mode}')

# load local configuration by default.
_setup_k8s_client_with_local_config()
return configuration


class K8sMonitorAgent(Process):
"""
Background monitoring agent that update the shared metadata cache from Kubernetes API events.
"""

def __init__(self, cache_manager: K8sMetadataCacheManager, api_mode: str, api_host: str, api_key: str, level_logger: int = logging.WARNING):
def __init__(self, cache_manager: K8sMetadataCacheManager, conf: K8sProcessorConfig, level_logger: int = logging.WARNING):
"""
:param K8sMetadataCacheManager cache_manager: Metadata cache manager
:param str api_mode: The Kubernetes API mode (manual, local, cluster)
:param str api_host: The Kubernetes API host
:param str api_key: The Kubernetes API key (Bearer Token)
:param conf: Configuration of the k8s processor actor
:param int level_logger: The logger level
"""
super().__init__(name='k8s-processor-monitor-agent')

#: (logging.Logger): Logger
self.logger = logging.getLogger(self.name)
self.logger.setLevel(level_logger)
formatter = logging.Formatter('%(asctime)s || %(levelname)s || ' + '%(process)d %(processName)s || %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(formatter)

self.metadata_cache_manager = cache_manager
self.processor_config = conf

self.k8s_api = self._setup_k8s_api_client(api_mode, api_host, api_key)

self.stop_monitoring = False
self._api_config = build_k8s_api_client_configuration(conf.api_mode, conf.api_host, conf.api_key)
self._stop_monitoring = Event()

@staticmethod
def _setup_k8s_api_client(api_mode: str, api_host: str, api_key: str) -> client.CoreV1Api:
def build_k8s_api_client(api_config: client.Configuration) -> client.CoreV1Api:
"""
Setup Kubernetes API client.
:param api_mode: The Kubernetes API mode (manual, local, cluster)
:param api_host: The Kubernetes API host
:param api_key: The Kubernetes API key (Bearer Token)
Build a Kubernetes API client with the given configuration.
:param api_config: Kubernetes API configuration
:return: Kubernetes API client
"""
load_k8s_api_client_configuration(api_mode, api_host, api_key)
return client.CoreV1Api()
api_client = client.ApiClient(configuration=api_config)
return client.CoreV1Api(api_client)

def _setup_signal_handlers(self):
"""
Setup signal handlers for the current Process.
"""
def stop_monitor(_, __):
self.stop_monitoring = True
self._stop_monitoring.set()
sys.exit(0)

signal(SIGTERM, stop_monitor)
Expand All @@ -149,13 +146,13 @@
"""
self._setup_signal_handlers()

# Clearing the metadata cache before starting prevents having orphaned entries
# that will never be deleted because they no longer exist in the Kubernetes API.
self.metadata_cache_manager.clear_metadata_cache()
self.metadata_cache_manager.clear_metadata_cache() # Prevents orphaned cache entries.

while not self.stop_monitoring:
resource_id = self.fetch_list_all_pod_for_all_namespaces()
self.watch_list_pod_for_all_namespaces(resource_id)
api_client = self.build_k8s_api_client(self._api_config)
while not self._stop_monitoring.is_set():
resource_id = self.fetch_list_all_pod_for_all_namespaces(api_client)
self.watch_list_pod_for_all_namespaces(api_client, resource_id)
sleep(K8S_MONITOR_RETRY_DELAY_SECONDS)

@staticmethod
def get_containers_id_name_from_statuses(container_statuses: list[V1ContainerStatus]) -> dict[str, str]:
Expand Down Expand Up @@ -184,14 +181,15 @@
for container_id, container_name in self.get_containers_id_name_from_statuses(container_statuses).items()
]

def fetch_list_all_pod_for_all_namespaces(self) -> int | None:
def fetch_list_all_pod_for_all_namespaces(self, api_client: client.CoreV1Api) -> int | None:
"""
Fetch all pod for all namespaces and populate the metadata cache.
:param api_client: Kubernetes api client
:return: Resource version of the last fetched entry
"""
resource_version = None
try:
pods: V1PodList = self.k8s_api.list_pod_for_all_namespaces(watch=False)
pods: V1PodList = api_client.list_pod_for_all_namespaces(watch=False)
resource_version = pods.metadata.resource_version
for pod in pods.items:
for entry in self.build_metadata_cache_entries_from_pod(pod):
Expand All @@ -204,16 +202,16 @@

return resource_version

def watch_list_pod_for_all_namespaces(self, resource_version: int | None = None):
def watch_list_pod_for_all_namespaces(self, api_client: client.CoreV1Api, resource_version: int | None = None):
"""
Watch k8s pods events for all namespaces and update the local metadata cache accordingly.
:param api_client: Kubernetes API client
:param resource_version: Resource version from where the watcher begin
"""
try:
w = watch.Watch()
for event in w.stream(self.k8s_api.list_pod_for_all_namespaces, resource_version=resource_version):
for event in w.stream(api_client.list_pod_for_all_namespaces, resource_version=resource_version):
event_type = event["type"]

if event_type not in {ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT}:
logging.warning('Unexpected pod event: %s', event_type)
continue
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/processor/pre/k8s/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def _create_handler() -> tuple[K8sPreProcessorActorHWPCReportHandler, list[HWPCR
actor = Mock(name='processor-actor')
actor.target_actors = [Mock(name='target_actor_a'), Mock(name='target_actor_b')]

state = K8sProcessorState(actor, K8sProcessorConfig('manual'))
state = K8sProcessorState(actor, K8sProcessorConfig('manual', 'https://localhost:6443', 'pytest-token'))
state.metadata_cache_manager = Mock(name='metadata_cache_manager')

handler = K8sPreProcessorActorHWPCReportHandler(state)
Expand Down
5 changes: 3 additions & 2 deletions tests/unit/processor/pre/k8s/test_monitor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from kubernetes.client import V1Pod, V1ContainerStatus, Configuration, V1ObjectMeta, V1PodStatus

from powerapi.processor.pre.k8s.actor import K8sProcessorConfig
from powerapi.processor.pre.k8s.monitor_agent import K8sMonitorAgent


Expand All @@ -41,7 +42,8 @@ def initialized_monitor_agent(initialized_metadata_cache_manager):
"""
Returns an initialized monitor agent.
"""
agent = K8sMonitorAgent(initialized_metadata_cache_manager, 'manual', '', '')
processor_config = K8sProcessorConfig('manual', 'https://localhost:6443', 'pytest-token')
agent = K8sMonitorAgent(initialized_metadata_cache_manager, processor_config)
return agent


Expand Down Expand Up @@ -109,7 +111,6 @@ def test_building_metadata_cache_entry_from_pod(initialized_monitor_agent):
"""
Test building metadata cache entries from a Kubernetes POD object.
"""

pod_name = 'test-pod'
pod_namespace = 'powerapi'
pod_labels = {'executor': 'pytest'}
Expand Down