Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion verifiers/v1/cli/eval/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,13 @@ async def run_eval_server(config: EvalConfig) -> list[Trace]:
config.model,
)
logger.info("results: %s", out)
request_concurrency = config.max_concurrent
if request_concurrency and info.requires_group_scoring:
# max_concurrent is a rollout resource bound, not a request-throughput target.
# A group is indivisible, so one oversized group must still be allowed to run.
request_concurrency = max(1, request_concurrency // config.num_rollouts)
Comment thread
xeophon marked this conversation as resolved.
semaphore = (
asyncio.Semaphore(config.max_concurrent) if config.max_concurrent else None
asyncio.Semaphore(request_concurrency) if request_concurrency else None
)

async def run_group_unit(idx: int) -> list[Trace]:
Expand Down
30 changes: 23 additions & 7 deletions verifiers/v1/serve/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

from verifiers.v1.env import EnvConfig
from verifiers.v1.serve.server import EnvServer
from verifiers.v1.serve.types import HealthResponse
from verifiers.v1.serve.types import HealthResponse, RunGroupRequest

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -161,7 +161,7 @@ def _spawn_worker(self) -> None:
self._poller.register(dealer, zmq.POLLIN)

def _maybe_scale_up(self, in_flight: int) -> None:
"""Spawn one more worker when in-flight requests reach 90% of current capacity.
"""Spawn one more worker when in-flight rollout slots reach 90% of capacity.

A new worker starts at `active=0`, so least-busy dispatch funnels the backlog to
it as it comes online (a few seconds to load the env) — fine, since we only scale
Expand All @@ -188,7 +188,8 @@ async def run(self) -> None:
# (`max_workers` is a concrete count when elastic is off).
for _ in range(1 if self.elastic else (self.max_workers or 1)):
self._spawn_worker()
pending: dict[bytes, dict] = {} # request_id -> {client_id, worker}
# request_id -> {client_id, worker, rollout_slots}
pending: dict[bytes, dict] = {}
logger.info(
"EnvServerPool up: address=%s workers=%d/%s multiplex=%d elastic=%s",
self.address,
Expand All @@ -198,6 +199,7 @@ async def run(self) -> None:
self.elastic,
)
try:
in_flight = 0
while True:
events = dict(await self._poller.poll())
if self.frontend in events:
Expand All @@ -212,24 +214,38 @@ async def run(self) -> None:
[client_id, request_id, _HEALTH]
)
else:
# Pool capacity is measured in rollouts; one group request carries n.
rollout_slots = 1
if method == b"run_group":
with contextlib.suppress(Exception):
request = RunGroupRequest.model_validate(
msgpack.unpackb(payload, raw=False)
)
rollout_slots = max(1, request.n)
worker = min(self.workers, key=lambda w: w["active"])
worker["active"] += 1
pending[request_id] = {"client_id": client_id, "worker": worker}
worker["active"] += rollout_slots
pending[request_id] = {
"client_id": client_id,
"worker": worker,
"rollout_slots": rollout_slots,
}
in_flight += rollout_slots
# forward without client_id — the DEALER identity is the worker's
# `client_id`; we hold the real one in `pending`.
await worker["dealer"].send_multipart(
[request_id, method, payload]
)
if self.elastic:
self._maybe_scale_up(len(pending))
self._maybe_scale_up(in_flight)
for w in self.workers:
if w["dealer"] in events:
request_id, data = await w["dealer"].recv_multipart(copy=False)
# Copy only the routing key; relay the response Frames unchanged.
entry = pending.pop(request_id.bytes, None)
if entry is None:
continue
entry["worker"]["active"] -= 1
entry["worker"]["active"] -= entry["rollout_slots"]
in_flight -= entry["rollout_slots"]
with contextlib.suppress(zmq.ZMQError):
await self.frontend.send_multipart(
[entry["client_id"], request_id, data], copy=False
Expand Down
Loading