Module: cube.task | Layer: 3 (per-task environment dynamics + evaluation)
A Task is a single scoreable problem. It unifies gym-style dynamics
(reset/step/close) with task-specific logic (evaluate, filter_actions,
obs_postprocess). One class, one place for the benchmark author to define both
what the agent can do and how it is scored.
```python
class TaskMetadata(TypedBaseModel):
    id: str                                   # unique task identifier
    split: Literal["train", "val", "test"] = "test"
    abstract_description: str = ""            # for search/filtering only — NOT the objective
    recommended_max_steps: int | None = None  # harness hint, not enforced
    container_config: ContainerConfig | None = None
```

TaskMetadata is the lightweight, eager-loaded view of a task: it ships in
the wheel and powers `cube list`, registry listings, glob-based subsetting,
and human inspection. Cube authors needing additional per-task fields
subclass TaskMetadata with named typed fields; polymorphism is preserved
through the TypedBaseModel `_type` discriminator. The base class accepts
only the framework-defined fields above.
Heavy per-task data (problem statements, patches, archives, evaluator
scripts, …) lives on a TaskExecutionInfo subclass surfaced via
Task.execution_info, not on TaskMetadata. See below.
The actual task objective is surfaced in the first Observation returned by reset().
abstract_description is for tooling (search, subsetting), never to be shown to the agent.
```python
class TaskExecutionInfo(TypedBaseModel):
    """Heavy, lazy per-task execution data."""
```

Subclassed by cubes that need heavy per-task data, e.g.:

```python
class SWEBenchExecutionInfo(TaskExecutionInfo):
    problem_statement: str
    patch: str
    test_patch: str
    fail_to_pass: list[str]
    pass_to_pass: list[str]
```

Polymorphic via the TypedBaseModel `_type` discriminator. Populated on
the worker — typically inside TaskConfig.make() by validating
self.load_task_execution_info() against the subclass. The framework
never reads execution_info itself; it is for cube authors to surface
domain-specific heavy data with autocomplete and Pydantic validation.
Cubes with no heavy data leave the slot None; the base class is
instantiable but carries no fields.
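The populate-in-`make()` flow can be sketched with stdlib stand-ins (the dataclasses and the crude `model_validate` below approximate Pydantic validation; the cached dict contents are invented for illustration):

```python
from dataclasses import dataclass, fields
from typing import Any


@dataclass
class TaskExecutionInfoSketch:
    # Stand-in for the Pydantic TaskExecutionInfo base (illustration only).

    @classmethod
    def model_validate(cls, data: dict[str, Any]) -> "TaskExecutionInfoSketch":
        # Crude analogue of Pydantic validation: reject undeclared fields.
        names = {f.name for f in fields(cls)}
        unknown = set(data) - names
        if unknown:
            raise ValueError(f"unexpected fields: {sorted(unknown)}")
        return cls(**data)


@dataclass
class SWEBenchExecutionInfoSketch(TaskExecutionInfoSketch):
    problem_statement: str
    patch: str
    test_patch: str
    fail_to_pass: list[str]
    pass_to_pass: list[str]


# Inside a hypothetical TaskConfig.make(): validate the cached dict into the
# typed subclass, then pass it to the Task constructor as execution_info.
cached = {
    "problem_statement": "Fix the off-by-one in pagination.",
    "patch": "diff --git ...",
    "test_patch": "diff --git ...",
    "fail_to_pass": ["test_page_last"],
    "pass_to_pass": ["test_page_first"],
}
info = SWEBenchExecutionInfoSketch.model_validate(cached)
```

Tasks then read `info.problem_statement` etc. with full autocomplete, which is the point of the typed surface.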
```python
class Task[TTMetadata: TaskMetadata](TypedBaseModel, ABC):
    # Serializable fields — SerializeAsAny preserves subclass-specific fields
    # through JSON round-trip (Pydantic would otherwise strip them to the
    # declared base type). Required for every polymorphic field.
    metadata: SerializeAsAny[TTMetadata]
    execution_info: SerializeAsAny[TaskExecutionInfo] | None = None  # heavy lazy data; populated on the worker
    tool_config: SerializeAsAny[ToolConfig]
    container_backend: ContainerBackend | None = None
    runtime_context: RuntimeContext | None = None  # from Benchmark._setup()
    validate_per_step: bool = False  # eval on every step, not just done
    accept_agent_stop: bool = True   # accept STOP_ACTION from agent

    # Runtime (PrivateAttr, set in model_post_init)
    _tool: AbstractTool | None
    _container: Container | None
```

`execution_info` is the typed surface for heavy per-task data. Cubes
populate it inside TaskConfig.make(): validate
self.load_task_execution_info() against the cube's TaskExecutionInfo
subclass and pass to the Task constructor.
Tasks read typed fields directly: self.execution_info.problem_statement,
self.execution_info.patch, …
`model_post_init` (runs after Pydantic `__init__`):
- If `container_backend` and `metadata.container_config` are both set, launch the container.
- Call `tool_config.make(container=self._container)` to build the tool.
Abstract methods (implementers MUST provide):
- `reset() -> (Observation, dict)` — set up initial state; also call `self.tool.reset()`
- `evaluate(obs: Observation | None = None) -> (float, dict)` — score the current state
Optional hooks (default: identity / no-op):
- `filter_actions(actions: list[ActionSchema]) -> list[ActionSchema]` — whitelist subset of tool actions
- `obs_postprocess(obs: Observation) -> Observation` — transform observations before returning
- `finished(obs: Observation | None = None) -> bool` — early termination check
- `get_privileged_info() -> Content` — solution, eval source, internal state (for debug/oracle agents)
- `get_status() -> str` — free-form status string
- `close()` — cleanup; default calls `self.tool.close()`. Override to add cleanup and call `super().close()`.
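A hook override can be sketched in isolation (the `ActionSchema` dataclass and the allowlist contents below are stand-ins for illustration, not the framework's real types):

```python
from dataclasses import dataclass


@dataclass
class ActionSchema:
    name: str
    description: str = ""


# Hypothetical override: expose only a whitelist of the tool's actions.
ALLOWED = {"read_file", "write_file", "final_step"}


def filter_actions(actions: list[ActionSchema]) -> list[ActionSchema]:
    # The default hook is the identity; this variant keeps a subset.
    return [a for a in actions if a.name in ALLOWED]


tool_actions = [
    ActionSchema("read_file"),
    ActionSchema("run_shell"),
    ActionSchema("final_step"),
]
visible = filter_actions(tool_actions)  # run_shell is filtered out
```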
Signature: `step(action: Action | list[Action]) -> EnvironmentOutput`

Accepts a single Action or a list (atomic multi-action step). Logic:
- Loop over actions:
  - If `action.name == STOP_ACTION.name` and `self.accept_agent_stop`: append a `"Task finished by the agent."` observation, set `done=True`, break.
  - Otherwise, call `self.tool.execute_action(action)`. Time it.
  - If result is `Observation`: `obs += result`
  - If result is `StepError`: set `error`, `done=True`, break
  - Any other type → raise `ValueError`
- `done = done or self.finished(obs)`
- If `done` or `self.validate_per_step`: call `self.evaluate(obs)` → `(reward, info)`
- Apply `obs = self.obs_postprocess(obs)`
- Populate `info["profiling"]` with `tool_execute`, `evaluate`, `obs_postprocess` timings
Returns EnvironmentOutput(obs, reward, done, info, error). truncated is always
False — step/time-limit truncation is the harness's responsibility (TODO in code).
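The control flow above can be sketched with stand-in types (everything below is an illustrative approximation, not the framework's real `step()`; evaluation and profiling are omitted):

```python
from dataclasses import dataclass


@dataclass
class Action:
    name: str


@dataclass
class Observation:
    text: str = ""

    def __iadd__(self, other: "Observation") -> "Observation":
        self.text += other.text
        return self


@dataclass
class StepError:
    message: str


STOP_NAME = "final_step"  # mirrors STOP_ACTION.name


def step(tool_execute, actions, accept_agent_stop=True, finished=lambda obs: False):
    """Sketch of the documented step() loop over a list of actions."""
    obs, done, error = Observation(), False, None
    for action in actions:
        if action.name == STOP_NAME and accept_agent_stop:
            obs += Observation("Task finished by the agent.")
            done = True
            break
        result = tool_execute(action)
        if isinstance(result, Observation):
            obs += result
        elif isinstance(result, StepError):
            error, done = result, True
            break
        else:
            raise ValueError(f"unexpected result type: {type(result)!r}")
    done = done or self_finished(finished, obs)
    return obs, done, error


def self_finished(finished, obs):
    # Stand-in for the self.finished(obs) early-termination check.
    return finished(obs)


out, done, err = step(
    lambda a: Observation(f"ran {a.name}; "),
    [Action("ls"), Action("final_step")],
)
```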
```python
STOP_ACTION = ActionSchema(name="final_step", description="Stop the task execution.")
```

Protocol for agent-initiated termination. Tasks that reject it must set
accept_agent_stop = False (e.g., tasks that require the agent to reach a terminal
state via environment interaction, not a self-declaration).
```python
RuntimeContext = dict[str, Any]
```

Free-form dict populated by Benchmark._setup() with shared infrastructure references
(server URLs, DB connections, handles to launched L2 resources). Passed to every Task
spawned from that benchmark.
Concurrency: after setup() returns, RuntimeContext is treated as read-only by
concurrent tasks. Writes are not safe across workers.
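The framework does not enforce the read-only convention; one stdlib way a benchmark author could guard against accidental writes is a `MappingProxyType` view (a hypothetical hardening, not something the framework does — the key names below are invented):

```python
from types import MappingProxyType

# Benchmark._setup() populates the dict once, before tasks are spawned.
runtime_context = {"server_url": "http://localhost:8080", "db_path": "/tmp/bench.db"}

# Read-only view: tasks can read shared handles, but writes raise TypeError
# instead of silently racing across concurrent workers.
frozen = MappingProxyType(runtime_context)

url = frozen["server_url"]  # reads work as usual
try:
    frozen["new_key"] = "x"
except TypeError:
    write_blocked = True
```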
```python
class TaskConfig[TTMetadata: TaskMetadata](TypedBaseModel, ABC):
    metadata: SerializeAsAny[TTMetadata]  # travels with the config
    seed: int | None = None
    tool_config: SerializeAsAny[ToolConfig] | None = None
    sub_bench_name: str | None = None  # composite routing hint (see below)

    @property
    def task_id(self) -> str:
        """Derived: ``"{sub_bench_name}/{metadata.id}"`` for composites, else ``metadata.id``."""

    @abstractmethod
    def make(
        self,
        runtime_context: RuntimeContext | None = None,
        container_backend: ContainerBackend | None = None,
    ) -> Task: ...

    # ClassVar back-stamped by BenchmarkConfig.__init_subclass__ to
    # cls.cache_dir() so the default task-execution cache lives directly
    # under the benchmark's cache dir. None for TaskConfig subclasses
    # constructed without an owning BenchmarkConfig.
    _benchmark_cache_dir: ClassVar[Path | None] = None

    @classmethod
    def task_execution_cache_dir(cls) -> Path:
        """Default: ``BenchmarkConfig.cache_dir() / "tasks_execution_info"``,
        falling back to ``~/.cube/<top-level-package-name>/tasks_execution_info/``
        when ``_benchmark_cache_dir`` is not set."""

    def load_task_execution_info(self) -> dict[str, Any]:
        """Read processed per-task data for ``self.task_id`` from the cache."""

    def verify_installed(self) -> None:
        """Optional fail-fast check. Default: no-op."""
```

Self-contained unit. Workers receive a TaskConfig and have everything
they need — metadata is embedded via the metadata field (stamped on each
emitted config by BenchmarkConfig.get_task_configs() on the driver).
make() uses self.metadata directly; no import of the owning
BenchmarkConfig is needed on the worker. This is the single most important
invariant of the layer: the serialization boundary is self-describing.
Subclasses that carry heavy install-time data (e.g. SWE-bench problem
statements, OSWorld evaluator configs) declare a TaskExecutionInfo
subclass for the heavy fields and populate Task.execution_info inside
TaskConfig.make() — typically by calling
self.load_task_execution_info() (read from the per-task on-disk cache)
and validating the resulting dict against the TaskExecutionInfo
subclass. The cache itself is written by BenchmarkConfig.install() —
operators run cube install <bench> once per worker environment.
Per-task cache helpers (worker-side):
- `task_execution_cache_dir()` (classmethod) — default `BenchmarkConfig.cache_dir() / "tasks_execution_info"`, where the cache dir is back-stamped by `BenchmarkConfig.__init_subclass__` onto the owning `task_config_class` via `_benchmark_cache_dir`. Falls back to `~/.cube/<top-level-package-name>/tasks_execution_info/` when no owner is attached — relevant for direct test instantiation. Override on subclasses that use a non-default cache layout (e.g. cubes that co-locate the cache with other on-disk state). `BenchmarkConfig.install()` writes via `cls.task_config_class.task_execution_cache_dir()` so the path has a single owner.
- `load_task_execution_info()` (instance method) — reads `type(self).task_execution_cache_dir() / f"{self.task_id}.json"`. Raises `RuntimeError` with an actionable remediation message if the file is missing.
- `verify_installed()` (instance method) — optional fail-fast check that data this task relies on is locally available on this worker. Default: no-op. Cube authors override with a check appropriate to their cache. Convention: `TaskConfig.make()` calls `self.verify_installed()` at the top so misconfigured workers fail fast with an actionable error instead of timing out on a surprise download.
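The cache-read contract can be sketched as a free function (a minimal approximation of the documented behavior; the remediation wording is illustrative):

```python
import json
from pathlib import Path


def load_task_execution_info(cache_dir: Path, task_id: str) -> dict:
    """Sketch of the documented cache read: <cache_dir>/<task_id>.json."""
    path = cache_dir / f"{task_id}.json"
    if not path.exists():
        # Actionable remediation message, per the documented contract.
        raise RuntimeError(
            f"Missing task execution cache at {path}. "
            "Run `cube install <bench>` on this worker first."
        )
    return json.loads(path.read_text())
```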
These helpers live on TaskConfig (worker-side) so workers do not need
to import the owning BenchmarkConfig to verify their environment or
resolve the cache path.
sub_bench_name is an optional routing tag. Standalone benchmarks leave
it None. CompositeBenchmarkConfig.get_task_configs() sets it to the
origin sub-benchmark's name. For nested composites (composites of
composites) each outer layer prepends its sub-benchmark name, producing a
"/"-joined path (e.g. "inner-suite/bench-a"). CompositeBenchmark.spawn()
peels the path one hop at a time, delegating to inner composites until the
leaf sub-benchmark is reached. No separate wrapper type is needed — the
emitted TaskConfig stays the sub-config's native subclass.
task_id is a derived @property (not a serialized field). For
standalone tasks it returns metadata.id; for composite tasks it returns
"{sub_bench_name}/{metadata.id}", which for nested composites produces a
fully-qualified path (e.g. "inner-suite/bench-a/task-1"). metadata.id
always retains the native un-prefixed id.
- `reset()` must call `self.tool.reset()` (implementer responsibility).
- `step()` is concrete — do not override. All task-specific behavior goes in `evaluate`, `filter_actions`, `obs_postprocess`, `finished`.
- Tool is built eagerly in `model_post_init` — once a Task is constructed, its tool is live.
- `accept_agent_stop=True` (default) means the agent can self-terminate via `Action(name="final_step")`. Evaluate is called on termination.
- `info["profiling"]` is always populated after `step()` unless no actions ran (empty list).
- Your `reset()` must populate the environment to the state the agent will see for the first observation. Include the task objective as text in that observation.
- `evaluate()` is pure — no side effects on external systems. It may read tool state.
- If you hold long-lived resources beyond the tool (containers, VMs, processes), override `close()` and call `super().close()`.
- `get_privileged_info()` is for debug/oracle agents and tests. Ship it for any task that has a known solution — empowers the harness's debug mode.
- Polymorphic fields (`metadata`, `execution_info`, `tool_config`) use `SerializeAsAny[Base]` rather than the bare base type. Without it Pydantic serializes only the base-class fields and silently drops subclass-specific state on every JSON round-trip. This is already in the base classes — cube authors don't need to repeat it on their subclasses.
- Cubes that need narrower static types on `Task.metadata` use the parametrised form `class FooTask(Task[FooTaskMetadata]):` rather than re-annotating `metadata: FooTaskMetadata` on the subclass. Re-annotation is unsound under invariant-field semantics and type checkers reject it; the parametrised form expresses the intent correctly without an override. Pairs naturally with `class FooTaskConfig(TaskConfig[FooTaskMetadata]):`.
- `task_execution_cache_dir()` lives directly under `BenchmarkConfig.cache_dir()` (back-stamped onto the `TaskConfig` subclass at class-definition time by `BenchmarkConfig.__init_subclass__` via `_benchmark_cache_dir`), so cubes that override `cache_dir()` (e.g. to co-locate with VM data) get the override applied to the per-task cache too without an extra override. The fallback to `~/.cube/<top-level-package-name>/` only kicks in for `TaskConfig` subclasses that have no owning `BenchmarkConfig` (direct test instantiation).
- `validate_per_step=True` means `evaluate()` runs every step — expensive. Default is only on termination.
- STOP_ACTION is not automatically in the tool's action set — the harness / agent framework is responsible for including it in the action list shown to the LLM.
- `runtime_context` is a dict, not a Pydantic model — no type safety. Document keys in your `Benchmark._setup()` docstring.
- `model_post_init` launches the container. If your ToolConfig `make()` fails and you set `container_backend`, the container is already running — may leak unless the caller handles construction errors.
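One defensive pattern a caller could use against that leak is to wrap construction so the container is closed on failure (a hypothetical guard sketched with stand-in types, not something the framework provides):

```python
class FakeContainer:
    """Stand-in for a launched container (illustration only)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def build_task(launch_container, make_tool):
    """Mirror of the model_post_init sequence with cleanup on failure:
    if the tool fails to build, close the already-running container
    before re-raising instead of leaking it."""
    container = launch_container()
    try:
        tool = make_tool(container)
    except Exception:
        container.close()  # avoid leaking the live container
        raise
    return container, tool


def failing_make_tool(container):
    raise RuntimeError("ToolConfig.make() failed")


launched = FakeContainer()
try:
    build_task(lambda: launched, failing_make_tool)
except RuntimeError:
    pass
```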