Skip to content
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 129 additions & 16 deletions environments/r2e_gym_v1/r2e_gym_v1/taskset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
Each row ships a per-task Docker image with the repo checked out at ``/testbed`` and a
hidden ``/r2e_tests`` directory plus a ``run_tests.sh`` harness. ``setup`` symlinks the
repo venv onto ``PATH``, clears pycache, and stashes ``/r2e_tests`` out of the agent's
reach (tarred to ``/opt`` and removed) so the running agent can't read the ground-truth
tests. The ``solved`` reward restores the tests into ``/testbed/r2e_tests``, runs
reach (archived to the evaluator host and removed from the sandbox) so the running agent
can't read the ground-truth tests. The ``solved`` reward restores the tests, runs
``run_tests.sh``, parses the pytest summary, and scores 1.0 iff the per-test pass/fail
map exactly matches the row's ``expected_output_json``. A v1 port of the v0 ComposableEnv
``R2EGymTaskSet``.
"""

import json
import re
import tempfile
import weakref
from functools import cached_property
from pathlib import Path

import verifiers.v1 as vf

Expand All @@ -24,9 +28,7 @@

REPO_PATH = "/testbed"
ALT_PATH = "/root"
# Tests are staged here during the agent rollout (outside the /testbed workdir) and
# restored to /testbed/r2e_tests at scoring time.
STAGED_TESTS = "/opt/r2e_tests.tar.gz"
REMOTE_TESTS_ARCHIVE = "/tmp/r2e_tests.tar.gz"

# The testbed venv (project + pytest installed) plus quiet, non-interactive tooling —
# exported for every command the taskset runs in the sandbox.
Expand Down Expand Up @@ -60,12 +62,54 @@
"2>/dev/null || timeout 30 find . -name '*.pyc' -delete 2>/dev/null || true"
)

# Stash the ground-truth tests away from the agent (tar to /opt, then remove the dir).
HIDE_TESTS = f"tar -C / -czf {STAGED_TESTS} r2e_tests && rm -rf /r2e_tests"
# Restore them into the workdir for scoring (run_tests.sh expects /testbed/r2e_tests).
ARCHIVE_TESTS = f"tar -C / -czf {REMOTE_TESTS_ARCHIVE} r2e_tests"
REMOVE_TESTS = f"rm -rf /r2e_tests && rm -f {REMOTE_TESTS_ARCHIVE}"
MOVE_TESTS = f"rm -rf {REPO_PATH}/r2e_tests && mv /r2e_tests {REPO_PATH}/r2e_tests"
RESTORE_TESTS = (
f"rm -rf {REPO_PATH}/r2e_tests && tar -C {REPO_PATH} -xzf {STAGED_TESTS}"
f"rm -rf {REPO_PATH}/r2e_tests && "
f"tar -C {REPO_PATH} -xzf {REMOTE_TESTS_ARCHIVE} && "
f"rm -f {REMOTE_TESTS_ARCHIVE}"
)
SNAPSHOT_PROCESSES = r"""
for path in /proc/[0-9]*; do
pid=${path##*/}
IFS= read -r stat < "$path/stat" || continue
stat=${stat##*) }
set -- $stat
[ "$#" -ge 20 ] && printf '%s:%s ' "$pid" "${20}"
done
"""

# Scoring runs before runtime teardown, so reap detached agent processes before tests return.
KILL_AGENT_PROCESSES = r"""
self=$$
ancestors=" 1 "
baseline=" $R2E_BASELINE_PROCESSES "
pid=$self
while [ "$pid" -gt 1 ] 2>/dev/null; do
ancestors="$ancestors$pid "
pid=$(sed -n 's/^PPid:[[:space:]]*//p' "/proc/$pid/status" 2>/dev/null)
[ -n "$pid" ] || break
done
for path in /proc/[0-9]*; do
Comment thread
xeophon marked this conversation as resolved.
Outdated
pid=${path##*/}
case "$ancestors" in
*" $pid "*) continue ;;
esac
identity=
if IFS= read -r stat < "$path/stat"; then
stat=${stat##*) }
set -- $stat
[ "$#" -lt 20 ] || identity="$pid:${20}"
fi
if [ -n "$identity" ]; then
case "$baseline" in
*" $identity "*) continue ;;
esac
fi
kill -KILL "$pid" 2>/dev/null || true
done
"""


def parse_log_pytest(log: str | None) -> dict[str, str]:
Expand Down Expand Up @@ -177,11 +221,22 @@ class R2EGymConfig(vf.TasksetConfig):
use_prime_registry: bool = False
"""Resolve task images against Prime's private Artifact Registry (`REGISTRY`) instead of the
dataset's public Docker Hub `docker_image`. Only works on runtimes with GCP pull credentials."""
hide_tests_from_agent: bool = True
"""Keep tests on the evaluator host during agent rollouts. Disable only for no-agent
validation, where an in-sandbox move avoids the archive transfer."""


class R2EGymTaskset(vf.Taskset[R2EGymTask, R2EGymConfig]):
NEEDS_CONTAINER = True

@cached_property
def _host_archives(self) -> weakref.WeakKeyDictionary[vf.Runtime, Path]:
return weakref.WeakKeyDictionary()

@cached_property
def _baseline_processes(self) -> weakref.WeakKeyDictionary[vf.Runtime, str]:
return weakref.WeakKeyDictionary()

def load_tasks(self) -> list[R2EGymTask]:
from datasets import load_dataset

Expand All @@ -205,18 +260,76 @@ def load_tasks(self) -> list[R2EGymTask]:
]

async def setup(self, task: R2EGymTask, runtime: vf.Runtime) -> None:
for cmd in (LINK, CLEAN_PYCACHE, HIDE_TESTS):
result = await runtime.run(["sh", "-c", cmd], ENV)
if cmd is HIDE_TESTS and result.exit_code != 0:
for cmd in (LINK, CLEAN_PYCACHE):
await runtime.run(["sh", "-c", cmd], ENV)

if not self.config.hide_tests_from_agent:
result = await runtime.run(["sh", "-c", MOVE_TESTS], ENV)
if result.exit_code != 0:
raise RuntimeError(
f"r2e setup failed to stage tests ({task.name}): {result.stderr.strip()[-500:]}"
f"r2e setup failed to move tests ({task.name}): {result.stderr.strip()[-500:]}"
)
return

result = await runtime.run(["sh", "-c", ARCHIVE_TESTS], ENV)
if result.exit_code != 0:
raise RuntimeError(
f"r2e setup failed to archive tests ({task.name}): {result.stderr.strip()[-500:]}"
)

archive = await runtime.read(REMOTE_TESTS_ARCHIVE)
with tempfile.NamedTemporaryFile(
prefix=f"r2e_tests_{runtime.name}_", suffix=".tar.gz", delete=False
) as file:
file.write(archive)
host_archive = Path(file.name)
self._host_archives[runtime] = host_archive
weakref.finalize(runtime, host_archive.unlink, missing_ok=True)
Comment thread
xeophon marked this conversation as resolved.

result = await runtime.run(["sh", "-c", REMOVE_TESTS], ENV)
if result.exit_code != 0:
host_archive.unlink(missing_ok=True)
self._host_archives.pop(runtime, None)
raise RuntimeError(
f"r2e setup failed to hide tests ({task.name}): {result.stderr.strip()[-500:]}"
)

baseline = await runtime.run(["sh", "-c", SNAPSHOT_PROCESSES], ENV)
if baseline.exit_code != 0:
host_archive.unlink(missing_ok=True)
self._host_archives.pop(runtime, None)
raise RuntimeError(
f"r2e setup failed to snapshot processes ({task.name}): "
f"{baseline.stderr.strip()[-500:]}"
)
self._baseline_processes[runtime] = baseline.stdout

@vf.reward(weight=1.0)
async def solved(self, task: R2EGymTask, runtime: vf.Runtime) -> float:
restore = await runtime.run(["sh", "-c", RESTORE_TESTS], ENV)
if restore.exit_code != 0:
return 0.0
if self.config.hide_tests_from_agent:
host_archive = self._host_archives.get(runtime)
baseline_processes = self._baseline_processes.get(runtime)
if (
host_archive is None
or not host_archive.exists()
or baseline_processes is None
):
return 0.0
try:
cleanup = await runtime.run(
["sh", "-c", KILL_AGENT_PROCESSES],
ENV | {"R2E_BASELINE_PROCESSES": baseline_processes},
)
if cleanup.exit_code != 0:
return 0.0
await runtime.write(REMOTE_TESTS_ARCHIVE, host_archive.read_bytes())
Comment thread
xeophon marked this conversation as resolved.
Outdated
restore = await runtime.run(["sh", "-c", RESTORE_TESTS], ENV)
Comment thread
xeophon marked this conversation as resolved.
Outdated
Comment thread
xeophon marked this conversation as resolved.
Outdated
finally:
host_archive.unlink(missing_ok=True)
self._host_archives.pop(runtime, None)
self._baseline_processes.pop(runtime, None)
if restore.exit_code != 0:
return 0.0
result = await runtime.run(["sh", "-c", "/bin/bash run_tests.sh 2>&1"], ENV)
return calculate_reward(result.stdout or "", task.expected_output_json)

Expand Down
Loading