diff --git a/verifiers/v1/cli/eval/runner.py b/verifiers/v1/cli/eval/runner.py index 272c492d4..c1ea30129 100644 --- a/verifiers/v1/cli/eval/runner.py +++ b/verifiers/v1/cli/eval/runner.py @@ -180,8 +180,13 @@ async def run_eval_server(config: EvalConfig) -> list[Trace]: config.model, ) logger.info("results: %s", out) + request_concurrency = config.max_concurrent + if request_concurrency and info.requires_group_scoring: + # max_concurrent is a rollout resource bound, not a request-throughput target. + # A group is indivisible, so one oversized group must still be allowed to run. + request_concurrency = max(1, request_concurrency // config.num_rollouts) semaphore = ( - asyncio.Semaphore(config.max_concurrent) if config.max_concurrent else None + asyncio.Semaphore(request_concurrency) if request_concurrency else None ) async def run_group_unit(idx: int) -> list[Trace]: diff --git a/verifiers/v1/serve/pool.py b/verifiers/v1/serve/pool.py index 678488e47..6b0b7bd91 100644 --- a/verifiers/v1/serve/pool.py +++ b/verifiers/v1/serve/pool.py @@ -37,7 +37,7 @@ from verifiers.v1.env import EnvConfig from verifiers.v1.serve.server import EnvServer -from verifiers.v1.serve.types import HealthResponse +from verifiers.v1.serve.types import HealthResponse, RunGroupRequest logger = logging.getLogger(__name__) @@ -161,7 +161,7 @@ def _spawn_worker(self) -> None: self._poller.register(dealer, zmq.POLLIN) def _maybe_scale_up(self, in_flight: int) -> None: - """Spawn one more worker when in-flight requests reach 90% of current capacity. + """Spawn one more worker when in-flight rollout slots reach 90% of capacity. A new worker starts at `active=0`, so least-busy dispatch funnels the backlog to it as it comes online (a few seconds to load the env) — fine, since we only scale @@ -188,7 +188,8 @@ async def run(self) -> None: # (`max_workers` is a concrete count when elastic is off). for _ in range(1 if self.elastic else (self.max_workers or 1)): self._spawn_worker() - pending: dict[bytes, dict] = {} # request_id -> {client_id, worker} + # request_id -> {client_id, worker, rollout_slots} + pending: dict[bytes, dict] = {} logger.info( "EnvServerPool up: address=%s workers=%d/%s multiplex=%d elastic=%s", self.address, @@ -198,6 +199,7 @@ async def run(self) -> None: self.elastic, ) try: + in_flight = 0 while True: events = dict(await self._poller.poll()) if self.frontend in events: @@ -212,16 +214,29 @@ async def run(self) -> None: [client_id, request_id, _HEALTH] ) else: + # Pool capacity is measured in rollouts; one group request carries n. + rollout_slots = 1 + if method == b"run_group": + with contextlib.suppress(Exception): + request = RunGroupRequest.model_validate( + msgpack.unpackb(payload, raw=False) + ) + rollout_slots = max(1, request.n) worker = min(self.workers, key=lambda w: w["active"]) - worker["active"] += 1 - pending[request_id] = {"client_id": client_id, "worker": worker} + worker["active"] += rollout_slots + pending[request_id] = { + "client_id": client_id, + "worker": worker, + "rollout_slots": rollout_slots, + } + in_flight += rollout_slots # forward without client_id — the DEALER identity is the worker's # `client_id`; we hold the real one in `pending`. await worker["dealer"].send_multipart( [request_id, method, payload] ) if self.elastic: - self._maybe_scale_up(len(pending)) + self._maybe_scale_up(in_flight) for w in self.workers: if w["dealer"] in events: request_id, data = await w["dealer"].recv_multipart(copy=False) @@ -229,7 +244,8 @@ async def run(self) -> None: entry = pending.pop(request_id.bytes, None) if entry is None: continue - entry["worker"]["active"] -= 1 + entry["worker"]["active"] -= entry["rollout_slots"] + in_flight -= entry["rollout_slots"] with contextlib.suppress(zmq.ZMQError): await self.frontend.send_multipart( [entry["client_id"], request_id, data], copy=False