Skip to content
51 changes: 51 additions & 0 deletions src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .._async import run_async
from ..event_loop._retry import ModelRetryStrategy
from ..event_loop.event_loop import INITIAL_DELAY, MAX_ATTEMPTS, MAX_DELAY, event_loop_cycle
from ..experimental.checkpoint import Checkpoint
from ..tools._tool_helpers import generate_missing_tool_result_content
from ..types._snapshot import (
SNAPSHOT_SCHEMA_VERSION,
Expand Down Expand Up @@ -146,6 +147,7 @@ def __init__(
tool_executor: ToolExecutor | None = None,
retry_strategy: ModelRetryStrategy | _DefaultRetryStrategySentinel | None = _DEFAULT_RETRY_STRATEGY,
concurrent_invocation_mode: ConcurrentInvocationMode = ConcurrentInvocationMode.THROW,
checkpointing: bool = False,
Comment thread
JackYPCOnline marked this conversation as resolved.
):
"""Initialize the Agent with the specified configuration.

Expand Down Expand Up @@ -214,6 +216,11 @@ def __init__(
Set to "unsafe_reentrant" to skip lock acquisition entirely, allowing concurrent invocations.
Warning: "unsafe_reentrant" makes no guarantees about resulting behavior and is provided
only for advanced use cases where the caller understands the risks.
checkpointing: When True, the event loop pauses at cycle boundaries (after model call,
after all tools execute) and returns an AgentResult with stop_reason="checkpoint"
and a populated ``checkpoint`` field. Persist the checkpoint and resume by passing a
``CheckpointResumeContent`` block as the next prompt. Defaults to False.
See :mod:`strands.experimental.checkpoint` for usage and limitations.

Raises:
ValueError: If agent id contains path separators.
Expand Down Expand Up @@ -304,6 +311,10 @@ def __init__(

self._interrupt_state = _InterruptState()

# Checkpointing: when True, event loop pauses at cycle boundaries
self._checkpointing: bool = checkpointing
self._checkpoint_resume_context: Checkpoint | None = None

# Runtime state for model providers (e.g., server-side response ids)
self._model_state: dict[str, Any] = {}

Expand Down Expand Up @@ -998,6 +1009,46 @@ async def _convert_prompt_to_messages(self, prompt: AgentInput) -> Messages:
if self._interrupt_state.activated:
return []

# Resume detection — must run before existing shape handling so checkpointResume
# blocks aren't misinterpreted as content blocks. Mirrors _InterruptState.resume()
# conventions (TypeError for shape, KeyError for lookup, ValueError for misconfig;
Comment thread
JackYPCOnline marked this conversation as resolved.
Outdated
# error messages use the SDK's key=<value> | message format).
if isinstance(prompt, list) and prompt:
has_checkpoint_resume = any(isinstance(c, dict) and "checkpointResume" in c for c in prompt)
Comment thread
JackYPCOnline marked this conversation as resolved.
Outdated
if has_checkpoint_resume:
if not self._checkpointing:
raise ValueError(
"Received checkpointResume block but agent was created with "
"checkpointing=False. Pass checkpointing=True when constructing "
"the Agent to enable durable execution."
)

invalid_types = [
key
for content in prompt
if isinstance(content, dict)
for key in content
if key != "checkpointResume"
]
if invalid_types:
raise TypeError(
f"content_types=<{invalid_types}> | checkpointResume cannot be mixed with other content types"
)

if len(prompt) != 1:
raise TypeError(
f"block_count=<{len(prompt)}> | only one checkpointResume block permitted per prompt"
)

resume_block = prompt[0].get("checkpointResume", {})
if not isinstance(resume_block, dict) or "checkpoint" not in resume_block:
raise KeyError("checkpoint | missing required key in checkpointResume block")

checkpoint = Checkpoint.from_dict(resume_block["checkpoint"])
self.load_snapshot(Snapshot.from_dict(checkpoint.snapshot))
Comment thread
JackYPCOnline marked this conversation as resolved.
Outdated
self._checkpoint_resume_context = checkpoint
return []

messages: Messages | None = None
if prompt is not None:
# Check if the latest message is toolUse
Expand Down
18 changes: 16 additions & 2 deletions src/strands/agent/agent_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from pydantic import BaseModel

from ..experimental.checkpoint import Checkpoint
from ..interrupt import Interrupt
from ..telemetry.metrics import EventLoopMetrics
from ..types.content import Message
Expand All @@ -26,6 +27,9 @@ class AgentResult:
state: Additional state information from the event loop.
interrupts: List of interrupts if raised by user.
structured_output: Parsed structured output when structured_output_model was specified.
checkpoint: Checkpoint captured when the agent paused for durable execution.
Populated only when stop_reason == "checkpoint". See
strands.experimental.checkpoint for usage.
"""

stop_reason: StopReason
Expand All @@ -34,6 +38,7 @@ class AgentResult:
state: Any
interrupts: Sequence[Interrupt] | None = None
structured_output: BaseModel | None = None
checkpoint: Checkpoint | None = None

@property
def context_size(self) -> int | None:
Expand Down Expand Up @@ -85,15 +90,23 @@ def from_dict(cls, data: dict[str, Any]) -> "AgentResult":
Returns:
AgentResult instance
Raises:
TypeError: If the data format is invalid@
TypeError: If the data format is invalid
"""
if data.get("type") != "agent_result":
raise TypeError(f"AgentResult.from_dict: unexpected type {data.get('type')!r}")

message = cast(Message, data.get("message"))
stop_reason = cast(StopReason, data.get("stop_reason"))
checkpoint_data = data.get("checkpoint")
checkpoint = Checkpoint.from_dict(checkpoint_data) if checkpoint_data else None

return cls(message=message, stop_reason=stop_reason, metrics=EventLoopMetrics(), state={})
return cls(
message=message,
stop_reason=stop_reason,
metrics=EventLoopMetrics(),
state={},
checkpoint=checkpoint,
)

def to_dict(self) -> dict[str, Any]:
"""Convert this AgentResult to JSON-serializable dictionary.
Expand All @@ -105,4 +118,5 @@ def to_dict(self) -> dict[str, Any]:
"type": "agent_result",
"message": self.message,
"stop_reason": self.stop_reason,
"checkpoint": self.checkpoint.to_dict() if self.checkpoint else None,
}
58 changes: 58 additions & 0 deletions src/strands/event_loop/event_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from opentelemetry import trace as trace_api

from ..experimental.checkpoint import Checkpoint
from ..hooks import AfterModelCallEvent, BeforeModelCallEvent, MessageAddedEvent
from ..telemetry.metrics import Trace
from ..telemetry.tracer import Tracer, get_tracer
Expand Down Expand Up @@ -122,6 +123,19 @@ async def event_loop_cycle(
# Initialize state and get cycle trace
if "request_state" not in invocation_state:
invocation_state["request_state"] = {}

# Consume checkpoint resume context (one-shot: cleared after reading).
# Cross-invocation state (resume context) lives on the agent; within-cycle
# transient state (resume position for the skip check, cycle index) lives
# in invocation_state.
resume_ctx = agent._checkpoint_resume_context
Comment thread
JackYPCOnline marked this conversation as resolved.
Outdated
if resume_ctx is not None:
agent._checkpoint_resume_context = None
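# An "after_tools" checkpoint means that cycle already ran to completion, so resume at
# cycle_index + 1; any other position (e.g. "after_model") resumes at the same cycle index.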
next_cycle = resume_ctx.cycle_index + 1 if resume_ctx.position == "after_tools" else resume_ctx.cycle_index
invocation_state["_checkpoint_cycle_index"] = next_cycle
invocation_state["_checkpoint_resume_position"] = resume_ctx.position

attributes = {"event_loop_cycle_id": str(invocation_state.get("event_loop_cycle_id"))}
cycle_start_time, cycle_trace = agent.event_loop_metrics.start_cycle(attributes=attributes)
invocation_state["event_loop_cycle_trace"] = cycle_trace
Expand Down Expand Up @@ -181,6 +195,32 @@ async def event_loop_cycle(
)

if stop_reason == "tool_use":
# Checkpoint after model call, before tool execution.
# One-shot pop: safe because after_model always returns before reaching
# after_tools, so the stashed position is only consumed once.
if agent._checkpointing:
Comment thread
JackYPCOnline marked this conversation as resolved.
Outdated
resume_pos = invocation_state.pop("_checkpoint_resume_position", None)
if resume_pos == "after_model":
pass # Just resumed here — skip re-checkpoint, proceed to tools
else:
cycle_index = invocation_state.get("_checkpoint_cycle_index", 0)
Comment thread
JackYPCOnline marked this conversation as resolved.
checkpoint = Checkpoint(
position="after_model",
cycle_index=cycle_index,
snapshot=agent.take_snapshot(preset="session").to_dict(),
)
agent.event_loop_metrics.end_cycle(cycle_start_time, cycle_trace)
if cycle_span:
tracer.end_event_loop_cycle_span(span=cycle_span, message=message)
yield EventLoopStopEvent(
"checkpoint",
message,
agent.event_loop_metrics,
invocation_state["request_state"],
checkpoint=checkpoint,
)
return

# Handle tool execution
tool_events = _handle_tool_execution(
stop_reason,
Expand Down Expand Up @@ -590,6 +630,24 @@ async def _handle_tool_execution(
)
return

# Checkpoint after all tools complete, before the next model call.
if agent._checkpointing:
cycle_index = invocation_state.get("_checkpoint_cycle_index", 0)
invocation_state["_checkpoint_cycle_index"] = cycle_index + 1
checkpoint = Checkpoint(
position="after_tools",
cycle_index=cycle_index,
snapshot=agent.take_snapshot(preset="session").to_dict(),
)
yield EventLoopStopEvent(
"checkpoint",
message,
agent.event_loop_metrics,
invocation_state["request_state"],
checkpoint=checkpoint,
)
return

events = recurse_event_loop(
agent=agent, invocation_state=invocation_state, structured_output_context=structured_output_context
)
Expand Down
9 changes: 8 additions & 1 deletion src/strands/experimental/checkpoint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,12 @@
"""

from .checkpoint import CHECKPOINT_SCHEMA_VERSION, Checkpoint, CheckpointPosition
from .types import CheckpointResumeContent, CheckpointResumeDict

__all__ = ["CHECKPOINT_SCHEMA_VERSION", "Checkpoint", "CheckpointPosition"]
__all__ = [
"CHECKPOINT_SCHEMA_VERSION",
"Checkpoint",
"CheckpointPosition",
"CheckpointResumeContent",
"CheckpointResumeDict",
]
14 changes: 12 additions & 2 deletions src/strands/experimental/checkpoint/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
- State via AgentResult.checkpoint field
- Resume via checkpointResume content block in next agent() call

Interaction with interrupts:
- Interrupts take priority over checkpoints. If a tool raises an Interrupt
during a checkpointing=True cycle, the event loop returns
stop_reason="interrupt" (not "checkpoint"). The after_tools checkpoint
is never reached because the interrupt path returns early.
- This is intentional: interrupts require human input, checkpoints are
for worker-level durability. Different semantics, different priorities.

V0 Known Limitations:
- Metrics reset on each resume call. The caller is responsible for aggregating
metrics across a durable run. EventLoopMetrics reflects only the current call.
Expand All @@ -35,6 +43,8 @@
from dataclasses import asdict, dataclass, field
from typing import Any, Literal

from ...types.exceptions import CheckpointException

logger = logging.getLogger(__name__)

CHECKPOINT_SCHEMA_VERSION = "1.0"
Expand Down Expand Up @@ -79,11 +89,11 @@ def from_dict(cls, data: dict[str, Any]) -> "Checkpoint":
data: Serialized checkpoint data.

Raises:
ValueError: If schema_version doesn't match the current version.
CheckpointException: If schema_version doesn't match the current version.
"""
version = data.get("schema_version", "")
if version != CHECKPOINT_SCHEMA_VERSION:
raise ValueError(
raise CheckpointException(
Comment thread
JackYPCOnline marked this conversation as resolved.
f"Checkpoints with schema version {version!r} are not compatible "
f"with current version {CHECKPOINT_SCHEMA_VERSION}."
)
Expand Down
37 changes: 37 additions & 0 deletions src/strands/experimental/checkpoint/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Content-block types for checkpoint resume.

Mirrors the interrupt pattern (`InterruptResponseContent` in `types/interrupt.py`).
Stays under `experimental/checkpoint/` for V0; will graduate to
`src/strands/types/checkpoint.py` when the feature exits experimental.
"""

from typing import Any, TypedDict


class CheckpointResumeDict(TypedDict):
    """Inner payload for a checkpointResume content block.

    This is the value stored under the ``checkpointResume`` key of a
    ``CheckpointResumeContent`` block. The ``checkpoint`` key is required:
    the agent's resume handling raises ``KeyError`` when it is absent.

    Attributes:
        checkpoint: Serialized Checkpoint as produced by ``Checkpoint.to_dict()``.
    """

    # Plain-dict form of a Checkpoint; rebuilt on resume via ``Checkpoint.from_dict``.
    checkpoint: dict[str, Any]


class CheckpointResumeContent(TypedDict):
    """Content block that resumes a paused durable agent.

    Pass a list containing exactly one instance of this type as the prompt to
    ``Agent.invoke_async`` / ``Agent.__call__`` to resume from a checkpoint.

    The agent must have been constructed with ``checkpointing=True``; a resume
    block sent to an agent created with ``checkpointing=False`` raises
    ``ValueError``. Mixing this block with other content types, or passing more
    than one block in the prompt, raises ``TypeError``.

    Example::

        result = await agent.invoke_async(
            [{"checkpointResume": {"checkpoint": previous_checkpoint.to_dict()}}]
        )

    Attributes:
        checkpointResume: The resume payload carrying the serialized checkpoint.
    """

    # Must be the only key in the block; any other key is rejected on resume.
    checkpointResume: CheckpointResumeDict
7 changes: 6 additions & 1 deletion src/strands/types/_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
if TYPE_CHECKING:
from ..agent import AgentResult
from ..agent._agent_as_tool import _AgentAsTool
from ..experimental.checkpoint import Checkpoint
from ..multiagent.base import MultiAgentResult, NodeResult


Expand Down Expand Up @@ -227,6 +228,7 @@ def __init__(
request_state: Any,
interrupts: Sequence[Interrupt] | None = None,
structured_output: BaseModel | None = None,
checkpoint: "Checkpoint | None" = None,
) -> None:
"""Initialize with the final execution results.

Expand All @@ -237,8 +239,11 @@ def __init__(
request_state: Final state of the agent execution
interrupts: Interrupts raised by user during agent execution.
structured_output: Optional structured output result
checkpoint: Optional checkpoint when stop_reason == "checkpoint".
"""
super().__init__({"stop": (stop_reason, message, metrics, request_state, interrupts, structured_output)})
super().__init__(
{"stop": (stop_reason, message, metrics, request_state, interrupts, structured_output, checkpoint)}
Comment thread
JackYPCOnline marked this conversation as resolved.
)

@property
@override
Expand Down
6 changes: 6 additions & 0 deletions src/strands/types/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,9 @@ class ConcurrencyException(Exception):
"""

pass


class CheckpointException(Exception):
    """Signal that a checkpoint operation could not be completed.

    Raised, for example, by ``Checkpoint.from_dict`` when the serialized
    checkpoint's ``schema_version`` does not match the current checkpoint
    schema version.
    """
Loading
Loading