Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
selected_from,
)
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid import ApiClientError, MicrogridApiClient
from frequenz.client.microgrid.component import EvCharger
from frequenz.quantities import Power, Voltage
from typing_extensions import override
Expand All @@ -30,15 +29,16 @@
from ..._component_pool_status_tracker import ComponentPoolStatusTracker
from ..._component_status import ComponentPoolStatus, EVChargerStatusTracker
from ...request import Request
from ...result import PartialFailure, Result, Success
from ...result import Result
from .._component_manager import ComponentManager
from .._set_power_mixin import SetPowerMixin
from ._config import EVDistributionConfig
from ._states import EvcState, EvcStates

_logger = logging.getLogger(__name__)


class EVChargerManager(ComponentManager):
class EVChargerManager(SetPowerMixin, ComponentManager):
"""Manage ev chargers for the power distributor."""

@override
Expand Down Expand Up @@ -284,84 +284,14 @@ async def _run(self) -> None: # pylint: disable=too-many-locals

latest_target_powers.update(target_power_changes)
result = await self._set_api_power(
api, target_power_changes, self._api_power_request_timeout
)
await self._results_sender.send(result)

async def _set_api_power(
self,
api: MicrogridApiClient,
target_power_changes: dict[ComponentId, Power],
api_request_timeout: timedelta,
) -> Result:
"""Send the EV charger power changes to the microgrid API.

Args:
api: The microgrid API client to use for setting the power.
target_power_changes: A dictionary containing the new power allocations for
the EV chargers.
api_request_timeout: The timeout for the API request.

Returns:
Power distribution result, corresponding to the result of the API
request.
"""
tasks: dict[ComponentId, asyncio.Task[datetime | None]] = {}
for component_id, power in target_power_changes.items():
tasks[component_id] = asyncio.create_task(
api.set_component_power_active(component_id, power.as_watts())
)
_, pending = await asyncio.wait(
tasks.values(),
timeout=api_request_timeout.total_seconds(),
return_when=asyncio.ALL_COMPLETED,
)
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)

failed_components: set[ComponentId] = set()
succeeded_components: set[ComponentId] = set()
failed_power = Power.zero()
for component_id, task in tasks.items():
try:
task.result()
except asyncio.CancelledError:
_logger.warning(
"Timeout while setting power to EV charger %s", component_id
)
except ApiClientError as exc:
_logger.warning(
"Got a client error while setting power to EV charger %s: %s",
component_id,
exc,
)
except Exception: # pylint: disable=broad-except
_logger.exception(
"Unknown error while setting power to EV charger: %s", component_id
)
else:
succeeded_components.add(component_id)
continue

failed_components.add(component_id)
failed_power += target_power_changes[component_id]

if failed_components:
return PartialFailure(
failed_components=failed_components,
succeeded_components=succeeded_components,
failed_power=failed_power,
succeeded_power=self._target_power - failed_power,
excess_power=Power.zero(),
request=self._latest_request,
target_power=self._target_power,
allocations=target_power_changes,
api_request_timeout=self._api_power_request_timeout,
remaining_power=Power.zero(),
component_category="EV charger",
)
return Success(
succeeded_components=succeeded_components,
succeeded_power=self._target_power,
excess_power=Power.zero(),
request=self._latest_request,
)
await self._results_sender.send(result)

def _deallocate_unused_power(
self, to_deallocate: Power
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import asyncio
import collections.abc
import logging
from datetime import datetime, timedelta
from datetime import timedelta

from frequenz.channels import LatestValueCache, Sender
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.client.microgrid import ApiClientError
from frequenz.client.microgrid.component import SolarInverter
from frequenz.quantities import Power
from typing_extensions import override
Expand All @@ -21,13 +20,14 @@
from ..._component_pool_status_tracker import ComponentPoolStatusTracker
from ..._component_status import ComponentPoolStatus, PVInverterStatusTracker
from ...request import Request
from ...result import PartialFailure, Result, Success
from ...result import Result, Success
from .._component_manager import ComponentManager
from .._set_power_mixin import SetPowerMixin

_logger = logging.getLogger(__name__)


class PVManager(ComponentManager):
class PVManager(SetPowerMixin, ComponentManager):
"""Manage PV inverters for the power distributor."""

@override
Expand Down Expand Up @@ -176,79 +176,15 @@ async def distribute_power(self, request: Request) -> None:
request.power,
allocations,
)
await self._set_api_power(request, allocations, remaining_power)

async def _set_api_power( # pylint: disable=too-many-locals
self,
request: Request,
allocations: dict[ComponentId, Power],
remaining_power: Power,
) -> None:
api_client = connection_manager.get().api_client
tasks: dict[ComponentId, asyncio.Task[datetime | None]] = {}
for component_id, power in allocations.items():
tasks[component_id] = asyncio.create_task(
api_client.set_component_power_active(component_id, power.as_watts())
)
_, pending = await asyncio.wait(
tasks.values(),
timeout=self._api_power_request_timeout.total_seconds(),
return_when=asyncio.ALL_COMPLETED,
)
# collect the timed out tasks and cancel them while keeping the
# exceptions, so that they can be processed later.
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)

failed_components: set[ComponentId] = set()
succeeded_components: set[ComponentId] = set()
failed_power = Power.zero()
for component_id, task in tasks.items():
try:
task.result()
except asyncio.CancelledError:
_logger.warning(
"Timeout while setting power to PV inverter %s", component_id
)
except ApiClientError as exc:
_logger.warning(
"Got a client error while setting power to PV inverter %s: %s",
component_id,
exc,
)
except Exception: # pylint: disable=broad-except
_logger.exception(
"Unknown error while setting power to PV inverter: %s",
component_id,
)
else:
succeeded_components.add(component_id)
continue

failed_components.add(component_id)
failed_power += allocations[component_id]

if failed_components:
await self._results_sender.send(
PartialFailure(
failed_components=failed_components,
succeeded_components=succeeded_components,
failed_power=failed_power,
succeeded_power=request.power - failed_power - remaining_power,
excess_power=remaining_power,
request=request,
)
)
return
await self._results_sender.send(
Success(
succeeded_components=succeeded_components,
succeeded_power=request.power - remaining_power,
excess_power=remaining_power,
request=request,
)
result = await self._set_api_power(
request=request,
target_power=request.power,
allocations=allocations,
api_request_timeout=self._api_power_request_timeout,
remaining_power=remaining_power,
component_category="PV inverter",
)
await self._results_sender.send(result)

def _get_pv_inverter_ids(self) -> collections.abc.Set[ComponentId]:
"""Return the IDs of all PV inverters present in the component graph."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# License: MIT
# Copyright © 2026 Frequenz Energy-as-a-Service GmbH

"""Mixin for setting component powers via microgrid API."""

import asyncio
import logging
from datetime import datetime, timedelta

from frequenz.client.base.exception import ApiClientError
from frequenz.client.common.microgrid.components import ComponentId
from frequenz.quantities import Power

from frequenz.sdk.microgrid import connection_manager

from ..request import Request
from ..result import PartialFailure, Result, Success

_logger = logging.getLogger(__name__)


class SetPowerMixin:
"""Mixin for setting component powers via microgrid API."""

@staticmethod
async def _set_api_power( # pylint: disable=too-many-locals,too-many-arguments
*,
request: Request,
target_power: Power,
allocations: dict[ComponentId, Power],
api_request_timeout: timedelta,
remaining_power: Power,
component_category: str,
) -> Result:
"""Send the component power changes to the microgrid API.

Args:
request: Set-power request sent to the `PowerDistributingActor`.
target_power: The requested power.
allocations: A dictionary containing the new power allocations for
each component.
api_request_timeout: The timeout for the API request.
remaining_power: Any excess (remaining) power.
component_category: Component category name, for display purposes.

Returns:
Power distribution result, corresponding to the result of the API
request.
"""
api_client = connection_manager.get().api_client
tasks: dict[ComponentId, asyncio.Task[datetime | None]] = {}
for component_id, power in allocations.items():
tasks[component_id] = asyncio.create_task(
api_client.set_component_power_active(component_id, power.as_watts())
)
_, pending = await asyncio.wait(
tasks.values(),
timeout=api_request_timeout.total_seconds(),
return_when=asyncio.ALL_COMPLETED,
)
# collect the timed out tasks and cancel them while keeping the
# exceptions, so that they can be processed later.
for task in pending:
task.cancel()
await asyncio.gather(*pending, return_exceptions=True)

failed_components: set[ComponentId] = set()
succeeded_components: set[ComponentId] = set()
failed_power = Power.zero()
for component_id, task in tasks.items():
try:
task.result()
except asyncio.CancelledError:
_logger.warning(
"Timeout while setting power to %s %s",
component_category,
component_id,
)
except ApiClientError as exc:
_logger.warning(
"Got a client error while setting power to %s %s: %s",
component_category,
component_id,
exc,
)
except Exception: # pylint: disable=broad-except
_logger.exception(
"Unknown error while setting power to %s: %s",
component_category,
component_id,
)
else:
succeeded_components.add(component_id)
continue

failed_components.add(component_id)
failed_power += allocations[component_id]

if failed_components:
return PartialFailure(
failed_components=failed_components,
succeeded_components=succeeded_components,
failed_power=failed_power,
succeeded_power=target_power - failed_power - remaining_power,
excess_power=remaining_power,
request=request,
)
return Success(
succeeded_components=succeeded_components,
succeeded_power=target_power - remaining_power,
excess_power=remaining_power,
request=request,
)
Loading