diff --git a/recipes/multi_agent_orchestration/README.md b/recipes/multi_agent_orchestration/README.md new file mode 100644 index 000000000..580ff397f --- /dev/null +++ b/recipes/multi_agent_orchestration/README.md @@ -0,0 +1,64 @@ +# Multi-Agent Orchestration with Llama + +A recipe for building multi-agent systems using Llama models, with context layers, agent discovery, and safety wrappers. + +## What You'll Build + +A 3-agent pipeline where: +1. **Researcher** analyzes a topic and produces structured findings +2. **Builder** creates a solution based on research +3. **Reviewer** evaluates the solution against quality criteria + +Each agent has a persistent identity, discovers others via capability cards, and every LLM call is wrapped with safety guards. + +## Concepts Demonstrated + +| Concept | What | Why | +|---|---|---| +| **Context Layers** | Agents have identity (static) + state (dynamic) + knowledge (searchable) | Consistent behavior across sessions | +| **Agent Cards** | JSON capability descriptors per agent | Agents find each other without hardcoding | +| **Safety Guard** | Output validation, cost caps, rollback | Production-grade reliability | +| **Phase Gates** | Research → Build → Review with quality checks | Prevent garbage from propagating | + +## Quick Start + +```bash +pip install -r requirements.txt +# Set your Llama API endpoint (or use ollama) +export LLAMA_BASE_URL=http://localhost:11434/v1 +export LLAMA_MODEL=llama3.2 + +python orchestrator.py "Build a CLI tool that converts CSV to JSON" +``` + +## Files + +| File | What | +|---|---| +| `context_layers.py` | 4-layer context system (identity → state → relevant → archive) | +| `agent_cards.py` | Agent capability discovery and routing | +| `safety_guard.py` | Output validation, cost tracking, rollback | +| `orchestrator.py` | 3-agent pipeline with phase gates | +| `requirements.txt` | Dependencies | + +## How It Works + +``` +User provides task + ↓ +Orchestrator reads agent cards → finds 
@dataclass
class AgentCard:
    """JSON-serializable capability descriptor for an agent.

    Attributes:
        name: Agent name; ``CardDirectory`` uses it as the registry key.
        capabilities: Capability keywords that ``matches`` searches.
        inputs: Names of the inputs the agent declares.
        outputs: Names of the outputs the agent declares.
    """
    name: str
    capabilities: list[str]
    inputs: list[str]
    outputs: list[str]

    def matches(self, query: str) -> bool:
        """Return True if ``query`` is a substring of any capability.

        The comparison is case-insensitive and ignores surrounding
        whitespace. A blank query never matches: ``"" in s`` is true for
        every string, so without this guard an empty query would select
        every registered agent.
        """
        needle = query.strip().lower()
        if not needle:
            return False
        return any(needle in cap.lower() for cap in self.capabilities)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the card.

        The lists are copied so that callers mutating the returned dict
        cannot alter the card itself.
        """
        return {
            "name": self.name,
            "capabilities": list(self.capabilities),
            "inputs": list(self.inputs),
            "outputs": list(self.outputs),
        }


class CardDirectory:
    """Registry of all agent cards. Agents find each other here."""

    def __init__(self):
        # Keyed by card name; dict insertion order is the listing order.
        self._cards: dict[str, AgentCard] = {}

    def register(self, card: AgentCard):
        """Add ``card``; a card with the same name is replaced."""
        self._cards[card.name] = card

    def find(self, capability: str) -> list[AgentCard]:
        """Return every registered card matching ``capability``.

        Cards come back in registration order; the list is empty when
        nothing matches (including for a blank query).
        """
        return [c for c in self._cards.values() if c.matches(capability)]

    def get(self, name: str) -> "AgentCard | None":
        """Return the card registered under ``name``, or None."""
        return self._cards.get(name)

    def list_all(self) -> list[AgentCard]:
        """Return all registered cards in registration order."""
        return list(self._cards.values())


# Pre-built directory for the 3-agent pipeline
def build_default_directory() -> CardDirectory:
    """Build a directory holding the Researcher, Builder and Reviewer cards."""
    directory = CardDirectory()

    directory.register(AgentCard(
        name="Researcher",
        capabilities=["research", "analysis", "market_study", "competitive_intel"],
        inputs=["topic", "question", "domain"],
        outputs=["structured_findings_json"],
    ))

    directory.register(AgentCard(
        name="Builder",
        capabilities=["build", "implement", "code", "create", "develop"],
        inputs=["research_findings", "spec", "requirements"],
        outputs=["code", "implementation_plan"],
    ))

    directory.register(AgentCard(
        name="Reviewer",
        capabilities=["review", "evaluate", "qa", "quality_check"],
        inputs=["solution", "criteria", "research_context"],
        outputs=["evaluation_json", "score", "verdict"],
    ))

    return directory
@dataclass
class AgentContext:
    """Assembled context for an agent, built from 4 layers.

    Only layers 1-3 are stored as fields here; ``to_system_prompt``
    renders them into a single system prompt string.
    """

    # Layer 1: Identity (static per session)
    name: str
    role: str
    instructions: str

    # Layer 2: State (dynamic per call)
    current_task: str = ""
    previous_output: str = ""

    # Layer 3: Relevant (retrieved per task)
    knowledge: list[str] = field(default_factory=list)

    def to_system_prompt(self) -> str:
        """Assemble all layers into a system prompt.

        Returns:
            The identity section, followed by a ``# Current State`` section
            when a task and/or a previous output is set, followed by a
            ``# Relevant Knowledge`` bullet list when ``knowledge`` is
            non-empty. Sections are separated by a blank line.
        """
        parts = [
            f"# Identity\nYou are {self.name}, a {self.role}.\n{self.instructions}",
        ]

        # Collect the state lines first so the section header is emitted
        # whenever *either* field is set; otherwise a lone previous output
        # would be rendered as part of the identity section.
        state_lines = []
        if self.current_task:
            state_lines.append(f"Task: {self.current_task}")
        if self.previous_output:
            state_lines.append(f"Previous step output:\n{self.previous_output}")
        if state_lines:
            parts.append("\n# Current State\n" + "\n".join(state_lines))

        if self.knowledge:
            parts.append(
                "\n# Relevant Knowledge\n" + "\n".join(f"- {k}" for k in self.knowledge)
            )
        return "\n".join(parts)
# --- LLM Client (OpenAI-compatible, works with Ollama / vLLM / llama.cpp) ---

def call_llm(system: str, user: str, model: Optional[str] = None) -> str:
    """Call an OpenAI-compatible chat-completions endpoint.

    Args:
        system: System prompt.
        user: User message.
        model: Model name; falls back to ``LLAMA_MODEL`` (default
            ``llama3.2``) when omitted or empty.

    Returns:
        The content of the first choice's message, or ``""`` when the
        server returns a null/empty content.

    Raises:
        requests.HTTPError: on a non-2xx response (``raise_for_status``).
        KeyError / IndexError: if the response body lacks
            ``choices[0].message.content``.
    """
    # Function-scope import: requests is only needed once a call is made.
    import requests

    # Strip a trailing slash so ".../v1/" does not yield "//chat/completions".
    base_url = os.getenv("LLAMA_BASE_URL", "http://localhost:11434/v1").rstrip("/")
    model_name = model or os.getenv("LLAMA_MODEL", "llama3.2")

    resp = requests.post(
        f"{base_url}/chat/completions",
        json={
            "model": model_name,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": user},
            ],
            "temperature": 0.7,
            "max_tokens": 2000,
        },
        timeout=120,
    )
    resp.raise_for_status()
    content = resp.json()["choices"][0]["message"]["content"]
    # Keep the declared ``-> str`` contract even if content is null.
    return content or ""


# --- Pipeline helpers ---

def _context_for(directory, capability: str, **state) -> AgentContext:
    """Return a fresh context for the first agent advertising ``capability``.

    The pre-built identity whose name equals the matching card's name is
    copied with ``dataclasses.replace`` so that per-run state is never
    written onto the shared module-level RESEARCHER/BUILDER/REVIEWER objects.

    Raises:
        IndexError: no registered card matches ``capability`` (same type the
            bare ``find(...)[0]`` lookup would raise, with a clearer message).
        KeyError: the matching card has no pre-built identity.
    """
    from dataclasses import replace

    cards = directory.find(capability)
    if not cards:
        raise IndexError(f"No agent card advertises capability {capability!r}")
    templates = {ctx.name: ctx for ctx in (RESEARCHER, BUILDER, REVIEWER)}
    return replace(templates[cards[0].name], **state)


def _require_budget(within_budget: bool, guard, next_phase: str) -> None:
    """Abort before ``next_phase`` if the guard reported the budget as spent."""
    if not within_budget:
        raise RuntimeError(
            f"Cost budget exceeded before {next_phase} phase: "
            f"{json.dumps(guard.summary())}"
        )


def _coerce_score(raw):
    """Turn a model-reported score into a number comparable with ``>= 7``.

    Numbers pass through unchanged. Strings such as ``"8"``, ``"8.5"`` or
    ``"8/10"`` are parsed (the part before ``/``). Anything else, including
    booleans and non-finite values, becomes 0 so the phase gate fails closed
    instead of raising TypeError on ``str >= int``.
    """
    if isinstance(raw, bool):
        return 0
    if isinstance(raw, (int, float)):
        return raw
    if isinstance(raw, str):
        head = raw.strip().partition("/")[0].strip()
        try:
            value = float(head)
        except ValueError:
            return 0
        if value != value or abs(value) == float("inf"):  # NaN / inf
            return 0
        return int(value) if value.is_integer() else value
    return 0


# --- Pipeline ---

def run_pipeline(task: str, verbose: bool = True):
    """Run the 3-agent pipeline: Research → Build → Review.

    Args:
        task: Natural-language task description.
        verbose: Print per-phase progress and the final gate summary.
            Warnings about unparseable model output are printed regardless.

    Returns:
        dict with keys ``task``, ``research`` (dict), ``solution`` (str),
        ``review`` (parsed dict, or the raw text if parsing failed),
        ``score``, ``passed`` (score >= 7) and ``cost`` (guard summary).

    Raises:
        RuntimeError: the guard reported the cost budget as exceeded and
            another LLM call would be needed.
        IndexError: no agent card matches a required capability.
    """
    directory = build_default_directory()
    guard = SafetyGuard(max_cost_usd=5.0)

    # === PHASE 1: RESEARCH ===
    if verbose:
        print("\n=== Phase 1: Research ===")

    researcher = _context_for(directory, "research", current_task=task)
    research_output = call_llm(
        system=researcher.to_system_prompt(),
        user=f"Research this task and produce structured findings as JSON:\n\n{task}",
    )
    within_budget = guard.track_cost(research_output)

    # Safety check: validate JSON output
    valid, research_data = guard.validate_json(
        research_output, required_fields=["summary", "key_points"]
    )
    if valid and not isinstance(research_data, dict):
        # Later steps call .get() on the findings, so only an object is usable.
        valid, research_data = False, "top-level JSON value is not an object"
    if not valid:
        print(f" WARNING: Research output not valid JSON ({research_data})")
        print(" Using raw text as findings")
        research_data = {"summary": research_output, "key_points": []}
    else:
        guard.save_good("Researcher", research_data)

    if verbose:
        summary = research_data.get("summary", research_output[:200])
        # str(): the model may return a non-string summary.
        print(f" Findings: {str(summary)[:150]}...")

    research_json = json.dumps(research_data)

    # === PHASE 2: BUILD ===
    _require_budget(within_budget, guard, "Build")
    if verbose:
        print("\n=== Phase 2: Build ===")

    builder = _context_for(
        directory, "build", current_task=task, previous_output=research_json
    )
    build_output = call_llm(
        system=builder.to_system_prompt(),
        user=(
            f"Based on these research findings, build a solution:\n\n"
            f"Research: {research_json[:1500]}\n\n"
            f"Original task: {task}"
        ),
    )
    within_budget = guard.track_cost(build_output)
    guard.save_good("Builder", build_output)

    if verbose:
        print(f" Solution: {build_output[:150]}...")

    # === PHASE 3: REVIEW (phase gate) ===
    _require_budget(within_budget, guard, "Review")
    if verbose:
        print("\n=== Phase 3: Review ===")

    reviewer = _context_for(
        directory,
        "review",
        current_task=f"Evaluate solution for: {task}",
        previous_output=build_output,
        knowledge=[
            "Score >= 7 means PASS. Score < 7 means needs revision.",
            f"Research context: {research_json[:500]}",
        ],
    )
    review_output = call_llm(
        system=reviewer.to_system_prompt(),
        user=(
            f"Evaluate this solution:\n\n{build_output[:2000]}\n\n"
            "Criteria: correctness, completeness, code quality, production-readiness.\n"
            "Output JSON: {score, strengths[], issues[], verdict}"
        ),
    )
    # Last call of the run: record the cost, nothing further to block.
    guard.track_cost(review_output)

    valid, review_data = guard.validate_json(
        review_output, required_fields=["score", "verdict"]
    )
    if valid and not isinstance(review_data, dict):
        valid, review_data = False, "top-level JSON value is not an object"

    if valid:
        score = _coerce_score(review_data.get("score", 0))
        verdict = review_data.get("verdict", "unknown")
        guard.save_good("Reviewer", review_data)
    else:
        score = 0
        verdict = "parse_failed"
        print(f" WARNING: Review not valid JSON ({review_data})")

    # === PHASE GATE ===
    passed = score >= 7

    if verbose:
        print(f"\n{'='*50}")
        print(f" Score: {score}/10 — {'PASS' if passed else 'NEEDS REVISION'}")
        print(f" Verdict: {verdict}")
        if valid and "issues" in review_data:
            for issue in review_data["issues"]:
                print(f" Issue: {issue}")
        print(f"\n Cost: {json.dumps(guard.summary())}")

    return {
        "task": task,
        "research": research_data,
        "solution": build_output,
        "review": review_data if valid else review_output,
        "score": score,
        "passed": passed,
        "cost": guard.summary(),
    }


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python orchestrator.py \"Your task description\"")
        sys.exit(1)

    task = " ".join(sys.argv[1:])
    result = run_pipeline(task)

    if not result["passed"]:
        print("\nPipeline result: NEEDS REVISION")
        print("In production, this would loop back to Builder with review feedback.")
    else:
        print("\nPipeline result: PASSED")
logger = logging.getLogger("safety_guard")


class SafetyGuard:
    """Wraps LLM calls with validation, cost tracking, and rollback.

    Args:
        max_cost_usd: Budget; ``track_cost`` returns False once the running
            total reaches it.
        cost_per_1m_tokens: Price used to convert estimated tokens to USD.
    """

    def __init__(self, max_cost_usd: float = 10.0, cost_per_1m_tokens: float = 0.20):
        self.max_cost_usd = max_cost_usd
        self.cost_per_1m_tokens = cost_per_1m_tokens
        self.total_cost = 0.0
        self.total_tokens = 0
        self.call_count = 0
        # Latest known-good output per agent name, used by rollback().
        self._last_good: dict[str, Any] = {}

    def validate_json(
        self, text: str, required_fields: Optional[list[str]] = None
    ) -> tuple[bool, Any]:
        """Validate that output is parseable JSON with required fields.

        The text is parsed as-is first; if that fails, the first Markdown
        code fence (optionally tagged ``json``) is extracted and parsed.

        Args:
            text: Raw model output.
            required_fields: Keys that must be present. When given, the
                parsed value must be a JSON object.

        Returns:
            ``(True, parsed_value)`` on success, otherwise
            ``(False, reason)`` with a human-readable reason string.
        """
        if text is None:
            return False, "Not valid JSON"

        # Try direct parse
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            # Try extracting from markdown code blocks. Whitespace around
            # the payload is optional so single-line fences also work.
            import re
            match = re.search(r"```(?:json)?\s*(.*?)\s*```", text, re.DOTALL)
            if not match:
                return False, "Not valid JSON"
            try:
                data = json.loads(match.group(1))
            except json.JSONDecodeError:
                return False, "JSON parse failed (even from code block)"

        if required_fields:
            # ``in`` on a list/str would test membership/substring and on a
            # number would raise TypeError; only an object can have fields.
            if not isinstance(data, dict):
                return False, f"Expected a JSON object, got {type(data).__name__}"
            missing = [f for f in required_fields if f not in data]
            if missing:
                return False, f"Missing fields: {missing}"

        return True, data

    def track_cost(self, text: str) -> bool:
        """Track token cost. Returns False if budget exceeded.

        Tokens are estimated as ``len(text) // 4``; the running totals and
        the call counter are updated on every call.
        """
        tokens = len(text) // 4  # rough estimate
        cost = (tokens / 1_000_000) * self.cost_per_1m_tokens
        self.total_cost += cost
        self.total_tokens += tokens
        self.call_count += 1

        if self.total_cost >= self.max_cost_usd:
            logger.warning(
                "Budget exceeded: $%.4f >= $%s", self.total_cost, self.max_cost_usd
            )
            return False
        return True

    def save_good(self, agent_name: str, output: Any):
        """Save a known-good output for rollback (stored by reference)."""
        self._last_good[agent_name] = output

    def rollback(self, agent_name: str) -> Optional[Any]:
        """Get last known-good output for an agent, or None if none saved."""
        return self._last_good.get(agent_name)

    def summary(self) -> dict:
        """Return rounded cost totals, token and call counts, and remaining budget."""
        return {
            "total_cost_usd": round(self.total_cost, 6),
            "total_tokens": self.total_tokens,
            "call_count": self.call_count,
            "budget_remaining": round(self.max_cost_usd - self.total_cost, 4),
        }