From 317c0432ec25affd8ce3c9afe900d930d5ea494c Mon Sep 17 00:00:00 2001 From: Xeophon <46377542+xeophon@users.noreply.github.com> Date: Sun, 21 Jun 2026 15:59:35 +0200 Subject: [PATCH 1/6] Honor rollout concurrency for grouped evals --- verifiers/v1/cli/eval/runner.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/verifiers/v1/cli/eval/runner.py b/verifiers/v1/cli/eval/runner.py index 272c492d4..c1ea30129 100644 --- a/verifiers/v1/cli/eval/runner.py +++ b/verifiers/v1/cli/eval/runner.py @@ -180,8 +180,13 @@ async def run_eval_server(config: EvalConfig) -> list[Trace]: config.model, ) logger.info("results: %s", out) + request_concurrency = config.max_concurrent + if request_concurrency and info.requires_group_scoring: + # max_concurrent is a rollout resource bound, not a request-throughput target. + # A group is indivisible, so one oversized group must still be allowed to run. + request_concurrency = max(1, request_concurrency // config.num_rollouts) semaphore = ( - asyncio.Semaphore(config.max_concurrent) if config.max_concurrent else None + asyncio.Semaphore(request_concurrency) if request_concurrency else None ) async def run_group_unit(idx: int) -> list[Trace]: From a606aece47699dbfd3b91c0046f1dd5504ca297b Mon Sep 17 00:00:00 2001 From: Xeophon <46377542+xeophon@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:07:07 +0200 Subject: [PATCH 2/6] Weight grouped requests in elastic pool --- verifiers/v1/serve/pool.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/verifiers/v1/serve/pool.py b/verifiers/v1/serve/pool.py index 678488e47..997fa8958 100644 --- a/verifiers/v1/serve/pool.py +++ b/verifiers/v1/serve/pool.py @@ -161,7 +161,7 @@ def _spawn_worker(self) -> None: self._poller.register(dealer, zmq.POLLIN) def _maybe_scale_up(self, in_flight: int) -> None: - """Spawn one more worker when in-flight requests reach 90% of current capacity. + """Spawn one more worker when weighted in-flight load reaches 90% of capacity. A new worker starts at `active=0`, so least-busy dispatch funnels the backlog to it as it comes online (a few seconds to load the env) — fine, since we only scale @@ -198,6 +198,7 @@ async def run(self) -> None: self.elastic, ) try: + in_flight = 0 while True: events = dict(await self._poller.poll()) if self.frontend in events: @@ -212,16 +213,27 @@ async def run(self) -> None: [client_id, request_id, _HEALTH] ) else: + # Pool capacity is measured in rollouts; one group request carries n. + weight = ( + msgpack.unpackb(payload, raw=False)["n"] + if method == b"run_group" + else 1 + ) worker = min(self.workers, key=lambda w: w["active"]) - worker["active"] += 1 - pending[request_id] = {"client_id": client_id, "worker": worker} + worker["active"] += weight + pending[request_id] = { + "client_id": client_id, + "worker": worker, + "weight": weight, + } + in_flight += weight # forward without client_id — the DEALER identity is the worker's # `client_id`; we hold the real one in `pending`. await worker["dealer"].send_multipart( [request_id, method, payload] ) if self.elastic: - self._maybe_scale_up(len(pending)) + self._maybe_scale_up(in_flight) for w in self.workers: if w["dealer"] in events: request_id, data = await w["dealer"].recv_multipart(copy=False) @@ -229,7 +241,8 @@ async def run(self) -> None: entry = pending.pop(request_id.bytes, None) if entry is None: continue - entry["worker"]["active"] -= 1 + entry["worker"]["active"] -= entry["weight"] + in_flight -= entry["weight"] with contextlib.suppress(zmq.ZMQError): await self.frontend.send_multipart( [entry["client_id"], request_id, data], copy=False From f2245272d7edaeab31d002fb9bcd62ef6db3ca4d Mon Sep 17 00:00:00 2001 From: Xeophon <46377542+xeophon@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:22:48 +0200 Subject: [PATCH 3/6] Handle malformed grouped pool requests --- tests/v1/test_pool.py | 45 ++++++++++++++++++++++++++++++++++++++ verifiers/v1/serve/pool.py | 10 ++++----- 2 files changed, 50 insertions(+), 5 deletions(-) create mode 100644 tests/v1/test_pool.py diff --git a/tests/v1/test_pool.py b/tests/v1/test_pool.py new file mode 100644 index 000000000..d1dbecebd --- /dev/null +++ b/tests/v1/test_pool.py @@ -0,0 +1,45 @@ +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import msgpack +import pytest +import zmq + +from verifiers.v1.serve.pool import EnvServerPool + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "payload", + [b"\xc1", msgpack.packb({}), msgpack.packb({"n": "eight"})], +) +async def test_malformed_group_payload_falls_back_to_one(payload: bytes) -> None: + pool = object.__new__(EnvServerPool) + pool.elastic = True + pool.max_workers = 2 + pool.multiplex = 128 + pool.address = "test://pool" + pool.workers = [] + pool._shutdown = MagicMock() + + frontend = MagicMock() + frontend.recv_multipart = AsyncMock( + return_value=[b"client", b"request", b"run_group", payload] + ) + frontend.send_multipart = AsyncMock() + pool.frontend = frontend + + dealer = MagicMock() + dealer.send_multipart = AsyncMock() + worker = {"dealer": dealer, "active": 0} + pool._spawn_worker = MagicMock(side_effect=lambda: pool.workers.append(worker)) + + poller = MagicMock() + poller.poll = AsyncMock( + side_effect=[{frontend: zmq.POLLIN}, asyncio.CancelledError()] + ) + with patch("verifiers.v1.serve.pool.zmq.asyncio.Poller", return_value=poller): + await pool.run() + + assert worker["active"] == 1 + dealer.send_multipart.assert_awaited_once_with([b"request", b"run_group", payload]) diff --git a/verifiers/v1/serve/pool.py b/verifiers/v1/serve/pool.py index 997fa8958..952efddc2 100644 --- a/verifiers/v1/serve/pool.py +++ b/verifiers/v1/serve/pool.py @@ -214,11 +214,11 @@ async def run(self) -> None: ) else: # Pool capacity is measured in rollouts; one group request carries n. - weight = ( - msgpack.unpackb(payload, raw=False)["n"] - if method == b"run_group" - else 1 - ) + weight = 1 + if method == b"run_group": + with contextlib.suppress(Exception): + n = msgpack.unpackb(payload, raw=False)["n"] + weight = n if type(n) is int and n > 0 else 1 worker = min(self.workers, key=lambda w: w["active"]) worker["active"] += weight pending[request_id] = { From ea07ec70e6d0b53b12b72677a03ba333a8ad6b95 Mon Sep 17 00:00:00 2001 From: Xeophon <46377542+xeophon@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:45:14 +0200 Subject: [PATCH 4/6] Name pool load as rollout slots --- tests/v1/test_pool.py | 2 +- verifiers/v1/serve/pool.py | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/v1/test_pool.py b/tests/v1/test_pool.py index d1dbecebd..ce90d69bd 100644 --- a/tests/v1/test_pool.py +++ b/tests/v1/test_pool.py @@ -13,7 +13,7 @@ "payload", [b"\xc1", msgpack.packb({}), msgpack.packb({"n": "eight"})], ) -async def test_malformed_group_payload_falls_back_to_one(payload: bytes) -> None: +async def test_malformed_group_payload_uses_one_rollout_slot(payload: bytes) -> None: pool = object.__new__(EnvServerPool) pool.elastic = True pool.max_workers = 2 diff --git a/verifiers/v1/serve/pool.py b/verifiers/v1/serve/pool.py index 952efddc2..52fcb14dc 100644 --- a/verifiers/v1/serve/pool.py +++ b/verifiers/v1/serve/pool.py @@ -161,7 +161,7 @@ def _spawn_worker(self) -> None: self._poller.register(dealer, zmq.POLLIN) def _maybe_scale_up(self, in_flight: int) -> None: - """Spawn one more worker when weighted in-flight load reaches 90% of capacity. + """Spawn one more worker when in-flight rollout slots reach 90% of capacity. A new worker starts at `active=0`, so least-busy dispatch funnels the backlog to it as it comes online (a few seconds to load the env) — fine, since we only scale @@ -188,7 +188,8 @@ async def run(self) -> None: # (`max_workers` is a concrete count when elastic is off). for _ in range(1 if self.elastic else (self.max_workers or 1)): self._spawn_worker() - pending: dict[bytes, dict] = {} # request_id -> {client_id, worker} + # request_id -> {client_id, worker, rollout_slots} + pending: dict[bytes, dict] = {} logger.info( "EnvServerPool up: address=%s workers=%d/%s multiplex=%d elastic=%s", self.address, @@ -214,19 +215,19 @@ async def run(self) -> None: ) else: # Pool capacity is measured in rollouts; one group request carries n. - weight = 1 + rollout_slots = 1 if method == b"run_group": with contextlib.suppress(Exception): n = msgpack.unpackb(payload, raw=False)["n"] - weight = n if type(n) is int and n > 0 else 1 + rollout_slots = n if type(n) is int and n > 0 else 1 worker = min(self.workers, key=lambda w: w["active"]) - worker["active"] += weight + worker["active"] += rollout_slots pending[request_id] = { "client_id": client_id, "worker": worker, - "weight": weight, + "rollout_slots": rollout_slots, } - in_flight += weight + in_flight += rollout_slots # forward without client_id — the DEALER identity is the worker's # `client_id`; we hold the real one in `pending`. await worker["dealer"].send_multipart( @@ -241,8 +242,8 @@ async def run(self) -> None: entry = pending.pop(request_id.bytes, None) if entry is None: continue - entry["worker"]["active"] -= entry["weight"] - in_flight -= entry["weight"] + entry["worker"]["active"] -= entry["rollout_slots"] + in_flight -= entry["rollout_slots"] with contextlib.suppress(zmq.ZMQError): await self.frontend.send_multipart( [entry["client_id"], request_id, data], copy=False From a07ad7d802ebb4fb93b440dd185c7c553dd3866e Mon Sep 17 00:00:00 2001 From: Xeophon <46377542+xeophon@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:48:00 +0200 Subject: [PATCH 5/6] Remove pool regression test --- tests/v1/test_pool.py | 45 ------------------------------------------- 1 file changed, 45 deletions(-) delete mode 100644 tests/v1/test_pool.py diff --git a/tests/v1/test_pool.py b/tests/v1/test_pool.py deleted file mode 100644 index ce90d69bd..000000000 --- a/tests/v1/test_pool.py +++ /dev/null @@ -1,45 +0,0 @@ -import asyncio -from unittest.mock import AsyncMock, MagicMock, patch - -import msgpack -import pytest -import zmq - -from verifiers.v1.serve.pool import EnvServerPool - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "payload", - [b"\xc1", msgpack.packb({}), msgpack.packb({"n": "eight"})], -) -async def test_malformed_group_payload_uses_one_rollout_slot(payload: bytes) -> None: - pool = object.__new__(EnvServerPool) - pool.elastic = True - pool.max_workers = 2 - pool.multiplex = 128 - pool.address = "test://pool" - pool.workers = [] - pool._shutdown = MagicMock() - - frontend = MagicMock() - frontend.recv_multipart = AsyncMock( - return_value=[b"client", b"request", b"run_group", payload] - ) - frontend.send_multipart = AsyncMock() - pool.frontend = frontend - - dealer = MagicMock() - dealer.send_multipart = AsyncMock() - worker = {"dealer": dealer, "active": 0} - pool._spawn_worker = MagicMock(side_effect=lambda: pool.workers.append(worker)) - - poller = MagicMock() - poller.poll = AsyncMock( - side_effect=[{frontend: zmq.POLLIN}, asyncio.CancelledError()] - ) - with patch("verifiers.v1.serve.pool.zmq.asyncio.Poller", return_value=poller): - await pool.run() - - assert worker["active"] == 1 - dealer.send_multipart.assert_awaited_once_with([b"request", b"run_group", payload]) From 058058587404a1844c747627d04647e4b6267394 Mon Sep 17 00:00:00 2001 From: Xeophon <46377542+xeophon@users.noreply.github.com> Date: Sun, 21 Jun 2026 16:50:22 +0200 Subject: [PATCH 6/6] Validate group load before scaling pool --- verifiers/v1/serve/pool.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/verifiers/v1/serve/pool.py b/verifiers/v1/serve/pool.py index 52fcb14dc..6b0b7bd91 100644 --- a/verifiers/v1/serve/pool.py +++ b/verifiers/v1/serve/pool.py @@ -37,7 +37,7 @@ from verifiers.v1.env import EnvConfig from verifiers.v1.serve.server import EnvServer -from verifiers.v1.serve.types import HealthResponse +from verifiers.v1.serve.types import HealthResponse, RunGroupRequest logger = logging.getLogger(__name__) @@ -218,8 +218,10 @@ async def run(self) -> None: rollout_slots = 1 if method == b"run_group": with contextlib.suppress(Exception): - n = msgpack.unpackb(payload, raw=False)["n"] - rollout_slots = n if type(n) is int and n > 0 else 1 + request = RunGroupRequest.model_validate( + msgpack.unpackb(payload, raw=False) + ) + rollout_slots = max(1, request.n) worker = min(self.workers, key=lambda w: w["active"]) worker["active"] += rollout_slots pending[request_id] = {