
Race condition in RealRuntime._current_instance causes validation cross-talk during parallel agent dispatch #40

@medimed66

Description

Summary

RealRuntime stores the currently-executing agent instance on a shared instance attribute (self._current_instance) and reads it back after an await to feed ValidationProcessor. Under parallel_invoke, every fanned-out branch dispatches concurrently on the same RealRuntime, so the attribute races. When a branch's _translate runs, it reads whichever instance was written last, and that instance may belong to a sibling branch. As a result, one branch's LLM response is validated against a different branch's agent identity, and therefore against that agent's outgoing topology edges. When the two edge sets happen to match, the bug stays latent. When they differ, the workflow fails with a fabricated "Agent X cannot invoke: [...]" error.

This affects any topology where parallel_invoke fans out to agents with non-identical outgoing edge sets.

Severity

High. Silently corrupts coordination validation. Symptoms are non-deterministic (timing-dependent) and misattributed (the failure surfaces on the wrong branch and names the wrong agent), so the failure mode is hard to diagnose without instrumentation.

Symptoms

  • A workflow with parallel fan-out fails with success=False.
  • The BranchCompletedEvent sequence shows an arbitrary branch with success=False even though the agent itself produced a syntactically valid response.
  • The error message has the form "Agent <X> cannot invoke: [<targets>]" where <X> is not the agent that ran on the failing branch.
  • The resolver branch (typically the orchestrator's) never resumes — its final step count is the step at which it issued parallel_invoke.
  • WorkflowResult.success == False and WorkflowResult.final_response is None.

If the parallel-fan-out workers all have identical outgoing edges (e.g., every worker returns to a single coordinator), the bug stays latent — concurrent runs still happen, but cross-talk between identical edge sets passes validation silently.

Reproducer

Save the following as repro_parallel_race.py and run it with OPENROUTER_API_KEY set in your environment (for another provider, set its key and adjust build_model_config). Requires only main-branch APIs.

"""
Minimal reproducer for the RealRuntime._current_instance race.

Five agents: an Orchestrator that parallel-invokes A, B, C. A and B return
directly to Orchestrator. C invokes D, which returns to Orchestrator.

The key shape is: ONE of the fanned-out workers (C) has a different
outgoing-edge set than its siblings (its only edge is C -> D, not
C -> Orchestrator). That's what surfaces the race.
"""

from __future__ import annotations
import asyncio, os, sys

from marsys.agents import Agent
from marsys.agents.registry import AgentRegistry
from marsys.coordination import Orchestra
from marsys.coordination.config import ExecutionConfig
from marsys.models.models import ModelConfig


def build_model_config() -> ModelConfig:
    api_key = os.getenv("OPENROUTER_API_KEY") or os.getenv("OPEN_ROUTER_API_KEY")
    if not api_key:
        sys.exit("set OPENROUTER_API_KEY (or your provider's key + adjust ModelConfig)")
    return ModelConfig(
        type="api",
        provider="openrouter",
        name=os.getenv("OPEN_ROUTER_MODEL", "anthropic/claude-haiku-4.5"),
        api_key=api_key,
        temperature=0.2,
        max_tokens=2000,
    )


def build_agents(mc: ModelConfig) -> None:
    Agent(
        model_config=mc, name="Orchestrator", memory_retention="single_run",
        goal="Dispatch A/B/C in parallel; assemble their replies into the word MARS.",
        instruction=(
            "TURN 1: make ONE invoke_agent call with THREE invocations, "
            "agent_name in {AgentA, AgentB, AgentC}, request='Provide your letter.'\n"
            "TURN 2 (after results return): extract every letter from the aggregated "
            "results and call terminate_workflow with response 'The secret word is: MARS'."
        ),
    )
    Agent(
        model_config=mc, name="AgentA", memory_retention="single_run",
        goal="Return letter M to the Orchestrator.",
        instruction=(
            "Call invoke_agent once: agent_name='Orchestrator', "
            "request='Letter from AgentA: M'."
        ),
    )
    Agent(
        model_config=mc, name="AgentB", memory_retention="single_run",
        goal="Return letter A to the Orchestrator.",
        instruction=(
            "Call invoke_agent once: agent_name='Orchestrator', "
            "request='Letter from AgentB: A'."
        ),
    )
    Agent(
        model_config=mc, name="AgentC", memory_retention="single_run",
        goal="Forward letter R to AgentD.",
        instruction=(
            "Call invoke_agent once: agent_name='AgentD', "
            "request='Letter from AgentC: R. Append your letter and forward to the "
            "Orchestrator.' Do NOT invoke Orchestrator yourself."
        ),
    )
    Agent(
        model_config=mc, name="AgentD", memory_retention="single_run",
        goal="Combine AgentC's letter with your own and return to the Orchestrator.",
        instruction=(
            "Your letter is S. Call invoke_agent once: agent_name='Orchestrator', "
            "request='Letters: R (from AgentC), S (from AgentD)'."
        ),
    )


TOPOLOGY = {
    "agents": ["Start", "Orchestrator", "AgentA", "AgentB", "AgentC", "AgentD", "End"],
    "flows":  [
        "Start -> Orchestrator",
        "Orchestrator -> AgentA", "Orchestrator -> AgentB", "Orchestrator -> AgentC",
        "AgentA -> Orchestrator", "AgentB -> Orchestrator",
        "AgentC -> AgentD",           # <-- C's only outgoing edge
        "AgentD -> Orchestrator",
        "Orchestrator -> End",
    ],
    "rules": ["timeout(180)"],
}


async def main() -> int:
    mc = build_model_config()
    build_agents(mc)
    try:
        result = await Orchestra.run(
            task="Collect the letters and assemble the secret word.",
            topology=TOPOLOGY,
            agent_registry=AgentRegistry,
            execution_config=ExecutionConfig(step_timeout=60.0),
            max_steps=30,
        )
        print(f"success={result.success}")
        print(f"final_response={result.final_response!r}")
        print(f"error={getattr(result, 'error', None)!r}")
        # On the buggy code path: success=False, final_response=None,
        # and one BranchCompletedEvent in the trace will carry an error like
        # "Agent <X> cannot invoke: [...]" where <X> is some agent that
        # didn't actually issue that invocation.
        return 0 if result.success else 1
    finally:
        AgentRegistry.clear()


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))

Run:

python repro_parallel_race.py

Expected: success=True, final_response containing "MARS".

Actual: success=False, final_response=None. The trace's branch events include one with an error string of the form "Agent <X> cannot invoke: ['Orchestrator']" where <X> is the agent identity from a different branch than the one carrying the failure.

The bug is timing-dependent. With LLM latency it reproduces ≥9 out of 10 runs in our testing; the rate depends on which agent's response returns first.
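Because the failure rate depends on scheduling, it can help to measure it on your own setup. The driver below is a minimal sketch that reruns the reproducer and counts non-zero exits. It relies on the script's `return 0 if result.success else 1`; count_failures is a hypothetical helper, not part of marsys. A missing API key also makes the script exit non-zero, so check the printed output.

```python
import subprocess
import sys
from pathlib import Path


def count_failures(cmd: list[str], runs: int, timeout: float = 300.0) -> int:
    """Run `cmd` `runs` times; return how many runs exited with a non-zero code."""
    failures = 0
    for i in range(1, runs + 1):
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        if proc.returncode != 0:
            failures += 1
            # Show the last few stdout lines (success=, final_response=, error=).
            tail = "\n".join(proc.stdout.strip().splitlines()[-3:])
            print(f"run {i}: exit code {proc.returncode}\n{tail}")
    return failures


# Only runs when the reproducer sits in the current directory.
if __name__ == "__main__" and Path("repro_parallel_race.py").exists():
    n = 10
    failed = count_failures([sys.executable, "repro_parallel_race.py"], n)
    print(f"{failed}/{n} runs failed")
```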

Synthetic (no-LLM) reproducer

The race can be exercised without an LLM by mocking step_executor.execute_step to suspend for controllable durations:

# tests/coordination/test_real_runtime_race.py — sketch
import asyncio
import pytest

@pytest.mark.asyncio
async def test_parallel_step_does_not_cross_talk_current_instance(monkeypatch):
    """Two concurrent step() calls must not share self._current_instance."""
    # Build a RealRuntime with two registered agents whose outgoing-edge
    # sets differ. Patch step_executor.execute_step to:
    #   1. Set a sentinel on the runtime (simulating the buggy line 89).
    #   2. Await asyncio.sleep(0) so the other task runs and clobbers it.
    #   3. Return a marsys_result whose coordination_action == "invoke_agent"
    #      with invocations valid for the branch's own agent.
    # `runtime`, `br_a` and `br_c` come from that setup (construction elided).
    # Then call asyncio.gather(runtime.step(br_a), runtime.step(br_c)).
    # On the buggy implementation, one of res_a / res_c is FAIL with an
    # error mentioning the *other* branch's agent name.
    res_a, res_c = await asyncio.gather(
        runtime.step(br_a),
        runtime.step(br_c),
    )
    assert res_a.kind != "FAIL", res_a.error
    assert res_c.kind != "FAIL", res_c.error

This unit test pattern follows the existing harness in tests/coordination/orchestrator/.
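The sketch above relies on the existing harness to build `runtime`, `br_a` and `br_c`. For a quick check without marsys, the same control flow can be reproduced with plain asyncio. Everything here (ToyRuntime, EDGES, validate_invoke, the delays) is a hypothetical stand-in that only mirrors the pattern described in this issue: write a shared attribute, await, then validate with whatever the attribute holds.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass

# Hypothetical stand-ins for marsys types; only the control flow mirrors RealRuntime.
EDGES = {"AgentA": {"Orchestrator"}, "AgentC": {"AgentD"}}


@dataclass
class Agent:
    name: str


@dataclass
class StepResult:
    kind: str
    error: str | None = None


def validate_invoke(agent: Agent, targets: list[str]) -> str | None:
    """Same shape as the check quoted from _validate_invoke_agent."""
    invalid = [t for t in targets if t not in EDGES.get(agent.name, set())]
    return f"Agent {agent.name} cannot invoke: {invalid}" if invalid else None


class ToyRuntime:
    def __init__(self, pass_instance: bool) -> None:
        self.pass_instance = pass_instance  # False reproduces the shared-attribute read
        self._current_instance: Agent | None = None

    async def _execute_step(self, agent: Agent, delay: float) -> list[str]:
        # Stand-in for step_executor.execute_step: suspend, then return
        # invocation targets that are valid for *this* agent.
        await asyncio.sleep(delay)
        return sorted(EDGES[agent.name])

    async def step(self, agent_name: str, delay: float) -> StepResult:
        instance = Agent(agent_name)
        self._current_instance = instance  # shared write, as at real_runtime.py:89
        targets = await self._execute_step(instance, delay)  # siblings run here
        agent = instance if self.pass_instance else self._current_instance
        error = validate_invoke(agent, targets)
        return StepResult("FAIL", error) if error else StepResult("OK")


async def run(pass_instance: bool) -> list[StepResult]:
    rt = ToyRuntime(pass_instance)
    # AgentA suspends longer, so AgentC's write is the last one AgentA sees.
    return await asyncio.gather(rt.step("AgentA", 0.02), rt.step("AgentC", 0.0))


buggy = asyncio.run(run(pass_instance=False))
fixed = asyncio.run(run(pass_instance=True))
print([r.kind for r in buggy], buggy[0].error)
# ['FAIL', 'OK'] Agent AgentC cannot invoke: ['Orchestrator']
print([r.kind for r in fixed])
# ['OK', 'OK']
```

The pass_instance=True path corresponds to Option A under "Proposed fix": the identity travels as a local variable, so sibling writes cannot reach it.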

Expected vs Actual

| Expected | Actual |
| --- | --- |
| All three fan-out branches succeed | ✗ One branch arbitrarily marked FAILED |
| Validation error mentions the agent that produced the response | ✗ Mentions another agent, whichever happened to be in _current_instance when _translate ran |
| Fork barrier sees arrived = {A, B, D}, failed = ∅, fires with success | ✗ arrived = 2, failed = 1, min_ratio=1.0 → fires with failure |
| Orchestrator's branch resumes once after the fork and produces a final response | ✗ Orchestrator's branch settles after step 1 |
| Workflow result success=True, final_response populated | success=False, final_response=None |

Root cause

packages/framework/src/marsys/coordination/execution/real_runtime.py:89

async def step(self, branch: OrchestratorBranch) -> OrchestratorStepResult:
    instance = self.registry.get_or_acquire(branch.current_agent, branch_id=branch.id)
    ...
    self._current_instance = instance                                    # ← write
    ...
    marsys_result = await self.step_executor.execute_step(agent=instance, ...)  # ← await yields
    ...
    result = await self._translate(marsys_result, branch)                # ← reads self._current_instance
    return result

packages/framework/src/marsys/coordination/execution/real_runtime.py:209

async def _translate(self, marsys_result, branch):
    ...
    validation = await self.validator.validate_coordination_action(
        action=coord_action,
        data=coord_data,
        agent=getattr(self, "_current_instance", None),                  # ← race-read
        branch=None,
        exec_state=exec_state,
    )

Orchestrator.run dispatches every RUNNING branch as an asyncio.Task calling self.runtime.step(branch) (per ADR-005: "Concurrency model: every RUNNING branch in the runnable queue is dispatched as an asyncio.Task …"). All those tasks share the same RealRuntime instance. Each sets self._current_instance = instance and then suspends at await self.step_executor.execute_step(...). While it is suspended, sibling branches overwrite the same attribute. By the time a branch's _translate runs, the attribute holds whichever instance was written last, which is often a sibling branch's.
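The interleaving can be seen in isolation with a toy object shared by three tasks. This illustrates asyncio scheduling only; it is not marsys code, and SharedRuntime is hypothetical. asyncio.gather schedules the tasks in order and each one yields at the await before reading, so every task reads the value written by the last one.

```python
import asyncio


class SharedRuntime:
    """Toy stand-in: one object shared by every branch task, like RealRuntime."""

    def __init__(self) -> None:
        self._current_instance: str | None = None
        self.trace: list[str] = []

    async def step(self, agent: str) -> str:
        self._current_instance = agent  # write
        self.trace.append(f"{agent} writes")
        await asyncio.sleep(0)  # any await hands control to sibling tasks
        seen = self._current_instance  # read after the await
        self.trace.append(f"{agent} reads {seen}")
        return seen


async def main() -> tuple[list[str], list[str]]:
    rt = SharedRuntime()
    seen = await asyncio.gather(rt.step("AgentA"), rt.step("AgentB"), rt.step("AgentC"))
    return seen, rt.trace


seen, trace = asyncio.run(main())
print(seen)  # ['AgentC', 'AgentC', 'AgentC']
print(trace)
```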

ValidationProcessor._validate_invoke_agent (validation/response_validator.py:148-156) then computes:

next_agents = self.topology_graph.get_next_agents(agent.name)            # other branch's edges
invalid = [inv.agent_name for inv in invocations if inv.agent_name not in next_agents]
if invalid:
    return ValidationResult(
        is_valid=False,
        error_message=f"Agent {agent.name} cannot invoke: {invalid}",    # other branch's name
        ...
    )

The error string takes agent.name from the cross-talked instance but takes invocations from the current branch's own (correct) response. That mismatch between the two fields is the smoking-gun signature of the bug.

Why the bug stayed latent

Topologies in which the parallel-fan-out workers all have the same outgoing edges (e.g., a coordinator parallel-invokes N workers that each return only to the coordinator) experience the race on every run, but cross-talk between identical edge sets passes validation silently. Existing parallel-fan-out tests in tests/coordination/orchestrator/ exercise the unified-barrier algorithm with DeterministicRuntime mocks, which never touch _current_instance, so unit-test coverage doesn't reach this path.

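The bug becomes observable as soon as any fanned-out worker has a different outgoing-edge set than its siblings. From then on, whether a given run fails depends on the order in which the branches resume (≥9 out of 10 runs in our testing).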
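To check whether a given topology is exposed, one heuristic is to look at every node with several successors and compare those successors' outgoing-edge sets. The sketch below assumes the "A -> B" flow strings used in the reproducer's TOPOLOGY and treats Start and End as terminals. It also treats every multi-successor node as a potential parallel_invoke fan-out, because the topology alone does not say which invocations run in parallel. exposed_fanouts is hypothetical, not a marsys API.

```python
from collections import defaultdict


def exposed_fanouts(
    flows: list[str], terminals: tuple[str, ...] = ("Start", "End")
) -> dict[str, dict[str, frozenset[str]]]:
    """Return {node: {successor: successor_out_edges}} for each multi-successor node
    whose non-terminal successors do not all share one outgoing-edge set."""
    out: dict[str, set[str]] = defaultdict(set)
    for flow in flows:
        src, dst = (part.strip() for part in flow.split("->"))
        out[src].add(dst)

    exposed: dict[str, dict[str, frozenset[str]]] = {}
    for node, succs in out.items():
        workers = sorted(s for s in succs if s not in terminals)
        if len(workers) < 2:
            continue  # no fan-out from this node
        edge_sets = {w: frozenset(out.get(w, ())) for w in workers}
        if len(set(edge_sets.values())) > 1:  # siblings disagree: cross-talk can fail
            exposed[node] = edge_sets
    return exposed


REPRO_FLOWS = [
    "Start -> Orchestrator",
    "Orchestrator -> AgentA", "Orchestrator -> AgentB", "Orchestrator -> AgentC",
    "AgentA -> Orchestrator", "AgentB -> Orchestrator",
    "AgentC -> AgentD", "AgentD -> Orchestrator", "Orchestrator -> End",
]
LATENT_FLOWS = [
    "Start -> Coord", "Coord -> W1", "Coord -> W2",
    "W1 -> Coord", "W2 -> Coord", "Coord -> End",
]
print(exposed_fanouts(REPRO_FLOWS))
print(exposed_fanouts(LATENT_FLOWS))  # {}
```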

Evidence

Diagnostic logger.warning instrumentation in Orchestrator (added in a one-off debug session, then reverted) captured the exact sequence on the reproducer above:

parallel_invoke fork=bar_0001 post candidates={br_0001, br_0003, br_0004}
register branch=br_0001 current_agent=AgentA reachable_rendezvous=['Orchestrator']
register branch=br_0003 current_agent=AgentB reachable_rendezvous=['Orchestrator']
register branch=br_0004 current_agent=AgentC reachable_rendezvous=['Orchestrator']
...
interpret branch=br_0001 current_agent=AgentA step.kind=FAIL
fail_to branch=br_0001 current_agent=AgentA target=bar_0001
        reason="Agent AgentC cannot invoke: ['Orchestrator']"
                  ^^^^^^                     ^^^^^^^^^^^^^^^
                  AgentC's identity          AgentA's actual invocation

Cross-checked against the raw LLM responses at DEBUG level: every agent's response was valid against the topology. AgentA sent invoke_agent(Orchestrator, ...) (valid: AgentA → Orchestrator). AgentC sent invoke_agent(AgentD, ...) (valid: AgentC → AgentD). Yet AgentA's branch received a validation failure that names AgentC and lists Orchestrator as the invalid target.

The diagnostic patch is not in the codebase. To re-apply it, add a logger.warning call at every state transition in Orchestrator._interpret, _handle_parallel_invoke, _handle_single_invoke, _deliver, _fail_to, _register, _refresh_reachable, _ensure_barrier, _maybe_fire, _fire, _fire_with_failure, _cancel and _abandon. This is useful as a debug fixture for any barrier-path bug.
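One way to re-apply that instrumentation without editing each method by hand is to wrap the methods from outside. This is a sketch under assumptions: instrument is a hypothetical helper that patches the class in place, handles plain and async methods only (not static or class methods), and logs call arguments rather than the richer state fields shown in the trace above.

```python
import asyncio
import functools
import inspect
import logging

logger = logging.getLogger("barrier-debug")


def _wrap(name: str, original):
    """Return a wrapper that logs the call, then delegates to `original`."""
    if inspect.iscoroutinefunction(original):
        @functools.wraps(original)
        async def async_wrapper(self, *args, **kwargs):
            logger.warning("%s.%s args=%r kwargs=%r", type(self).__name__, name, args, kwargs)
            return await original(self, *args, **kwargs)
        return async_wrapper

    @functools.wraps(original)
    def sync_wrapper(self, *args, **kwargs):
        logger.warning("%s.%s args=%r kwargs=%r", type(self).__name__, name, args, kwargs)
        return original(self, *args, **kwargs)
    return sync_wrapper


def instrument(cls: type, method_names: list[str]) -> type:
    """Patch `cls` in place so each named method logs every call at WARNING level."""
    for name in method_names:
        if hasattr(cls, name):  # skip names the class lacks
            setattr(cls, name, _wrap(name, getattr(cls, name)))
    return cls


# Usage on a toy class; for the real Orchestrator, pass the method list above.
class ToyOrchestrator:
    def _register(self, branch_id):
        return branch_id

    async def _fire(self, barrier_id):
        return f"fired {barrier_id}"


instrument(ToyOrchestrator, ["_register", "_fire", "_fail_to"])
logging.basicConfig(format="%(message)s")
orch = ToyOrchestrator()
orch._register("br_0001")
asyncio.run(orch._fire("bar_0001"))
```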

Proposed fix

Two equivalent options. Both remove self._current_instance entirely.

Option A — pass the instance through the call chain (preferred)

# real_runtime.py
async def step(self, branch):
    instance = self.registry.get_or_acquire(branch.current_agent, branch_id=branch.id)
    if instance is None:
        return OrchestratorStepResult(kind="FAIL", error=...)
    # (delete: self._current_instance = instance)
    ...
    marsys_result = await self.step_executor.execute_step(agent=instance, ...)
    ...
    result = await self._translate(marsys_result, branch, instance)
    return result

async def _translate(self, marsys_result, branch, instance):     # new param
    ...
    validation = await self.validator.validate_coordination_action(
        action=coord_action,
        data=coord_data,
        agent=instance,                                          # local var, no race
        branch=None,
        exec_state=exec_state,
    )

Option B — attach to the StepResult

# real_runtime.py
marsys_result = await self.step_executor.execute_step(agent=instance, ...)
marsys_result._runtime_agent_instance = instance   # local stash on this branch's result
result = await self._translate(marsys_result, branch)

# _translate()
validation = await self.validator.validate_coordination_action(
    ...,
    agent=getattr(marsys_result, "_runtime_agent_instance", None),
)

Option A is preferred because it makes the dependency explicit in the signature and avoids growing a private convention on marsys_result.

Verification plan

  1. Synthetic unit test (sketch above): two concurrent step() calls on the same RealRuntime, fanned-out agents with different outgoing edges, mocked step_executor that releases in the order needed to trigger the race. Assert both step results reflect their own branch's identity. Test must fail on main and pass after the fix.
  2. Live repro (repro_parallel_race.py above): completes with success=True and final_response containing the expected aggregated answer.
  3. Audit for the same anti-pattern in RealRuntime:
    rg -n "self\._\w+ = " packages/framework/src/marsys/coordination/execution/real_runtime.py
    
    Inspect each match for a read of the same self._<attr> after an intervening await, either in the same method or in a method it calls (as _translate does here).

Notes for the implementer

  • The Orchestrator barrier algorithm behaves correctly under this bug — every fire-gate decision is internally consistent given the (corrupt) inputs it received. The fix is entirely in RealRuntime; no changes to Orchestrator, ValidationProcessor, StepExecutor, or the topology layer are needed.
  • After the fix, the strict-100% convergence policy (min_ratio=1.0, on_insufficient=fail) is the right behavior for the reproducer topology; the barrier simply stops firing with failure, because all three branches now genuinely succeed.

Environment

  • Branch reproduced on: main (and feature/tracing-streaming independently).
  • Python: 3.12 and 3.14 (both reproduce).
  • LLM providers tested: OpenRouter (Anthropic Claude Haiku 4.5), Azure OpenAI (gpt-5.4-mini). Bug is provider-independent.
  • OS: Windows 10 / Linux (Ubuntu 24.04). Bug is OS-independent — pure asyncio scheduling order.
  • Reproduces with both a fresh Orchestra.run(...) (single workflow) and across repeated invocations.
