Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 214 additions & 17 deletions environments/r2e_gym_v1/r2e_gym_v1/taskset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@
Each row ships a per-task Docker image with the repo checked out at ``/testbed`` and a
hidden ``/r2e_tests`` directory plus a ``run_tests.sh`` harness. ``setup`` symlinks the
repo venv onto ``PATH``, clears pycache, and stashes ``/r2e_tests`` out of the agent's
reach (tarred to ``/opt`` and removed) so the running agent can't read the ground-truth
tests. The ``solved`` reward restores the tests into ``/testbed/r2e_tests``, runs
reach (archived to the evaluator host and removed from the sandbox) so the running agent
can't read the ground-truth tests. The ``solved`` reward restores the tests, runs
``run_tests.sh``, parses the pytest summary, and scores 1.0 iff the per-test pass/fail
map exactly matches the row's ``expected_output_json``. A v1 port of the v0 ComposableEnv
``R2EGymTaskSet``.
"""

import hashlib
import hmac
import json
import re
import secrets
import tempfile
import weakref
from functools import cached_property
from pathlib import Path

import verifiers.v1 as vf

Expand All @@ -24,9 +31,8 @@

REPO_PATH = "/testbed"
ALT_PATH = "/root"
# Tests are staged here during the agent rollout (outside the /testbed workdir) and
# restored to /testbed/r2e_tests at scoring time.
STAGED_TESTS = "/opt/r2e_tests.tar.gz"
REMOTE_TESTS_ARCHIVE = "/tmp/r2e_tests.tar.gz"
REMOTE_TESTS_CIPHERTEXT = f"{REMOTE_TESTS_ARCHIVE}.sealed"

# The testbed venv (project + pytest installed) plus quiet, non-interactive tooling —
# exported for every command the taskset runs in the sandbox.
Expand All @@ -44,6 +50,9 @@
"CI": "1",
}

# Hidden-test control commands must not resolve tools from agent-writable project paths.
SYSTEM_ENV = {"PATH": "/usr/sbin:/usr/bin:/sbin:/bin"}

# Symlink the repo venv + its executables onto the alt PATH so plain `python`/`pytest`
# resolve to the project environment regardless of the agent's cwd.
LINK = r"""
Expand All @@ -60,12 +69,93 @@
"2>/dev/null || timeout 30 find . -name '*.pyc' -delete 2>/dev/null || true"
)

# Stash the ground-truth tests away from the agent (tar to /opt, then remove the dir).
HIDE_TESTS = f"tar -C / -czf {STAGED_TESTS} r2e_tests && rm -rf /r2e_tests"
# Restore them into the workdir for scoring (run_tests.sh expects /testbed/r2e_tests).
ARCHIVE_TESTS = f"tar -C / -czf {REMOTE_TESTS_ARCHIVE} r2e_tests"
REMOVE_TESTS = (
f"rm -rf /r2e_tests && rm -f {REMOTE_TESTS_ARCHIVE} {REMOTE_TESTS_CIPHERTEXT}"
)
MOVE_TESTS = f"rm -rf {REPO_PATH}/r2e_tests && mv /r2e_tests {REPO_PATH}/r2e_tests"
RESTORE_TESTS = (
f"rm -rf {REPO_PATH}/r2e_tests && tar -C {REPO_PATH} -xzf {STAGED_TESTS}"
f"rm -rf {REPO_PATH}/r2e_tests && "
f"tar -C {REPO_PATH} -xzf {REMOTE_TESTS_ARCHIVE} && "
f"rm -f {REMOTE_TESTS_ARCHIVE} {REMOTE_TESTS_CIPHERTEXT}"
)
DECRYPT_TESTS = f"""
import hashlib
import hmac
import os

sealed_path = {REMOTE_TESTS_CIPHERTEXT!r}
archive_path = {REMOTE_TESTS_ARCHIVE!r}
key = bytes.fromhex(os.environ["R2E_ARCHIVE_KEY"])
with open(sealed_path, "rb") as file:
sealed = file.read()
tag, ciphertext = sealed[:32], sealed[32:]
mac_key = hashlib.sha256(b"r2e-tests-mac\\0" + key).digest()
expected = hmac.digest(mac_key, ciphertext, "sha256")
if len(tag) != 32 or not hmac.compare_digest(tag, expected):
raise SystemExit("invalid R2E test archive")
stream = hashlib.shake_256(b"r2e-tests-enc\\0" + key).digest(len(ciphertext))
plaintext = bytes(a ^ b for a, b in zip(ciphertext, stream))
try:
os.unlink(archive_path)
except FileNotFoundError:
pass
with open(archive_path, "xb") as file:
file.write(plaintext)
os.chmod(archive_path, 0o600)
os.unlink(sealed_path)
"""
SNAPSHOT_PROCESSES = r"""
for path in /proc/[0-9]*; do
pid=${path##*/}
IFS= read -r stat < "$path/stat" || continue
stat=${stat##*) }
set -- $stat
[ "$#" -ge 20 ] && printf '%s:%s ' "$pid" "${20}"
done
"""

# Scoring runs before runtime teardown, so reap detached agent processes before tests return.
KILL_AGENT_PROCESSES = r"""
self=$$
ancestors=" 1 "
baseline=" $R2E_BASELINE_PROCESSES "
pid=$self
while [ "$pid" -gt 1 ] 2>/dev/null; do
ancestors="$ancestors$pid "
pid=$(sed -n 's/^PPid:[[:space:]]*//p' "/proc/$pid/status" 2>/dev/null)
[ -n "$pid" ] || break
done
while :; do
found=0
for path in /proc/[0-9]*; do
pid=${path##*/}
case "$ancestors" in
*" $pid "*) continue ;;
esac
identity=
state=
if IFS= read -r stat < "$path/stat"; then
stat=${stat##*) }
set -- $stat
if [ "$#" -ge 20 ]; then
state=$1
identity="$pid:${20}"
fi
fi
[ "$state" = Z ] && continue
if [ -n "$identity" ]; then
case "$baseline" in
*" $identity "*) continue ;;
esac
fi
found=1
kill -STOP "$pid" 2>/dev/null || true
kill -KILL "$pid" 2>/dev/null || true
done
[ "$found" -eq 0 ] && break
done
"""


def parse_log_pytest(log: str | None) -> dict[str, str]:
Expand Down Expand Up @@ -177,11 +267,22 @@ class R2EGymConfig(vf.TasksetConfig):
use_prime_registry: bool = False
"""Resolve task images against Prime's private Artifact Registry (`REGISTRY`) instead of the
dataset's public Docker Hub `docker_image`. Only works on runtimes with GCP pull credentials."""
hide_tests_from_agent: bool = True
"""Keep tests on the evaluator host during agent rollouts. Disable only for no-agent
validation, where an in-sandbox move avoids the archive transfer."""


class R2EGymTaskset(vf.Taskset[R2EGymTask, R2EGymConfig]):
NEEDS_CONTAINER = True

@cached_property
def _host_archives(self) -> weakref.WeakKeyDictionary[vf.Runtime, Path]:
return weakref.WeakKeyDictionary()

@cached_property
def _baseline_processes(self) -> weakref.WeakKeyDictionary[vf.Runtime, str]:
return weakref.WeakKeyDictionary()

def load_tasks(self) -> list[R2EGymTask]:
from datasets import load_dataset

Expand All @@ -205,19 +306,115 @@ def load_tasks(self) -> list[R2EGymTask]:
]

async def setup(self, task: R2EGymTask, runtime: vf.Runtime) -> None:
for cmd in (LINK, CLEAN_PYCACHE, HIDE_TESTS):
result = await runtime.run(["sh", "-c", cmd], ENV)
if cmd is HIDE_TESTS and result.exit_code != 0:
for cmd in (LINK, CLEAN_PYCACHE):
await runtime.run(["sh", "-c", cmd], ENV)

if not self.config.hide_tests_from_agent:
result = await runtime.run(["sh", "-c", MOVE_TESTS], ENV)
if result.exit_code != 0:
raise RuntimeError(
f"r2e setup failed to stage tests ({task.name}): {result.stderr.strip()[-500:]}"
f"r2e setup failed to move tests ({task.name}): {result.stderr.strip()[-500:]}"
)
return

result = await runtime.run(["sh", "-c", ARCHIVE_TESTS], ENV)
if result.exit_code != 0:
raise RuntimeError(
f"r2e setup failed to archive tests ({task.name}): {result.stderr.strip()[-500:]}"
)

archive = await runtime.read(REMOTE_TESTS_ARCHIVE)
with tempfile.NamedTemporaryFile(
prefix=f"r2e_tests_{runtime.name}_", suffix=".tar.gz", delete=False
) as file:
file.write(archive)
host_archive = Path(file.name)
self._host_archives[runtime] = host_archive
weakref.finalize(runtime, host_archive.unlink, missing_ok=True)
Comment thread
xeophon marked this conversation as resolved.
original_stop = runtime.stop

async def stop_with_archive_cleanup() -> None:
try:
host_archive.unlink(missing_ok=True)
self._host_archives.pop(runtime, None)
self._baseline_processes.pop(runtime, None)
finally:
await original_stop()

setattr(runtime, "stop", stop_with_archive_cleanup)

result = await runtime.run(["sh", "-c", REMOVE_TESTS], ENV)
if result.exit_code != 0:
host_archive.unlink(missing_ok=True)
self._host_archives.pop(runtime, None)
raise RuntimeError(
f"r2e setup failed to hide tests ({task.name}): {result.stderr.strip()[-500:]}"
)

baseline = await runtime.run(["sh", "-c", SNAPSHOT_PROCESSES], ENV)
if baseline.exit_code != 0:
host_archive.unlink(missing_ok=True)
self._host_archives.pop(runtime, None)
raise RuntimeError(
f"r2e setup failed to snapshot processes ({task.name}): "
f"{baseline.stderr.strip()[-500:]}"
)
self._baseline_processes[runtime] = baseline.stdout

@vf.reward(weight=1.0)
async def solved(self, task: R2EGymTask, runtime: vf.Runtime) -> float:
restore = await runtime.run(["sh", "-c", RESTORE_TESTS], ENV)
if restore.exit_code != 0:
return 0.0
result = await runtime.run(["sh", "-c", "/bin/bash run_tests.sh 2>&1"], ENV)
if self.config.hide_tests_from_agent:
host_archive = self._host_archives.get(runtime)
baseline_processes = self._baseline_processes.get(runtime)
if (
host_archive is None
or not host_archive.exists()
or baseline_processes is None
):
return 0.0
try:
cleanup_env = SYSTEM_ENV | {
"R2E_BASELINE_PROCESSES": baseline_processes
}
cleanup = await runtime.run(
["/bin/sh", "-c", KILL_AGENT_PROCESSES],
cleanup_env,
)
if cleanup.exit_code != 0:
return 0.0
archive = host_archive.read_bytes()
key = secrets.token_bytes(32)
# write() may invoke sandbox helpers: expose only ciphertext, then reap again.
stream = hashlib.shake_256(b"r2e-tests-enc\0" + key).digest(
len(archive)
)
ciphertext = bytes(a ^ b for a, b in zip(archive, stream))
mac_key = hashlib.sha256(b"r2e-tests-mac\0" + key).digest()
sealed = hmac.digest(mac_key, ciphertext, "sha256") + ciphertext
await runtime.write(REMOTE_TESTS_CIPHERTEXT, sealed)
cleanup = await runtime.run(
["/bin/sh", "-c", KILL_AGENT_PROCESSES], cleanup_env
)
if cleanup.exit_code != 0:
return 0.0
decrypt = await runtime.run(
["/usr/bin/python3", "-I", "-S", "-c", DECRYPT_TESTS],
SYSTEM_ENV | {"R2E_ARCHIVE_KEY": key.hex()},
Comment on lines +400 to +402

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not hand the archive key to mutable sandbox Python

In root-capable R2E sandboxes, the agent can replace /usr/bin/python3 or its stdlib/shared libraries during generation; the process sweep only kills live PIDs and does not restore filesystem changes. This line then executes that agent-mutated interpreter with R2E_ARCHIVE_KEY, so a wrapper can copy the key, decrypt /tmp/r2e_tests.tar.gz.sealed, or patch /testbed before delegating, leaking the hidden tests despite the ciphertext upload. Please keep decryption/extraction in a clean trusted runtime or otherwise run the key through immutable verified helpers.

Useful? React with 👍 / 👎.

)
if decrypt.exit_code != 0:
return 0.0
restore = await runtime.run(
["/bin/sh", "-c", RESTORE_TESTS], SYSTEM_ENV
)
finally:
host_archive.unlink(missing_ok=True)
self._host_archives.pop(runtime, None)
self._baseline_processes.pop(runtime, None)
if restore.exit_code != 0:
return 0.0
result = await runtime.run(
["/bin/sh", "-c", "/bin/bash run_tests.sh 2>&1"], ENV
)
return calculate_reward(result.stdout or "", task.expected_output_json)

async def validate(self, task: R2EGymTask, runtime: vf.Runtime) -> bool:
Expand Down
Loading