From 60fdb5a8d819a7e62e247766ca30b7e493ad0dc1 Mon Sep 17 00:00:00 2001 From: Martin Laporte Date: Tue, 5 May 2026 14:32:32 +0200 Subject: [PATCH] feat(aws-strands): add tool_stream_event_handler hook to ToolBehavior Add a new optional ToolStreamEventHandler callback to ToolBehavior that is invoked for every intermediate event yielded by an async-generator tool (tool_stream_event). The handler receives (tool_use_id, stream_data) and may yield zero or more AG-UI events forwarded into the top-level stream. When a handler is registered the default state-snapshot behaviour is suppressed for that tool; the handler is responsible for any state updates it wants to emit. All other tools retain the existing behaviour. This makes it possible to surface sub-agent streaming activity (e.g. specialist-run ActivityMessages) without reimplementing the full event loop in a subclass. --- .../aws-strands/python/pyproject.toml | 2 +- .../python/src/ag_ui_strands/__init__.py | 2 ++ .../python/src/ag_ui_strands/agent.py | 20 +++++++++++++++++-- .../python/src/ag_ui_strands/config.py | 12 +++++++++++ 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/integrations/aws-strands/python/pyproject.toml b/integrations/aws-strands/python/pyproject.toml index c608119896..287ab40d45 100644 --- a/integrations/aws-strands/python/pyproject.toml +++ b/integrations/aws-strands/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ag_ui_strands" -version = "0.1.7" +version = "0.1.8" authors = [ { name = "AG-UI Contributors" } ] diff --git a/integrations/aws-strands/python/src/ag_ui_strands/__init__.py b/integrations/aws-strands/python/src/ag_ui_strands/__init__.py index 85a04bc0e6..24bc0d1230 100644 --- a/integrations/aws-strands/python/src/ag_ui_strands/__init__.py +++ b/integrations/aws-strands/python/src/ag_ui_strands/__init__.py @@ -14,6 +14,7 @@ ToolResultContext, PredictStateMapping, SessionManagerProvider, + ToolStreamEventHandler, ) __all__ = [ @@ -29,5 +30,6 @@ "ToolResultContext", "PredictStateMapping", "SessionManagerProvider", + "ToolStreamEventHandler", ] diff --git a/integrations/aws-strands/python/src/ag_ui_strands/agent.py b/integrations/aws-strands/python/src/ag_ui_strands/agent.py index 1643da3ca2..0816f6fd52 100644 --- a/integrations/aws-strands/python/src/ag_ui_strands/agent.py +++ b/integrations/aws-strands/python/src/ag_ui_strands/agent.py @@ -626,9 +626,25 @@ async def run(self, input_data: RunAgentInput) -> AsyncIterator[Any]: elif "tool_stream_event" in event: tool_stream = event["tool_stream_event"] stream_data = tool_stream.get("data", {}) + _tse_tool_use = tool_stream.get("tool_use", {}) + _tse_tool_name = _tse_tool_use.get("name", "") + _tse_tool_use_id = _tse_tool_use.get("toolUseId", "") + _tse_behavior = self.config.tool_behaviors.get(_tse_tool_name) if _tse_tool_name else None - # Emit state snapshot if tool yielded state - if isinstance(stream_data, dict) and "state" in stream_data: + if _tse_behavior and _tse_behavior.tool_stream_event_handler: + try: + async for _tse_event in _tse_behavior.tool_stream_event_handler( + _tse_tool_use_id, stream_data + ): + if _tse_event is not None: + yield _tse_event + except Exception as _tse_exc: + logger.warning( + f"tool_stream_event_handler failed for {_tse_tool_name}: {_tse_exc}", + exc_info=True, + ) + elif isinstance(stream_data, dict) and "state" in stream_data: + # Default behaviour: emit state snapshot when tool yields {"state": ...} yield StateSnapshotEvent( type=EventType.STATE_SNAPSHOT, snapshot=stream_data["state"], diff --git a/integrations/aws-strands/python/src/ag_ui_strands/config.py b/integrations/aws-strands/python/src/ag_ui_strands/config.py index e845d622db..9f040b8069 100644 --- a/integrations/aws-strands/python/src/ag_ui_strands/config.py +++ b/integrations/aws-strands/python/src/ag_ui_strands/config.py @@ -48,6 +48,17 @@ class ToolResultContext(ToolCallContext): CustomResultHandler = Callable[[ToolResultContext], AsyncIterator[Any]] StateContextBuilder = Callable[[RunAgentInput, str], str] SessionManagerProvider = Callable[[RunAgentInput], Awaitable[Optional[SessionManager]] | Optional[SessionManager]] +ToolStreamEventHandler = Callable[[str, Any], AsyncIterator[Any]] +"""Handler for raw tool_stream_event data emitted by async-generator tools. + +Called with (tool_use_id: str, stream_data: Any) for every intermediate event +yielded by the tool while it is executing. The handler may yield zero or more +AG-UI Event objects which are forwarded directly into the top-level event stream. + +When a handler is registered for a tool, the default behaviour of emitting a +StateSnapshotEvent for ``{"state": ...}`` payloads is suppressed for that tool. +The handler is responsible for any state updates it wants to emit. +""" @dataclass @@ -78,6 +89,7 @@ class ToolBehavior: state_from_args: Optional[StateFromArgs] = None state_from_result: Optional[StateFromResult] = None custom_result_handler: Optional[CustomResultHandler] = None + tool_stream_event_handler: Optional[ToolStreamEventHandler] = None @dataclass