Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ sim-mujoco = [
"imageio>=2.28.0,<3.0.0",
"imageio-ffmpeg>=0.4.0,<1.0.0",
]
benchmark-libero = [
"libero>=0.1.0,<1.0.0",
]
all = [
"strands-robots[groot-service]",
"strands-robots[lerobot]",
Expand Down Expand Up @@ -132,7 +135,7 @@ ignore_missing_imports = false

# Third-party libs without type stubs
[[tool.mypy.overrides]]
module = ["lerobot.*", "gr00t.*", "draccus.*", "msgpack.*", "zmq.*", "huggingface_hub.*", "serial.*", "psutil.*", "torch.*", "torchvision.*", "transformers.*", "einops.*", "robot_descriptions.*", "mujoco.*", "imageio.*"]
module = ["lerobot.*", "gr00t.*", "draccus.*", "msgpack.*", "zmq.*", "huggingface_hub.*", "serial.*", "psutil.*", "torch.*", "torchvision.*", "transformers.*", "einops.*", "robot_descriptions.*", "mujoco.*", "imageio.*", "libero.*"]
ignore_missing_imports = true

# @tool decorator injects runtime signatures mypy cannot check
Expand Down
14 changes: 14 additions & 0 deletions strands_robots/benchmarks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Strands Robots Benchmarks - per-benchmark adapters layered on :mod:`strands_robots.simulation.benchmark`.
Adapters live in optional extras so the core package stays dependency-free.
Importing this namespace is cheap; the heavy work happens when a specific
adapter submodule is imported (e.g. ``from strands_robots.benchmarks.libero
import LiberoAdapter``).
Currently shipped adapters:
* ``strands_robots.benchmarks.libero`` - LIBERO (Panda-only, ~130 tasks).
Install with ``pip install 'strands-robots[benchmark-libero]'``.
Tracked follow-ups: Meta-World (#108), RoboSuite (#109).
"""
41 changes: 41 additions & 0 deletions strands_robots/benchmarks/libero/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""LIBERO benchmark adapter - see :mod:`strands_robots.benchmarks.libero.adapter`.

Public surface (re-exported from submodules so agents can do
``from strands_robots.benchmarks.libero import LiberoAdapter``):

* :class:`LiberoAdapter` - ``BenchmarkProtocol`` built around a BDDL task.
* :func:`load_libero_suite` - bulk-register every task in a suite.
* :class:`BDDLParseError` - raised on malformed BDDL input.

The adapter and parser have **no** dependency on the ``libero`` pip
package - you can use them with your own BDDL files. Only
:func:`load_libero_suite` touches the upstream package (to discover task
files), and only when you don't pass an explicit ``bddl_dir=``.
"""

from strands_robots.benchmarks.libero.adapter import BDDLParseError, LiberoAdapter
from strands_robots.benchmarks.libero.bddl_parser import (
PREDICATE_VOCABULARY,
BDDLProblem,
compile_goal,
parse_bddl,
parse_bddl_file,
)
from strands_robots.benchmarks.libero.suite import (
SUITE_NAMES,
available_suites,
load_libero_suite,
)

# Public surface of the ``strands_robots.benchmarks.libero`` package: every
# name listed here is imported from a submodule above and re-exported.
__all__ = [
"BDDLParseError",
"BDDLProblem",
"LiberoAdapter",
"PREDICATE_VOCABULARY",
"SUITE_NAMES",
"available_suites",
"compile_goal",
"load_libero_suite",
"parse_bddl",
"parse_bddl_file",
]
297 changes: 297 additions & 0 deletions strands_robots/benchmarks/libero/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
"""``LiberoAdapter`` - :class:`BenchmarkProtocol` driven by a LIBERO BDDL file.

LIBERO is a suite of ~130 tabletop manipulation tasks built around a Franka
Panda. Each task ships as a BDDL problem file + an MJCF scene. The adapter
compiles the BDDL ``:goal`` into a sparse success predicate via
:mod:`strands_robots.benchmarks.libero.bddl_parser` and drives the scene
through the standard :class:`BenchmarkProtocol` lifecycle:

1. :meth:`on_episode_start` - optional ``sim.load_scene(scene_path)``, then
the base ``BenchmarkProtocol`` compatibility check (Panda-only), then
per-episode jitter of ``(:init ...)`` object positions.
2. :meth:`on_step` - sparse: ``StepInfo(reward=0.0, done=False)``. LIBERO
does not define a dense reward.
3. :meth:`is_success` - walks the compiled ``:goal`` predicate tree against
the current sim state.

**Panda-only by design.** LIBERO's scene MJCFs ``<include>`` Panda geometry
and BDDL predicates reference Panda gripper body names
(``robot0_gripper_*``). Retargeting to a different robot would require
rewriting every BDDL predicate against different body names and is out of
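scope for this adapter. Subclass :class:`LiberoAdapter` and override the
class attributes ``supported_robots_list`` + ``default_robot_name`` (which
back the :attr:`supported_robots` / :attr:`default_robot` properties) if you
know what you're doing.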

The adapter does NOT require the ``libero`` Python package to be installed -
only a BDDL string / file and (optionally) an MJCF scene path. The
:func:`strands_robots.benchmarks.libero.suite.load_libero_suite` helper is
the one that pulls in the upstream package to discover task files.
"""

from __future__ import annotations

import logging
from collections.abc import Callable
from pathlib import Path
from typing import TYPE_CHECKING, Any

from strands_robots.benchmarks.libero.bddl_parser import (
BDDLParseError,
BDDLProblem,
Node,
compile_goal,
parse_bddl,
parse_bddl_file,
)
from strands_robots.simulation.benchmark import BenchmarkProtocol, StepInfo

if TYPE_CHECKING:
import random

from strands_robots.simulation.base import SimEngine

logger = logging.getLogger(__name__)


class LiberoAdapter(BenchmarkProtocol):
    """Panda-only :class:`BenchmarkProtocol` driven by a parsed LIBERO BDDL task.

    Construct with a BDDL file path (``from_file``) or raw BDDL text
    (``from_text``) - direct ``__init__`` is for advanced use when you
    already have a :class:`BDDLProblem`.

    Example::

        from strands_robots.benchmarks.libero import LiberoAdapter

        adapter = LiberoAdapter.from_file(
            "libero/tasks/libero_spatial/pick_up_the_red_cube.bddl",
            scene_path="libero/assets/scenes/libero_spatial_scene.xml",
        )
        sim.register_benchmark("pick-red-cube", adapter)
        sim.evaluate_benchmark("pick-red-cube", policy_provider="mock",
                               n_episodes=10, seed=42)

    Attributes:
        max_steps: Default 300 (LIBERO convention). Override per-task by
            passing ``max_steps=`` to the constructor or mutating the
            attribute after construction.
        supported_robots_list: Robot names returned (as a copy) by
            :attr:`supported_robots`. Defaults to ``["panda"]``.
        default_robot_name: Robot name returned by :attr:`default_robot`.
        problem: The parsed :class:`BDDLProblem`. Stored for introspection
            (agents may read ``problem.language`` as the instruction).
        scene_path: MJCF path passed to ``sim.load_scene()`` on each episode
            start, or ``None`` when the scene is assumed to be pre-loaded.
    """

    max_steps: int = 300
    supported_robots_list: list[str] = ["panda"]
    default_robot_name: str = "panda"

    def __init__(
        self,
        problem: BDDLProblem,
        *,
        scene_path: str | None = None,
        max_steps: int | None = None,
        init_jitter: float = 0.02,
    ):
        """Construct from a pre-parsed :class:`BDDLProblem`.

        Args:
            problem: Parsed BDDL problem with a non-``None`` ``goal``.
            scene_path: Optional MJCF to ``sim.load_scene()`` on each
                episode start. ``None`` → assume scene is pre-loaded.
            max_steps: Override the class-level 300.
            init_jitter: Per-episode ±jitter (metres) applied to the xy
                position of the first argument of every ``(:init ...)``
                predicate (e.g. ``cube_1`` in ``(on cube_1 table_1)``).
                Set to 0 to disable jitter.

        Raises:
            ValueError: If ``problem.goal`` is ``None`` or ``init_jitter``
                is negative.
        """
        if problem.goal is None:
            raise ValueError(f"LiberoAdapter: BDDL problem {problem.name!r} has no (:goal ...) block")
        self.problem = problem
        self.scene_path = scene_path
        self._init_jitter = float(init_jitter)
        if self._init_jitter < 0:
            raise ValueError(f"init_jitter must be >= 0, got {init_jitter}")
        if max_steps is not None:
            # Instance attribute shadows the class-level default.
            self.max_steps = int(max_steps)
        self._success_fn: Callable[[SimEngine], bool] = compile_goal(problem.goal)

    # Construction helpers

    @classmethod
    def from_file(
        cls,
        bddl_path: str | Path,
        *,
        scene_path: str | None = None,
        max_steps: int | None = None,
        init_jitter: float = 0.02,
    ) -> LiberoAdapter:
        """Parse a ``.bddl`` file from disk and build an adapter.

        Raises :class:`FileNotFoundError` / :class:`BDDLParseError` on bad
        input - callers that want structured error dicts should catch and
        convert.
        """
        problem = parse_bddl_file(bddl_path)
        return cls(
            problem,
            scene_path=scene_path,
            max_steps=max_steps,
            init_jitter=init_jitter,
        )

    @classmethod
    def from_text(
        cls,
        bddl_text: str,
        *,
        scene_path: str | None = None,
        max_steps: int | None = None,
        init_jitter: float = 0.02,
    ) -> LiberoAdapter:
        """Parse a BDDL string directly - useful in tests."""
        problem = parse_bddl(bddl_text)
        return cls(
            problem,
            scene_path=scene_path,
            max_steps=max_steps,
            init_jitter=init_jitter,
        )

    # BenchmarkProtocol interface

    @property
    def supported_robots(self) -> list[str]:
        """Copy of :attr:`supported_robots_list` (callers cannot mutate the class list)."""
        return list(self.supported_robots_list)

    @property
    def default_robot(self) -> str:
        """The value of :attr:`default_robot_name`."""
        return self.default_robot_name

    @property
    def instruction(self) -> str:
        """Language instruction from the BDDL ``:language`` clause, or ``""``."""
        return self.problem.language or ""

    def on_episode_start(self, sim: SimEngine, rng: random.Random) -> None:
        """Load the declared scene (if any), validate Panda, then apply init jitter.

        Order matters: load_scene MUST happen before ``super().on_episode_start``
        so the base compatibility check sees the scene's Panda robot rather
        than reporting "sim is empty → load default_robot".

        Raises:
            RuntimeError: If ``sim.load_scene`` returns a status dict whose
                ``status`` is ``"error"``.
        """
        if self.scene_path:
            load_scene = getattr(sim, "load_scene", None)
            if load_scene is None:
                logger.warning(
                    "LiberoAdapter: sim has no load_scene(); skipping scene_path=%r",
                    self.scene_path,
                )
            else:
                result = load_scene(self.scene_path)
                if isinstance(result, dict) and result.get("status") == "error":
                    msg = self._error_text(result)
                    raise RuntimeError(f"LiberoAdapter: load_scene({self.scene_path!r}) failed: {msg}")
        super().on_episode_start(sim, rng)
        if self._init_jitter > 0:
            self._apply_init_jitter(sim, rng)

    def on_step(
        self,
        sim: SimEngine,
        obs: dict[str, Any],
        action: dict[str, Any],
    ) -> StepInfo:
        """Sparse step: zero reward, never ``done``. Success is detected by
        :meth:`is_success` at the outer eval loop."""
        return StepInfo(reward=0.0, done=False)

    def is_success(self, sim: SimEngine) -> bool:
        """Evaluate the compiled ``:goal`` predicate against ``sim`` and coerce to ``bool``."""
        return bool(self._success_fn(sim))

    # Internals

    @staticmethod
    def _error_text(result: dict[str, Any]) -> str:
        """Return the first ``text`` entry of an error status dict, or ``""``.

        Tolerates a missing, empty, or malformed ``content`` payload so that a
        badly shaped error dict cannot mask the load failure being reported.
        """
        content = result.get("content")
        if isinstance(content, (list, tuple)):
            for block in content:
                if isinstance(block, dict) and "text" in block:
                    return str(block["text"])
        return ""

    def _apply_init_jitter(self, sim: SimEngine, rng: random.Random) -> None:
        """Apply ±jitter to the xy position of every ``(:init ...)`` subject body.

        The subject is the first argument of each init predicate, so
        ``(on cube_1 table_1)`` means "jitter cube_1".

        Best-effort: if the sim doesn't expose ``move_object`` / ``get_body_state``,
        or a body can't be looked up or moved, that step is skipped (with a
        debug log where an error is available) rather than failing the
        episode. This matches LIBERO's "small random perturbation per
        episode" convention without requiring full BDDL init semantics.
        """
        move_object = getattr(sim, "move_object", None)
        if move_object is None:
            logger.debug("LiberoAdapter: sim has no move_object(); skipping init jitter")
            return
        get_body_state = getattr(sim, "get_body_state", None)
        if get_body_state is None:
            logger.debug("LiberoAdapter: sim has no get_body_state(); skipping init jitter")
            return

        targets: set[str] = {body for node in self.problem.init for body in _extract_init_targets(node)}

        # Sorted iteration keeps the rng draw order - and therefore the
        # per-seed jitter - independent of set ordering.
        for body in sorted(targets):
            try:
                state = get_body_state(body_name=body)
            except Exception as e:  # noqa: BLE001 - defensive
                logger.debug("jitter lookup for %r failed: %s", body, e)
                continue
            if not isinstance(state, dict) or state.get("status") != "success":
                continue
            pos = _extract_position(state)
            if pos is None:
                continue
            jx = rng.uniform(-self._init_jitter, self._init_jitter)
            jy = rng.uniform(-self._init_jitter, self._init_jitter)
            new_pos = [pos[0] + jx, pos[1] + jy, pos[2]]  # z is left untouched
            try:
                move_object(name=body, position=new_pos)
            except Exception as e:  # noqa: BLE001 - jitter failures are not fatal
                logger.debug("jitter apply for %r failed: %s", body, e)


def _extract_init_targets(node: Node) -> list[str]:
    """Collect the subject (first argument) of each leaf predicate under ``node``.

    For init clauses such as ``(on cube_1 table_1)`` or ``(upright bottle_1)``
    the first argument names the body whose position may be jittered. The
    walk descends through ``and`` / ``or`` / ``not`` wrappers in clause
    order; a predicate without arguments, or any other node type,
    contributes nothing.
    """
    from strands_robots.benchmarks.libero.bddl_parser import And, Not, Or, Pred

    if isinstance(node, Pred):
        if not node.args:
            return []
        return [node.args[0]]
    if isinstance(node, (And, Or)):
        return [target for clause in node.clauses for target in _extract_init_targets(clause)]
    if isinstance(node, Not):
        return _extract_init_targets(node.clause)
    return []


def _extract_position(state: dict[str, Any]) -> list[float] | None:
"""Pull ``{"json": {"position": [...]}}`` from a status-dict payload."""
for block in state.get("content", []) or []:
if isinstance(block, dict) and isinstance(block.get("json"), dict):
pos = block["json"].get("position")
if isinstance(pos, list) and len(pos) == 3 and all(isinstance(c, (int, float)) for c in pos):
return [float(c) for c in pos]
return None


# Public names of this module. ``BDDLParseError`` is imported from
# ``bddl_parser`` at the top of the file and listed here so it is exported
# alongside the adapter.
__all__ = [
"BDDLParseError",
"LiberoAdapter",
]
Loading
Loading