Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions verifiers/v1/mcp/multiplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from __future__ import annotations

import asyncio
import concurrent.futures
import contextlib
import hashlib
import logging
Expand Down Expand Up @@ -148,8 +149,7 @@ async def ensure(key: str, state_url: str) -> _Child:
child = children.get(key)
if child is not None and child.ready.is_set():
return child
# The lock is held ONLY for the synchronous fork()+register, never the readiness wait — so a
# cold fork serializes other forks (fork-safety) but doesn't stall traffic to other children.
# Readiness waits stay outside the lock, so traffic to ready children remains lock-free.
async with forking:
child = children.get(key)
creating = child is None
Expand Down Expand Up @@ -177,15 +177,29 @@ async def ensure(key: str, state_url: str) -> _Child:
return child

async def reap(key: str) -> None:
    """Kill the child registered under ``key`` and remove its working directory.

    Does nothing when no child is registered for ``key``. Otherwise the child
    is unregistered, SIGKILLed, waited on, and its ``cwd`` is deleted; a log
    line is emitted once cleanup has finished.

    Args:
        key: Registry key of the child to reap.

    Raises:
        asyncio.CancelledError: Re-raised if the calling task is cancelled
            during cleanup, after the in-flight cleanup has been waited for.
    """
    # Keep fork() out of the cleanup thread's lifetime; ready children bypass this lock.
    async with forking:
        child = children.pop(key, None)
        if child is None:
            return
        child.ready.set()  # wake waiters; they re-check `children` before re-forking
        # Best effort: the process may already be gone.
        with contextlib.suppress(Exception):
            os.kill(child.pid, signal.SIGKILL)

        def cleanup() -> None:
            # Blocking calls, so they run in a worker thread rather than on the event loop.
            with contextlib.suppress(Exception):
                os.waitpid(child.pid, 0)
            shutil.rmtree(child.cwd, ignore_errors=True)

        # One worker keeps cleanup ordered and lets it finish if this task is cancelled.
        # Leaving the `with` block shuts the executor down, so the worker thread is
        # joined before the fork lock is released.
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            future = loop.run_in_executor(executor, cleanup)
            try:
                # shield() stops a cancellation of this task from cancelling the cleanup future.
                await asyncio.shield(future)
            except asyncio.CancelledError:
                # Let the in-flight cleanup complete, then propagate the cancellation.
                await future
                raise
    logger.info("fork: reaped child pid=%d", child.pid)

async def _respond(send, status: int, body: bytes) -> None:
Expand Down
Loading