Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,15 @@
from ..tools.registry import ToolRegistry
from ..tools.structured_output._structured_output_context import StructuredOutputContext
from ..tools.watcher import ToolWatcher
from ..types._events import AgentResultEvent, EventLoopStopEvent, InitEventLoopEvent, ModelStreamChunkEvent, TypedEvent
from ..types._events import (
AgentResultEvent,
EventLoopStopEvent,
InitEventLoopEvent,
ModelStreamChunkEvent,
StartEventLoopEvent,
TextStreamEvent,
TypedEvent,
)
from ..types.agent import AgentInput, ConcurrentInvocationMode
from ..types.content import ContentBlock, Message, Messages, SystemContentBlock
from ..types.exceptions import ConcurrencyException, ContextWindowOverflowException
Expand Down Expand Up @@ -756,6 +764,7 @@ async def stream_async(
invocation_state: dict[str, Any] | None = None,
structured_output_model: type[BaseModel] | None = None,
structured_output_prompt: str | None = None,
stream_final_turn_only: bool = False,
**kwargs: Any,
) -> AsyncIterator[Any]:
"""Process a natural language prompt and yield events as an async iterator.
Expand All @@ -775,6 +784,11 @@ async def stream_async(
invocation_state: Additional parameters to pass through the event loop.
structured_output_model: Pydantic model type(s) for structured output (overrides agent default).
structured_output_prompt: Custom prompt for forcing structured output (overrides agent default).
stream_final_turn_only: When True, buffers text events from intermediate turns and only yields
text events from the final turn (where stop_reason is "end_turn"). Non-text events such as
lifecycle, tool use, reasoning, and citation events are yielded normally regardless of this
setting. When False (default), all events are yielded as they are produced with no change
in behavior.
**kwargs: Additional parameters to pass to the event loop.[Deprecating]

Yields:
Expand All @@ -791,11 +805,21 @@ async def stream_async(
Exception: Any exceptions from the agent invocation will be propagated to the caller.

Example:
Stream all events (default behavior):

```python
async for event in agent.stream_async("Analyze this data"):
if "data" in event:
yield event["data"]
```

Stream only the final answer (skip intermediate tool-use turns):

```python
async for event in agent.stream_async("Analyze this data", stream_final_turn_only=True):
if "data" in event:
yield event["data"] # Only receives final turn text
```
"""
# Conditionally acquire lock based on concurrent_invocation_mode
# Using threading.Lock instead of asyncio.Lock because run_async() creates
Expand Down Expand Up @@ -835,9 +859,25 @@ async def stream_async(
try:
events = self._run_loop(messages, merged_state, structured_output_model, structured_output_prompt)

text_event_buffer: list[dict[str, Any]] = []

async for event in events:
event.prepare(invocation_state=merged_state)

if stream_final_turn_only:
if isinstance(event, StartEventLoopEvent):
text_event_buffer.clear()
elif isinstance(event, TextStreamEvent):
text_event_buffer.append(event.as_dict())
continue
elif isinstance(event, EventLoopStopEvent):
stop_reason = event["stop"][0]
if stop_reason == "end_turn":
for buffered in text_event_buffer:
callback_handler(**buffered)
yield buffered
text_event_buffer.clear()

if event.is_callback_event:
as_dict = event.as_dict()
callback_handler(**as_dict)
Expand Down
Loading