32 changes: 32 additions & 0 deletions strands_robots/simulation/__init__.py
@@ -51,6 +51,19 @@

# Light imports (no heavy deps - stdlib + dataclasses only)
from strands_robots.simulation.base import SimEngine
from strands_robots.simulation.benchmark import (
    BenchmarkCompatibilityError,
    BenchmarkProtocol,
    StepInfo,
    get_benchmark,
    list_benchmarks,
    register_benchmark,
    unregister_benchmark,
)
from strands_robots.simulation.benchmark_spec import (
    DeclarativeBenchmark,
    register_benchmark_from_file,
)
from strands_robots.simulation.factory import (
    create_simulation,
    list_backends,
@@ -71,6 +84,11 @@
    SimWorld,
    TrajectoryStep,
)
from strands_robots.simulation.predicates import (
    PREDICATE_REGISTRY,
    make_predicate,
    register_predicate,
)

# Heavy imports (lazy - need strands SDK + mujoco)
_LAZY_IMPORTS: dict[str, tuple[str, str]] = {
@@ -108,6 +126,20 @@
"resolve_urdf",
"list_registered_urdfs",
"list_available_models",
# Benchmark protocol + registry
"BenchmarkProtocol",
"BenchmarkCompatibilityError",
"StepInfo",
"register_benchmark",
"unregister_benchmark",
"get_benchmark",
"list_benchmarks",
# Declarative DSL + predicates
"DeclarativeBenchmark",
"register_benchmark_from_file",
"PREDICATE_REGISTRY",
"make_predicate",
"register_predicate",
]


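The exports above make the registry usable without the heavy deps. A minimal sketch of the intended flow — the ToyBenchmark class and the register_benchmark(name, benchmark) call signature are assumptions, not confirmed by this diff:

# Reviewer sketch, not part of the diff. The registration signature and
# the minimal protocol surface (the three attributes the facades below
# read) are assumptions.
from strands_robots.simulation import (
    get_benchmark,
    list_benchmarks,
    register_benchmark,
    unregister_benchmark,
)

class ToyBenchmark:
    supported_robots = ["so100"]   # hypothetical robot name
    default_robot = "so100"
    max_steps = 200

register_benchmark("toy_reach", ToyBenchmark())   # signature assumed
assert get_benchmark("toy_reach") is not None
print(sorted(list_benchmarks()))                  # registry is engine-agnostic
unregister_benchmark("toy_reach")
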
181 changes: 181 additions & 0 deletions strands_robots/simulation/base.py
@@ -449,6 +449,187 @@ def eval_policy(
            success_fn=success_fn,
        )

    # Benchmark protocol facades

    def evaluate_benchmark(
        self,
        benchmark_name: str,
        robot_name: str | None = None,
        policy_provider: str = "mock",
        policy_config: dict[str, Any] | None = None,
        instruction: str = "",
        n_episodes: int = 1,
        seed: int | None = None,
    ) -> dict[str, Any]:
        """Run a registered :class:`BenchmarkProtocol` against the current sim.

        Benchmark-agnostic evaluation entry point. Looks up ``benchmark_name``
        in the global benchmark registry, validates robot compatibility, and
        forwards to :meth:`PolicyRunner.evaluate` with the spec.
        ``max_steps`` comes from the benchmark (not a parameter here).

        Args:
            benchmark_name: Key from :func:`register_benchmark` /
                :func:`register_benchmark_from_file`.
            robot_name: Robot to evaluate. If ``None`` and the sim has exactly
                one loaded robot, that robot is picked; otherwise an error
                dict is returned.
            policy_provider: Policy provider name (forwarded to
                :func:`create_policy`).
            policy_config: Provider-specific kwargs.
            instruction: Natural-language instruction for the policy.
            n_episodes: Number of episodes.
            seed: Master RNG seed for per-episode reproducibility.

        Returns:
            Standard status dict. On success, carries per-episode cumulative
            reward plus aggregate success_rate / avg_reward / avg_steps in the
            JSON payload.
        """
        from strands_robots.policies import create_policy
        from strands_robots.simulation.benchmark import get_benchmark

        spec = get_benchmark(benchmark_name)
        if spec is None:
            from strands_robots.simulation.benchmark import list_benchmarks as _list

            available = sorted(_list().keys())
            return {
                "status": "error",
                "content": [
                    {
                        "text": (
                            f"evaluate_benchmark: no benchmark registered under "
                            f"{benchmark_name!r}. Registered: {available}. "
                            "Call register_benchmark_from_file or register_benchmark first."
                        )
                    }
                ],
            }

        robots = self.list_robots()
        if not robots:
            return {"status": "error", "content": [{"text": "No robots in sim. Add one first."}]}

        resolved_robot = robot_name
        if not resolved_robot:
            # Try to pick a robot. Prefer single-robot scenes; multi-robot
            # scenes require explicit selection.
            if len(robots) == 1:
                resolved_robot = robots[0]
            else:
                return {
                    "status": "error",
                    "content": [
                        {
                            "text": (
                                f"evaluate_benchmark: 'robot_name' is required when the sim has "
                                f"multiple robots. Loaded: {robots}"
                            )
                        }
                    ],
                }
        if resolved_robot not in robots:
            return {
                "status": "error",
                "content": [{"text": f"Robot '{resolved_robot}' not found. Loaded: {robots}"}],
            }

        policy = create_policy(policy_provider, **(policy_config or {}))
        policy.set_robot_state_keys(self.robot_joint_names(resolved_robot))

        return PolicyRunner(self).evaluate(
            resolved_robot,
            policy,
            instruction=instruction,
            n_episodes=n_episodes,
            spec=spec,
            seed=seed,
        )
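
A hedged usage sketch of the facade end to end — the "mujoco" backend string, the "so100" robot name, and the exact JSON payload keys are assumptions drawn from the docstring, not confirmed by this diff:

# Reviewer sketch, not part of the diff.
sim = create_simulation("mujoco")        # backend name assumed
result = sim.evaluate_benchmark(
    "toy_reach",
    robot_name="so100",                  # optional when exactly one robot is loaded
    policy_provider="mock",
    instruction="reach the target",
    n_episodes=3,
    seed=0,
)
if result["status"] == "success":
    payload = next(c["json"] for c in result["content"] if "json" in c)
    print(payload["success_rate"], payload["avg_reward"], payload["avg_steps"])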

    def list_benchmarks(self) -> dict[str, Any]:
        """Enumerate registered benchmarks.

        Returns a standard status dict whose JSON payload contains the
        :func:`~strands_robots.simulation.benchmark.list_benchmarks`
        metadata snapshot. Safe to call from any backend; the registry is
        engine-agnostic.
        """
        from strands_robots.simulation.benchmark import list_benchmarks as _list

        snapshot = _list()
        if not snapshot:
            text = "No benchmarks registered. Use register_benchmark_from_file to add one."
        else:
            lines = [f"Registered benchmarks ({len(snapshot)}):"]
            for name, meta in snapshot.items():
                lines.append(
                    f" • {name}: {meta['class']} "
                    f"(robots={meta['supported_robots'] or 'any'}, "
                    f"default={meta['default_robot']}, "
                    f"max_steps={meta['max_steps']})"
                )
            text = "\n".join(lines)
        return {
            "status": "success",
            "content": [{"text": text}, {"json": {"benchmarks": snapshot}}],
        }
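
The return shape here is fixed by the body above (a "text" summary plus {"json": {"benchmarks": ...}}), so consuming it is mechanical; only the sim handle is assumed:

# Reviewer sketch, not part of the diff.
listing = sim.list_benchmarks()
snapshot = next(c["json"] for c in listing["content"] if "json" in c)["benchmarks"]
for name, meta in snapshot.items():
    print(f"{name}: robots={meta['supported_robots'] or 'any'}, max_steps={meta['max_steps']}")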

    def register_benchmark_from_file(
        self,
        benchmark_name: str,
        spec_path: str,
    ) -> dict[str, Any]:
        """Load a declarative benchmark spec from disk and register it.

        Wraps :func:`strands_robots.simulation.benchmark_spec.register_benchmark_from_file`
        so agents can author benchmarks as YAML / JSON at runtime. Parsing
        errors surface as structured error dicts rather than exceptions.
        """
        from strands_robots.simulation.benchmark_spec import (
            register_benchmark_from_file as _register,
        )

        if not benchmark_name:
            return {
                "status": "error",
                "content": [{"text": "register_benchmark_from_file: 'benchmark_name' must be non-empty."}],
            }
        if not spec_path:
            return {
                "status": "error",
                "content": [{"text": "register_benchmark_from_file: 'spec_path' must be non-empty."}],
            }
        try:
            benchmark = _register(benchmark_name, spec_path)
        except FileNotFoundError as e:
            return {"status": "error", "content": [{"text": f"register_benchmark_from_file: {e}"}]}
        except ValueError as e:
            return {"status": "error", "content": [{"text": f"register_benchmark_from_file: {e}"}]}
        except ImportError as e:
            # YAML support requires pyyaml; surface the install hint verbatim.
            return {"status": "error", "content": [{"text": f"{e}"}]}
        except Exception as e:  # noqa: BLE001 - defensive catch-all with clear message
            return {
                "status": "error",
                "content": [{"text": f"register_benchmark_from_file: unexpected error: {e}"}],
            }

        return {
            "status": "success",
            "content": [
                {
                    "text": (
                        f"📋 Registered benchmark '{benchmark_name}' from {spec_path}\n"
                        f" class: {type(benchmark).__name__}\n"
                        f" supported_robots: {benchmark.supported_robots or 'any'}\n"
                        f" default_robot: {benchmark.default_robot}\n"
                        f" max_steps: {benchmark.max_steps}"
                    )
                }
            ],
        }
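
Since the ImportError branch implies YAML needs pyyaml, JSON is the dependency-free authoring path. A sketch that writes a spec and registers it — every key in the spec dict is hypothetical; the real schema lives in benchmark_spec.DeclarativeBenchmark:

# Reviewer sketch, not part of the diff. Spec keys are hypothetical.
import json
import tempfile
from pathlib import Path

spec = {
    "supported_robots": ["so100"],
    "default_robot": "so100",
    "max_steps": 300,
    "success": {"predicate": "near", "args": {"a": "gripper", "b": "cube"}},
}
path = Path(tempfile.mkdtemp()) / "toy_reach.json"
path.write_text(json.dumps(spec))

result = sim.register_benchmark_from_file("toy_reach_v1", str(path))
print(result["content"][0]["text"])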

    def _make_run_policy_hook(self, robot_name: str, instruction: str) -> Any:
        """Override to return an ``on_frame(step, obs, action)`` callable.
