diff --git a/verifiers/v1/mcp/multiplex.py b/verifiers/v1/mcp/multiplex.py index 6163fe52e..8679a84aa 100644 --- a/verifiers/v1/mcp/multiplex.py +++ b/verifiers/v1/mcp/multiplex.py @@ -31,6 +31,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import contextlib import hashlib import logging @@ -148,8 +149,7 @@ async def ensure(key: str, state_url: str) -> _Child: child = children.get(key) if child is not None and child.ready.is_set(): return child - # The lock is held ONLY for the synchronous fork()+register, never the readiness wait — so a - # cold fork serializes other forks (fork-safety) but doesn't stall traffic to other children. + # Readiness waits stay outside the lock, so traffic to ready children remains lock-free. async with forking: child = children.get(key) creating = child is None @@ -177,15 +177,29 @@ async def ensure(key: str, state_url: str) -> _Child: return child async def reap(key: str) -> None: - child = children.pop(key, None) - if not child: - return - child.ready.set() # wake any waiter blocked in `ensure` — it re-checks `children` and re-forks - with contextlib.suppress(Exception): - os.kill(child.pid, signal.SIGKILL) - with contextlib.suppress(Exception): - os.waitpid(child.pid, 0) - shutil.rmtree(child.cwd, ignore_errors=True) + # Keep fork() out of the cleanup thread's lifetime; ready children bypass this lock. + async with forking: + child = children.pop(key, None) + if not child: + return + child.ready.set() # wake waiters; they re-check `children` before re-forking + with contextlib.suppress(Exception): + os.kill(child.pid, signal.SIGKILL) + + def cleanup() -> None: + with contextlib.suppress(Exception): + os.waitpid(child.pid, 0) + shutil.rmtree(child.cwd, ignore_errors=True) + + # One worker keeps cleanup ordered and lets it finish if this task is cancelled. + loop = asyncio.get_running_loop() + with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: + future = loop.run_in_executor(executor, cleanup) + try: + await asyncio.shield(future) + except asyncio.CancelledError: + await future + raise logger.info("fork: reaped child pid=%d", child.pid) async def _respond(send, status: int, body: bytes) -> None: