diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py index 17c1c17f8..54c2e2cc6 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -17,7 +17,6 @@ selected_from, ) from frequenz.client.common.microgrid.components import ComponentId -from frequenz.client.microgrid import ApiClientError, MicrogridApiClient from frequenz.client.microgrid.component import EvCharger from frequenz.quantities import Power, Voltage from typing_extensions import override @@ -30,15 +29,16 @@ from ..._component_pool_status_tracker import ComponentPoolStatusTracker from ..._component_status import ComponentPoolStatus, EVChargerStatusTracker from ...request import Request -from ...result import PartialFailure, Result, Success +from ...result import Result from .._component_manager import ComponentManager +from .._set_power_mixin import SetPowerMixin from ._config import EVDistributionConfig from ._states import EvcState, EvcStates _logger = logging.getLogger(__name__) -class EVChargerManager(ComponentManager): +class EVChargerManager(SetPowerMixin, ComponentManager): """Manage ev chargers for the power distributor.""" @override @@ -284,84 +284,14 @@ async def _run(self) -> None: # pylint: disable=too-many-locals latest_target_powers.update(target_power_changes) result = await self._set_api_power( - api, target_power_changes, self._api_power_request_timeout - ) - await self._results_sender.send(result) - - async def _set_api_power( - self, - api: MicrogridApiClient, - target_power_changes: dict[ComponentId, Power], - api_request_timeout: timedelta, - ) -> Result: - """Send the EV charger power changes to the microgrid API. - - Args: - api: The microgrid API client to use for setting the power. - target_power_changes: A dictionary containing the new power allocations for - the EV chargers. - api_request_timeout: The timeout for the API request. - - Returns: - Power distribution result, corresponding to the result of the API - request. - """ - tasks: dict[ComponentId, asyncio.Task[datetime | None]] = {} - for component_id, power in target_power_changes.items(): - tasks[component_id] = asyncio.create_task( - api.set_component_power_active(component_id, power.as_watts()) - ) - _, pending = await asyncio.wait( - tasks.values(), - timeout=api_request_timeout.total_seconds(), - return_when=asyncio.ALL_COMPLETED, - ) - for task in pending: - task.cancel() - await asyncio.gather(*pending, return_exceptions=True) - - failed_components: set[ComponentId] = set() - succeeded_components: set[ComponentId] = set() - failed_power = Power.zero() - for component_id, task in tasks.items(): - try: - task.result() - except asyncio.CancelledError: - _logger.warning( - "Timeout while setting power to EV charger %s", component_id - ) - except ApiClientError as exc: - _logger.warning( - "Got a client error while setting power to EV charger %s: %s", - component_id, - exc, - ) - except Exception: # pylint: disable=broad-except - _logger.exception( - "Unknown error while setting power to EV charger: %s", component_id - ) - else: - succeeded_components.add(component_id) - continue - - failed_components.add(component_id) - failed_power += target_power_changes[component_id] - - if failed_components: - return PartialFailure( - failed_components=failed_components, - succeeded_components=succeeded_components, - failed_power=failed_power, - succeeded_power=self._target_power - failed_power, - excess_power=Power.zero(), request=self._latest_request, + target_power=self._target_power, + allocations=target_power_changes, + api_request_timeout=self._api_power_request_timeout, + remaining_power=Power.zero(), + component_category="EV charger", ) - return Success( - succeeded_components=succeeded_components, - succeeded_power=self._target_power, - excess_power=Power.zero(), - request=self._latest_request, - ) + await self._results_sender.send(result) def _deallocate_unused_power( self, to_deallocate: Power diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py index 4a12ab011..7fc1f9745 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_pv_inverter_manager/_pv_inverter_manager.py @@ -6,11 +6,10 @@ import asyncio import collections.abc import logging -from datetime import datetime, timedelta +from datetime import timedelta from frequenz.channels import LatestValueCache, Sender from frequenz.client.common.microgrid.components import ComponentId -from frequenz.client.microgrid import ApiClientError from frequenz.client.microgrid.component import SolarInverter from frequenz.quantities import Power from typing_extensions import override @@ -21,13 +20,14 @@ from ..._component_pool_status_tracker import ComponentPoolStatusTracker from ..._component_status import ComponentPoolStatus, PVInverterStatusTracker from ...request import Request -from ...result import PartialFailure, Result, Success +from ...result import Result, Success from .._component_manager import ComponentManager +from .._set_power_mixin import SetPowerMixin _logger = logging.getLogger(__name__) -class PVManager(ComponentManager): +class PVManager(SetPowerMixin, ComponentManager): """Manage PV inverters for the power distributor.""" @override @@ -176,79 +176,15 @@ async def distribute_power(self, request: Request) -> None: request.power, allocations, ) - await self._set_api_power(request, allocations, remaining_power) - - async def _set_api_power( # pylint: disable=too-many-locals - self, - request: Request, - allocations: dict[ComponentId, Power], - remaining_power: Power, - ) -> None: - api_client = connection_manager.get().api_client - tasks: dict[ComponentId, asyncio.Task[datetime | None]] = {} - for component_id, power in allocations.items(): - tasks[component_id] = asyncio.create_task( - api_client.set_component_power_active(component_id, power.as_watts()) - ) - _, pending = await asyncio.wait( - tasks.values(), - timeout=self._api_power_request_timeout.total_seconds(), - return_when=asyncio.ALL_COMPLETED, - ) - # collect the timed out tasks and cancel them while keeping the - # exceptions, so that they can be processed later. - for task in pending: - task.cancel() - await asyncio.gather(*pending, return_exceptions=True) - - failed_components: set[ComponentId] = set() - succeeded_components: set[ComponentId] = set() - failed_power = Power.zero() - for component_id, task in tasks.items(): - try: - task.result() - except asyncio.CancelledError: - _logger.warning( - "Timeout while setting power to PV inverter %s", component_id - ) - except ApiClientError as exc: - _logger.warning( - "Got a client error while setting power to PV inverter %s: %s", - component_id, - exc, - ) - except Exception: # pylint: disable=broad-except - _logger.exception( - "Unknown error while setting power to PV inverter: %s", - component_id, - ) - else: - succeeded_components.add(component_id) - continue - - failed_components.add(component_id) - failed_power += allocations[component_id] - - if failed_components: - await self._results_sender.send( - PartialFailure( - failed_components=failed_components, - succeeded_components=succeeded_components, - failed_power=failed_power, - succeeded_power=request.power - failed_power - remaining_power, - excess_power=remaining_power, - request=request, - ) - ) - return - await self._results_sender.send( - Success( - succeeded_components=succeeded_components, - succeeded_power=request.power - remaining_power, - excess_power=remaining_power, - request=request, - ) + result = await self._set_api_power( + request=request, + target_power=request.power, + allocations=allocations, + api_request_timeout=self._api_power_request_timeout, + remaining_power=remaining_power, + component_category="PV inverter", ) + await self._results_sender.send(result) def _get_pv_inverter_ids(self) -> collections.abc.Set[ComponentId]: """Return the IDs of all PV inverters present in the component graph.""" diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_set_power_mixin.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_set_power_mixin.py new file mode 100644 index 000000000..f774d6995 --- /dev/null +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_set_power_mixin.py @@ -0,0 +1,113 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Mixin for setting component powers via microgrid API.""" + +import asyncio +import logging +from datetime import datetime, timedelta + +from frequenz.client.base.exception import ApiClientError +from frequenz.client.common.microgrid.components import ComponentId +from frequenz.quantities import Power + +from frequenz.sdk.microgrid import connection_manager + +from ..request import Request +from ..result import PartialFailure, Result, Success + +_logger = logging.getLogger(__name__) + + +class SetPowerMixin: + """Mixin for setting component powers via microgrid API.""" + + @staticmethod + async def _set_api_power( # pylint: disable=too-many-locals,too-many-arguments + *, + request: Request, + target_power: Power, + allocations: dict[ComponentId, Power], + api_request_timeout: timedelta, + remaining_power: Power, + component_category: str, + ) -> Result: + """Send the component power changes to the microgrid API. + + Args: + request: Set-power request sent to the `PowerDistributingActor`. + target_power: The requested power. + allocations: A dictionary containing the new power allocations for + each component. + api_request_timeout: The timeout for the API request. + remaining_power: Any excess (remaining) power. + component_category: Component category name, for display purposes. + + Returns: + Power distribution result, corresponding to the result of the API + request. + """ + api_client = connection_manager.get().api_client + tasks: dict[ComponentId, asyncio.Task[datetime | None]] = {} + for component_id, power in allocations.items(): + tasks[component_id] = asyncio.create_task( + api_client.set_component_power_active(component_id, power.as_watts()) + ) + _, pending = await asyncio.wait( + tasks.values(), + timeout=api_request_timeout.total_seconds(), + return_when=asyncio.ALL_COMPLETED, + ) + # collect the timed out tasks and cancel them while keeping the + # exceptions, so that they can be processed later. + for task in pending: + task.cancel() + await asyncio.gather(*pending, return_exceptions=True) + + failed_components: set[ComponentId] = set() + succeeded_components: set[ComponentId] = set() + failed_power = Power.zero() + for component_id, task in tasks.items(): + try: + task.result() + except asyncio.CancelledError: + _logger.warning( + "Timeout while setting power to %s %s", + component_category, + component_id, + ) + except ApiClientError as exc: + _logger.warning( + "Got a client error while setting power to %s %s: %s", + component_category, + component_id, + exc, + ) + except Exception: # pylint: disable=broad-except + _logger.exception( + "Unknown error while setting power to %s: %s", + component_category, + component_id, + ) + else: + succeeded_components.add(component_id) + continue + + failed_components.add(component_id) + failed_power += allocations[component_id] + + if failed_components: + return PartialFailure( + failed_components=failed_components, + succeeded_components=succeeded_components, + failed_power=failed_power, + succeeded_power=target_power - failed_power - remaining_power, + excess_power=remaining_power, + request=request, + ) + return Success( + succeeded_components=succeeded_components, + succeeded_power=target_power - remaining_power, + excess_power=remaining_power, + request=request, + )