This report is being generated. The page will refresh automatically.
+No content available for this report.
diff --git a/.github/workflows/linter.yaml b/.github/workflows/linter.yaml
index f2e8adc2a..938a013f0 100644
--- a/.github/workflows/linter.yaml
+++ b/.github/workflows/linter.yaml
@@ -15,12 +15,12 @@ jobs:
steps:
- name: Checkout Code
- uses: actions/checkout@v5
+ uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5
with:
fetch-depth: 0
- name: GitHub Super Linter
- uses: super-linter/super-linter/slim@v8
+ uses: super-linter/super-linter/slim@9e863354e3ff62e0727d37183162c4a88873df41 # v8
env:
DEFAULT_BRANCH: main
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -29,6 +29,8 @@ jobs:
SHELLCHECK_OPTS: -e SC1091 -e 2086
VALIDATE_ALL_CODEBASE: false
FILTER_REGEX_EXCLUDE: "^(\\.github/|\\.vscode/).*|CODE_OF_CONDUCT.md|(extensions/agp/).*|.*pyproto/.*|.*pb/.*|itk/agents/go/.*"
+ VALIDATE_GO: false
+ VALIDATE_GO_MODULES: false
VALIDATE_PYTHON_BLACK: false
VALIDATE_PYTHON_FLAKE8: false
VALIDATE_PYTHON_ISORT: false
@@ -52,3 +54,28 @@ jobs:
VALIDATE_BIOME_FORMAT: false
VALIDATE_BIOME_LINT: false
VALIDATE_GITHUB_ACTIONS_ZIZMOR: false
+
+ go-lint:
+ name: Lint Go
+ runs-on: ubuntu-latest
+ permissions:
+ contents: read
+ steps:
+ - name: Checkout Code
+ uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5
+
+ - name: Set up Go
+ uses: actions/setup-go@40f1582b2485089dde7abd97c1529aa768e1baff # v5
+ with:
+ go-version: stable
+
+ - name: Lint all Go modules
+ run: |
+ go install github.com/golangci/golangci-lint/v2/cmd/golangci-lint@latest
+ status=0
+ while IFS= read -r dir; do
+ echo "::group::Linting $dir"
+ (cd "$dir" && golangci-lint run ./...) || status=$?
+ echo "::endgroup::"
+ done < <(find . -name go.mod -exec dirname {} \;)
+ exit $status
diff --git a/.golangci.yml b/.golangci.yml
new file mode 100644
index 000000000..f2621c620
--- /dev/null
+++ b/.golangci.yml
@@ -0,0 +1,6 @@
+version: "2"
+linters:
+ settings:
+ errcheck:
+ exclude-functions:
+ - "(*database/sql.Tx).Rollback"
diff --git a/samples/go/agents/deepresearch/.example.env b/samples/go/agents/deepresearch/.example.env
new file mode 100644
index 000000000..397ea6c15
--- /dev/null
+++ b/samples/go/agents/deepresearch/.example.env
@@ -0,0 +1,2 @@
+GOOGLE_API_KEY=
+REPORT_URL=http://127.0.0.1:8080
diff --git a/samples/go/agents/deepresearch/AGENTS.md b/samples/go/agents/deepresearch/AGENTS.md
new file mode 100644
index 000000000..9448ce3d3
--- /dev/null
+++ b/samples/go/agents/deepresearch/AGENTS.md
@@ -0,0 +1,118 @@
+# AGENTS.md — Deep Research Agent Style Guide
+
+## Project Overview
+
+This is a **multi-agent deep research system** built on the A2A (Agent-to-Agent) protocol. A single Go binary serves four agent roles — orchestrator, researcher, analyzer, synthesizer — selected at runtime via the `NODE_TYPE` environment variable. The system uses NATS JetStream for event sourcing and work distribution, MySQL for task persistence, and Gemini (via Google ADK) for LLM reasoning.
+
+### Key dependencies
+
+| Dependency | Purpose |
+| --- | --- |
+| `github.com/a2aproject/a2a-go/v2` | A2A protocol SDK (server, client, types, push, queues, stores) |
+| `google.golang.org/adk` | Google Agent Development Kit (LLM agents, runners, sessions, tools) |
+| `google.golang.org/genai` | Google GenAI SDK (Gemini model, content types) |
+| `github.com/nats-io/nats.go` | NATS client / JetStream |
+| `github.com/go-sql-driver/mysql` | MySQL driver (blank-imported for side effects) |
+
+---
+
+## Architecture
+
+```text
+Client → Orchestrator (state machine)
+ ├── Researcher (Google Search grounding)
+ ├── Analyzer (referenced-task injection)
+ └── Synthesizer (referenced-task injection)
+
+Infrastructure: MySQL (task index + outbox) · NATS JetStream (events, work, state) · nginx (host-based LB)
+```
+
+- **Single binary, multi-role**: `main.go` reads `NODE_TYPE` and wires the corresponding `a2asrv.AgentExecutor`.
+- **Event sourcing**: Tasks are materialized by replaying events from NATS streams.
+- **Transactional outbox**: MySQL insert + NATS publish are guaranteed atomic via an outbox table relayed by a leader-elected poller.
+- **Scatter/gather**: The orchestrator fans out subtasks via async A2A sends and gathers results through NATS push notifications.
+
+---
+
+## Project Layout
+
+```text
+deepresearch/
+├── main.go # Entry point, config, server wiring
+├── internal/
+│ ├── agents/ # Agent executors (orchestrator, researcher, analyzer, synthesizer)
+│ ├── clusterclient/ # Async A2A client wrapper for inter-agent communication
+│ ├── domain/ # Shared domain types (AgentType enum, Info)
+│ ├── lease/ # NATS KV-based leader election
+│ ├── msgstream/ # NATS-backed event queues, work queues, push sender
+│ ├── report/ # HTTP handler for serving synthesized reports
+│ ├── server/ # Server wiring (infra setup, handler creation)
+│ ├── statemachine/ # Generic event-sourced state machine
+│ ├── store/ # MySQL-backed task store, indexing, transactional outbox
+│ ├── testutil/ # Shared test helpers
+│ └── utils/ # Small generic helpers (Must, SchemaFor)
+├── infra/ # Docker Compose, nginx, MySQL schema, NATS bootstrap
+├── Dockerfile
+└── go.mod
+```
+
+**Rules**:
+- All domain logic lives under `internal/` — one concern per package.
+- Each package should have a single clear responsibility (e.g., `lease` only does leader election).
+- `main.go` is the only file in package `main`; it handles configuration, dependency wiring, and graceful shutdown.
+
+---
+
+## Coding Rules
+
+### Testing
+
+- Test observable behavior, not the internal state.
+- Use table-driven tests where applicable.
+- Name test functions `TestFunctionName_scenario`.
+
+### Comments
+
+- **Prefer self-explanatory code**.
+- **Doc comments**: `// SymbolName does X.` directly above the symbol. Start with the symbol name per Go convention. Add for all exported symbols, but be brief.
+- **Inline comments**: Use sparingly, be brief, explain *why* not *what*.
+- **References**: Use Go doc-link syntax `[a2a.Client]` when referencing other symbols.
+
+### Logging
+
+Use `github.com/a2aproject/a2a-go/v2/log` exclusively. Do not use `log/slog` or `fmt.Println` for application logging.
+
+---
+
+## Things to Know
+
+### Event-sourced state machine (`internal/statemachine/`)
+
+The generic `statemachine.Spec[E, S]` (driven by `statemachine.Run`) drives the orchestrator:
+- **Decode**: Parse raw NATS messages into typed events.
+- **Evolve**: Apply events to state (pure state transitions, no side effects).
+- **Act**: Inspect state and decide on side effects (dispatch subtasks, call LLM, complete).
+
+### Transactional outbox (`internal/store/outbox.go`)
+
+Guarantees atomicity between MySQL writes and NATS publishes:
+1. Insert task + outbox row (tagged with the agent type) in the same SQL transaction.
+2. A leader-elected poller reads outbox rows for its own agent type, publishes to NATS, then deletes.
+
+### Decorator pattern (`internal/agents/common.go`)
+
+`referencedTaskLoader` wraps an `AgentExecutor` via embedding and intercepts `Execute` to inject referenced task content before delegating to the inner executor.
+
+### Leader election (`internal/lease/`)
+
+Uses NATS KV `Create` (atomic put-if-absent) for distributed locking. Watches for key deletion to retry acquisition.
+
+### Infrastructure
+
+All services are defined in `infra/docker-compose.yaml`.
+
+### Misc
+
+- This agent is a **self-contained Go module** (`go.mod` at the deepresearch root). It does not share code with other Go samples in the repository.
+- The A2A SDK (`a2a-go/v2`) provides the server framework, client, types, and infrastructure interfaces (queues, stores, push). Domain logic implements these interfaces.
+- The orchestrator's workflow proceeds through stages: **research -> analyze -> follow-up research -> synthesize -> complete**.
diff --git a/samples/go/agents/deepresearch/Dockerfile b/samples/go/agents/deepresearch/Dockerfile
new file mode 100644
index 000000000..927408916
--- /dev/null
+++ b/samples/go/agents/deepresearch/Dockerfile
@@ -0,0 +1,13 @@
+FROM golang:1.25-alpine AS builder
+
+WORKDIR /app
+
+COPY go.mod go.sum ./
+RUN go mod download
+
+COPY . .
+RUN CGO_ENABLED=0 GOOS=linux go build -o /deepresearch .
+
+FROM gcr.io/distroless/static:nonroot
+COPY --from=builder /deepresearch /deepresearch
+ENTRYPOINT ["/deepresearch"]
diff --git a/samples/go/agents/deepresearch/Makefile b/samples/go/agents/deepresearch/Makefile
new file mode 100644
index 000000000..31454bb61
--- /dev/null
+++ b/samples/go/agents/deepresearch/Makefile
@@ -0,0 +1,76 @@
+COMPOSE := docker compose --env-file .env -f infra/docker-compose.yaml
+NATS_URL := nats://localhost:4222
+MYSQL_DSN := root:root@tcp(localhost:3306)/planner?parseTime=true
+
+# Local ports (go run mode)
+ORCH_PORT := 8080
+RESEARCH_PORT := 8081
+ANALYZE_PORT := 8082
+SYNTH_PORT := 8083
+
+# ── Docker Compose ──────────────────────────────────────────────
+
+.PHONY: up force-up infra-up down clean
+
+## up: start all services (cached images)
+up:
+ $(COMPOSE) up -d
+
+## force-up: rebuild images and start all services
+force-up:
+ $(COMPOSE) up --build -d
+
+## infra-up: start only NATS, MySQL, and run the NATS init script
+infra-up:
+ $(COMPOSE) up nats mysql -d
+ $(COMPOSE) run --rm nats-init
+
+## down: stop all services (preserves volumes)
+down:
+ $(COMPOSE) down
+
+## clean: stop all services and remove volumes
+clean:
+ $(COMPOSE) down -v
+
+# ── Local Mode (go run) ────────────────────────────────────────
+# Runs all four agents as local processes against containerised
+# NATS + MySQL. No nginx needed — the orchestrator connects to
+# researcher/analyzer/synthesizer directly on localhost ports.
+#
+# Prerequisites: make infra-up
+# Usage: make local
+# Stop: Ctrl-C (kills all four processes)
+
+.PHONY: local
+
+local:
+ @if [ -z "$$GOOGLE_API_KEY" ] && [ -f infra/.env ]; then \
+ export $$(grep -v '^#' infra/.env | xargs); \
+ fi; \
+ trap 'kill 0' EXIT; \
+ GOOGLE_API_KEY=$${GOOGLE_API_KEY} NODE_TYPE=researcher LISTEN_ADDR=:$(RESEARCH_PORT) go run . & \
+ GOOGLE_API_KEY=$${GOOGLE_API_KEY} NODE_TYPE=analyzer LISTEN_ADDR=:$(ANALYZE_PORT) go run . & \
+ GOOGLE_API_KEY=$${GOOGLE_API_KEY} NODE_TYPE=synthesizer LISTEN_ADDR=:$(SYNTH_PORT) go run . & \
+ sleep 1; \
+ GOOGLE_API_KEY=$${GOOGLE_API_KEY} \
+ REPORT_URL=http://127.0.0.1:8080 \
+ NODE_TYPE=orchestrator \
+ LISTEN_ADDR=:$(ORCH_PORT) \
+ RESEARCHER_URL=http://localhost:$(RESEARCH_PORT) \
+ ANALYZER_URL=http://localhost:$(ANALYZE_PORT) \
+ SYNTHESIZER_URL=http://localhost:$(SYNTH_PORT) \
+ go run . ; \
+ wait
+
+# ── Testing ─────────────────────────────────────────────────────
+
+.PHONY: test send
+
+## test: run integration tests (starts infra containers automatically)
+test:
+ go test -v -timeout 60s ./itest/
+
+## send: send a test message to the orchestrator via the a2a CLI
+send:
+ a2a send http://localhost:$(ORCH_PORT) "Research the impact of AI on healthcare" --transport rest --stream --timeout 5m
diff --git a/samples/go/agents/deepresearch/README.md b/samples/go/agents/deepresearch/README.md
new file mode 100644
index 000000000..22151b983
--- /dev/null
+++ b/samples/go/agents/deepresearch/README.md
@@ -0,0 +1,42 @@
+# Deep Research
+
+A multi-agent system that performs deep research on a given topic. The project showcases a way of implementing the standard SDK interfaces for integrating with various popular infra components like MySQL and NATS.
+
+Built using [a2a-go](https://github.com/a2aproject/a2a-go) and [adk](https://github.com/google/adk-go).
+
+## Overview
+
+* Horizontally scalable cluster of different agent types: orchestrator, researcher, analyzer, and synthesizer.
+* MySQL for task indexing and Jetstream for event persistence.
+* Push notification sender for signaling subtask completion to the orchestrator.
+* NATS for work distribution, event and push notification delivery.
+* Retryable execution with state checkpointing.
+
+
+
+## Running
+
+1. Rename `.example.env` to `.env` and update your `GOOGLE_API_KEY` ([learn more](https://docs.cloud.google.com/docs/authentication/api-keys)).
+
+2. Start the full stack using docker-compose by running `make up`.
+
+3. Call orchestrator using [a2a-cli](https://github.com/a2aproject/a2a-go#-cli) (`make send`), [a2a-inspector](https://github.com/a2aproject/a2a-inspector) or another client.
+
+
+## Details
+
+Orchestrator agents handle client requests:
+1. Uses LLM planner to decompose a question into subtasks.
+2. Dispatches them to a cluster of researcher agents with `returnImmediately: true`.
+3. Waits for results using NATS-based push notifications.
+4. Invokes an analyzer to find contradictory topics for a follow-up research.
+5. Initiates the follow-up research.
+6. Invokes a synthesizer to generate a final report.
+
+If an orchestrator crashes, the state machine replays its event stream from the NATS STATES stream to recover which stages were dispatched and which completed, then resumes from where it left off.
+
+Orchestrator never loads large task contents into memory and instead uses task references when communicating with synthesizer and analyzer. The final report is returned to a user as a reference.
+
+Push notifications allow orchestrator to limit the number of open long-lived connections and avoid subtask status polling.
+
+
\ No newline at end of file
diff --git a/samples/go/agents/deepresearch/assets/deepresearch.png b/samples/go/agents/deepresearch/assets/deepresearch.png
new file mode 100644
index 000000000..5b5439467
Binary files /dev/null and b/samples/go/agents/deepresearch/assets/deepresearch.png differ
diff --git a/samples/go/agents/deepresearch/assets/sample_output.png b/samples/go/agents/deepresearch/assets/sample_output.png
new file mode 100644
index 000000000..3e49df635
Binary files /dev/null and b/samples/go/agents/deepresearch/assets/sample_output.png differ
diff --git a/samples/go/agents/deepresearch/go.mod b/samples/go/agents/deepresearch/go.mod
new file mode 100644
index 000000000..cca51b9c6
--- /dev/null
+++ b/samples/go/agents/deepresearch/go.mod
@@ -0,0 +1,51 @@
+module github.com/a2aproject/a2a-samples/samples/go/agents/deepresearch
+
+go 1.25.0
+
+require (
+ github.com/a2aproject/a2a-go/v2 v2.3.2-0.20260606182037-3134e71be608
+ github.com/go-sql-driver/mysql v1.10.0
+ github.com/google/uuid v1.6.0
+ github.com/nats-io/nats.go v1.52.0
+ golang.org/x/sync v0.20.0
+ google.golang.org/adk v1.3.0
+ google.golang.org/genai v1.58.0
+)
+
+require (
+ cloud.google.com/go v0.123.0 // indirect
+ cloud.google.com/go/auth v0.20.0 // indirect
+ cloud.google.com/go/compute/metadata v0.9.0 // indirect
+ filippo.io/edwards25519 v1.2.0 // indirect
+ github.com/cespare/xxhash/v2 v2.3.0 // indirect
+ github.com/felixge/httpsnoop v1.0.4 // indirect
+ github.com/go-logr/logr v1.4.3 // indirect
+ github.com/go-logr/stdr v1.2.2 // indirect
+ github.com/google/go-cmp v0.7.0 // indirect
+ github.com/google/jsonschema-go v0.4.3 // indirect
+ github.com/google/s2a-go v0.1.9 // indirect
+ github.com/google/safehtml v0.1.0 // indirect
+ github.com/googleapis/enterprise-certificate-proxy v0.3.15 // indirect
+ github.com/googleapis/gax-go/v2 v2.22.0 // indirect
+ github.com/gorilla/websocket v1.5.3 // indirect
+ github.com/klauspost/compress v1.18.6 // indirect
+ github.com/nats-io/nkeys v0.4.15 // indirect
+ github.com/nats-io/nuid v1.0.1 // indirect
+ go.opentelemetry.io/auto/sdk v1.2.1 // indirect
+ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 // indirect
+ go.opentelemetry.io/otel v1.43.0 // indirect
+ go.opentelemetry.io/otel/log v0.16.0 // indirect
+ go.opentelemetry.io/otel/metric v1.43.0 // indirect
+ go.opentelemetry.io/otel/trace v1.43.0 // indirect
+ golang.org/x/crypto v0.51.0 // indirect
+ golang.org/x/mod v0.35.0 // indirect
+ golang.org/x/net v0.54.0 // indirect
+ golang.org/x/sys v0.44.0 // indirect
+ golang.org/x/text v0.37.0 // indirect
+ google.golang.org/api v0.279.0 // indirect
+ google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60 // indirect
+ google.golang.org/grpc v1.81.0 // indirect
+ google.golang.org/protobuf v1.36.11 // indirect
+ rsc.io/omap v1.2.0 // indirect
+ rsc.io/ordered v1.1.1 // indirect
+)
diff --git a/samples/go/agents/deepresearch/go.sum b/samples/go/agents/deepresearch/go.sum
new file mode 100644
index 000000000..93a689918
--- /dev/null
+++ b/samples/go/agents/deepresearch/go.sum
@@ -0,0 +1,109 @@
+cloud.google.com/go v0.123.0 h1:2NAUJwPR47q+E35uaJeYoNhuNEM9kM8SjgRgdeOJUSE=
+cloud.google.com/go v0.123.0/go.mod h1:xBoMV08QcqUGuPW65Qfm1o9Y4zKZBpGS+7bImXLTAZU=
+cloud.google.com/go/auth v0.20.0 h1:kXTssoVb4azsVDoUiF8KvxAqrsQcQtB53DcSgta74CA=
+cloud.google.com/go/auth v0.20.0/go.mod h1:942/yi/itH1SsmpyrbnTMDgGfdy2BUqIKyd0cyYLc5Q=
+cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs=
+cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10=
+filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo=
+filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc=
+github.com/a2aproject/a2a-go/v2 v2.3.2-0.20260606182037-3134e71be608 h1:ujKERViS9DvYzkDJr6SJn7R0AgPPVi4PAUtNJzajha0=
+github.com/a2aproject/a2a-go/v2 v2.3.2-0.20260606182037-3134e71be608/go.mod h1:mkZr8y2bUgAVQsjs/5fHK7xrRlAHDybMEyxWh2tKRC8=
+github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
+github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
+github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
+github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/go-sql-driver/mysql v1.10.0 h1:Q+1LV8DkHJvSYAdR83XzuhDaTykuDx0l6fkXxoWCWfw=
+github.com/go-sql-driver/mysql v1.10.0/go.mod h1:M+cqaI7+xxXGG9swrdeUIoPG3Y3KCkF0pZej+SK+nWk=
+github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
+github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/jsonschema-go v0.4.3 h1:/DBOLZTfDow7pe2GmaJNhltueGTtDKICi8V8p+DQPd0=
+github.com/google/jsonschema-go v0.4.3/go.mod h1:r5quNTdLOYEz95Ru18zA0ydNbBuYoo9tgaYcxEYhJVE=
+github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
+github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
+github.com/google/safehtml v0.1.0 h1:EwLKo8qawTKfsi0orxcQAZzu07cICaBeFMegAU9eaT8=
+github.com/google/safehtml v0.1.0/go.mod h1:L4KWwDsUJdECRAEpZoBn3O64bQaywRscowZjJAzjHnU=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/googleapis/enterprise-certificate-proxy v0.3.15 h1:xolVQTEXusUcAA5UgtyRLjelpFFHWlPQ4XfWGc7MBas=
+github.com/googleapis/enterprise-certificate-proxy v0.3.15/go.mod h1:vqVt9yG9480NtzREnTlmGSBmFrA+bzb0yl0TxoBQXOg=
+github.com/googleapis/gax-go/v2 v2.22.0 h1:PjIWBpgGIVKGoCXuiCoP64altEJCj3/Ei+kSU5vlZD4=
+github.com/googleapis/gax-go/v2 v2.22.0/go.mod h1:irWBbALSr0Sk3qlqb9SyJ1h68WjgeFuiOzI4Rqw5+aY=
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/klauspost/compress v1.18.6 h1:2jupLlAwFm95+YDR+NwD2MEfFO9d4z4Prjl1XXDjuao=
+github.com/klauspost/compress v1.18.6/go.mod h1:cwPg85FWrGar70rWktvGQj8/hthj3wpl0PGDogxkrSQ=
+github.com/nats-io/nats.go v1.52.0 h1:n3avV4VBsCgsdwh71TppsTwtv+QdPs7ntSKM8qJLGsc=
+github.com/nats-io/nats.go v1.52.0/go.mod h1:26HypzazeOkyO3/mqd1zZd53STJN0EjCYF9Uy2ZOBno=
+github.com/nats-io/nkeys v0.4.15 h1:JACV5jRVO9V856KOapQ7x+EY8Jo3qw1vJt/9Jpwzkk4=
+github.com/nats-io/nkeys v0.4.15/go.mod h1:CpMchTXC9fxA5zrMo4KpySxNjiDVvr8ANOSZdiNfUrs=
+github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
+github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
+github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
+go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
+go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0/go.mod h1:BuhAPThV8PBHBvg8ZzZ/Ok3idOdhWIodywz2xEcRbJo=
+go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
+go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0=
+go.opentelemetry.io/otel/log v0.16.0 h1:DeuBPqCi6pQwtCK0pO4fvMB5eBq6sNxEnuTs88pjsN4=
+go.opentelemetry.io/otel/log v0.16.0/go.mod h1:rWsmqNVTLIA8UnwYVOItjyEZDbKIkMxdQunsIhpUMes=
+go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
+go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
+go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg=
+go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg=
+go.opentelemetry.io/otel/sdk/log v0.16.0 h1:e/b4bdlQwC5fnGtG3dlXUrNOnP7c8YLVSpSfEBIkTnI=
+go.opentelemetry.io/otel/sdk/log v0.16.0/go.mod h1:JKfP3T6ycy7QEuv3Hj8oKDy7KItrEkus8XJE6EoSzw4=
+go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw=
+go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A=
+go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
+go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
+golang.org/x/crypto v0.51.0 h1:IBPXwPfKxY7cWQZ38ZCIRPI50YLeevDLlLnyC5wRGTI=
+golang.org/x/crypto v0.51.0/go.mod h1:8AdwkbraGNABw2kOX6YFPs3WM22XqI4EXEd8g+x7Oc8=
+golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM=
+golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU=
+golang.org/x/net v0.54.0 h1:2zJIZAxAHV/OHCDTCOHAYehQzLfSXuf/5SoL/Dv6w/w=
+golang.org/x/net v0.54.0/go.mod h1:Sj4oj8jK6XmHpBZU/zWHw3BV3abl4Kvi+Ut7cQcY+cQ=
+golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
+golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
+golang.org/x/sys v0.44.0 h1:ildZl3J4uzeKP07r2F++Op7E9B29JRUy+a27EibtBTQ=
+golang.org/x/sys v0.44.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
+golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.37.0 h1:Cqjiwd9eSg8e0QAkyCaQTNHFIIzWtidPahFWR83rTrc=
+golang.org/x/text v0.37.0/go.mod h1:a5sjxXGs9hsn/AJVwuElvCAo9v8QYLzvavO5z2PiM38=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
+gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
+google.golang.org/adk v1.3.0 h1:paUr9uM2qANnMUAQ4ydMXMCnM1HtymhDYl8y7gnKvqs=
+google.golang.org/adk v1.3.0/go.mod h1:R8tNFnI/eiBXHn7zJPJtqdiK/WXC+tVkyuZsXyNZXN4=
+google.golang.org/api v0.279.0 h1:hsx2M2OaRcaKtVYK6vXEUnQvdjnend7ZYES+lYaot74=
+google.golang.org/api v0.279.0/go.mod h1:B9TqLBwJqVjp1mtt7WeoQwWRwvu/400y5lETOql+giQ=
+google.golang.org/genai v1.58.0 h1:MNA3ZkRyr7MnRwZ9RNZ60p4+UMKV3yYRw6pyHq4pp0U=
+google.golang.org/genai v1.58.0/go.mod h1:A3kkl0nyBjyFlNjgxIwKq70julKbIxpSxqKO5gw/gmk=
+google.golang.org/genproto v0.0.0-20260319201613-d00831a3d3e7 h1:XzmzkmB14QhVhgnawEVsOn6OFsnpyxNPRY9QV01dNB0=
+google.golang.org/genproto v0.0.0-20260319201613-d00831a3d3e7/go.mod h1:L43LFes82YgSonw6iTXTxXUX1OlULt4AQtkik4ULL/I=
+google.golang.org/genproto/googleapis/api v0.0.0-20260427160629-7cedc36a6bc4 h1:yOzSCGPx+cp5VO7IxvZ9SBFF7j1tZVcNtlHR2iYKtVo=
+google.golang.org/genproto/googleapis/api v0.0.0-20260427160629-7cedc36a6bc4/go.mod h1:Q9HWtNeE7tM9npdIsEvqXj1QJIvVoeAV3rtXtS715Cw=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60 h1:seT2EwLWM78plQ7wcDfuWBc/4FAEAXDDiaSol4ku4qo=
+google.golang.org/genproto/googleapis/rpc v0.0.0-20260511170946-3700d4141b60/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
+google.golang.org/grpc v1.81.0 h1:W3G9N3KQf3BU+YuCtGKJk0CmxQNbAISICD/9AORxLIw=
+google.golang.org/grpc v1.81.0/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I=
+google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
+google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+rsc.io/omap v1.2.0 h1:c1M8jchnHbzmJALzGLclfH3xDWXrPxSUHXzH5C+8Kdw=
+rsc.io/omap v1.2.0/go.mod h1:C8pkI0AWexHopQtZX+qiUeJGzvc8HkdgnsWK4/mAa00=
+rsc.io/ordered v1.1.1 h1:1kZM6RkTmceJgsFH/8DLQvkCVEYomVDJfBRLT595Uak=
+rsc.io/ordered v1.1.1/go.mod h1:evAi8739bWVBRG9aaufsjVc202+6okf8u2QeVL84BCM=
diff --git a/samples/go/agents/deepresearch/infra/.sqlfluff b/samples/go/agents/deepresearch/infra/.sqlfluff
new file mode 100644
index 000000000..020beb038
--- /dev/null
+++ b/samples/go/agents/deepresearch/infra/.sqlfluff
@@ -0,0 +1,3 @@
+[sqlfluff]
+dialect = mysql
+exclude_rules = RF06, LT05
diff --git a/samples/go/agents/deepresearch/infra/docker-compose.yaml b/samples/go/agents/deepresearch/infra/docker-compose.yaml
new file mode 100644
index 000000000..f08a5604e
--- /dev/null
+++ b/samples/go/agents/deepresearch/infra/docker-compose.yaml
@@ -0,0 +1,136 @@
+services:
+ # --- Infrastructure ---
+
+ nats:
+ image: nats:latest
+ container_name: nats
+ command: ["--jetstream", "--store_dir=/data", "--config=/etc/nats/nats.conf"]
+ volumes:
+ - ./nats.conf:/etc/nats/nats.conf:ro
+ ports:
+ - "4222:4222"
+ - "8222:8222"
+
+ nats-init:
+ image: natsio/nats-box:latest
+ container_name: nats-init
+ depends_on:
+ - nats
+ restart: "no"
+ environment:
+ NATS_URL: nats://nats:4222
+ volumes:
+ - ./nats-init.sh:/nats-init.sh:ro
+ entrypoint: ["/bin/sh", "/nats-init.sh"]
+
+ mysql:
+ image: mysql:8.0
+ container_name: mysql
+ environment:
+ MYSQL_ROOT_PASSWORD: root
+ MYSQL_DATABASE: planner
+ volumes:
+ - ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro
+ ports:
+ - "3306:3306"
+
+ # --- Load Balancer ---
+ # Single nginx instance serving as both external and internal load balancer.
+ # External (port 8080): entry point for clients, routes to orchestrator replicas.
+ # Internal (port 80): Host-header routing between services.
+
+ lb:
+ image: nginx:alpine
+ networks:
+ default:
+ aliases:
+ - orchestrator-svc
+ - researcher-svc
+ - analyzer-svc
+ - synthesizer-svc
+ volumes:
+ - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
+ ports:
+ - "8080:8080" # external: clients → orchestrator
+ - "8081:80" # direct access to internal LB
+ depends_on:
+ - researcher
+ - analyzer
+ - synthesizer
+
+ # --- Application Services ---
+
+ orchestrator:
+ build:
+ context: ..
+ dockerfile: Dockerfile
+ environment:
+ NODE_TYPE: orchestrator
+ SERVICE_URL: http://orchestrator-svc
+ NATS_URL: nats://nats:4222
+ MYSQL_DSN: root:root@tcp(mysql:3306)/planner?parseTime=true
+ GOOGLE_API_KEY: ${GOOGLE_API_KEY}
+ REPORT_URL: ${REPORT_URL}
+ deploy:
+ replicas: 2
+ depends_on:
+ nats-init:
+ condition: service_completed_successfully
+ mysql:
+ condition: service_started
+ lb:
+ condition: service_started
+
+ researcher:
+ build:
+ context: ..
+ dockerfile: Dockerfile
+ environment:
+ NODE_TYPE: researcher
+ SERVICE_URL: http://researcher
+ NATS_URL: nats://nats:4222
+ MYSQL_DSN: root:root@tcp(mysql:3306)/planner?parseTime=true
+ GOOGLE_API_KEY: ${GOOGLE_API_KEY}
+ deploy:
+ replicas: 3
+ depends_on:
+ nats-init:
+ condition: service_completed_successfully
+ mysql:
+ condition: service_started
+
+ analyzer:
+ build:
+ context: ..
+ dockerfile: Dockerfile
+ environment:
+ NODE_TYPE: analyzer
+ SERVICE_URL: http://analyzer
+ NATS_URL: nats://nats:4222
+ MYSQL_DSN: root:root@tcp(mysql:3306)/planner?parseTime=true
+ GOOGLE_API_KEY: ${GOOGLE_API_KEY}
+ deploy:
+ replicas: 2
+ depends_on:
+ nats-init:
+ condition: service_completed_successfully
+ mysql:
+ condition: service_started
+
+ synthesizer:
+ build:
+ context: ..
+ dockerfile: Dockerfile
+ environment:
+ NODE_TYPE: synthesizer
+ SERVICE_URL: http://synthesizer
+ NATS_URL: nats://nats:4222
+ MYSQL_DSN: root:root@tcp(mysql:3306)/planner?parseTime=true
+ GOOGLE_API_KEY: ${GOOGLE_API_KEY}
+ deploy:
+ replicas: 2
+ depends_on:
+ nats-init:
+ condition: service_completed_successfully
+ mysql:
+ condition: service_started
diff --git a/samples/go/agents/deepresearch/infra/init.sql b/samples/go/agents/deepresearch/infra/init.sql
new file mode 100644
index 000000000..c8173ab60
--- /dev/null
+++ b/samples/go/agents/deepresearch/infra/init.sql
@@ -0,0 +1,25 @@
+CREATE TABLE IF NOT EXISTS `tasks` (
+ `task_id` CHAR(36) PRIMARY KEY,
+ `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ `updated_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+ `user` VARCHAR(255) NOT NULL DEFAULT '',
+ `agent` VARCHAR(255) NOT NULL DEFAULT '',
+ `context_id` VARCHAR(255) NOT NULL DEFAULT '',
+ `state` VARCHAR(32) NOT NULL DEFAULT 'submitted',
+ `version` BIGINT NOT NULL DEFAULT 1
+) ENGINE = InnoDB;
+
+CREATE INDEX `idx_tasks_state_created` ON `tasks` (`state`, `updated_at`);
+CREATE INDEX `idx_tasks_context_created` ON `tasks` (`context_id`, `created_at`);
+CREATE INDEX `idx_tasks_user_created` ON `tasks` (`user`, `created_at`);
+CREATE INDEX `idx_tasks_agent_created` ON `tasks` (`agent`, `created_at`);
+
+CREATE TABLE IF NOT EXISTS `outbox` (
+ `id` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ `task_id` CHAR(36) NOT NULL,
+ `agent` VARCHAR(255) NOT NULL,
+ `event_data` TEXT NOT NULL
+) ENGINE = InnoDB;
+
+CREATE INDEX `idx_outbox_agent` ON `outbox` (`agent`, `id`);
diff --git a/samples/go/agents/deepresearch/infra/nats-init.sh b/samples/go/agents/deepresearch/infra/nats-init.sh
new file mode 100755
index 000000000..a0dcbbdb8
--- /dev/null
+++ b/samples/go/agents/deepresearch/infra/nats-init.sh
@@ -0,0 +1,74 @@
+#!/usr/bin/env sh
+# Creates NATS JetStream streams, consumers, and KV buckets required by the
+# deep research agent cluster. Intended to run inside the natsio/nats-box
+# container or any environment with the `nats` CLI available.
+#
+# The script is idempotent: it deletes existing resources before recreating
+# them so that re-running after a config change does not fail.
+#
+# Usage:
+# NATS_URL=nats://nats:4222 ./nats-init.sh
+
+set -eu
+
+NATS_URL="${NATS_URL:-nats://localhost:4222}"
+
+echo "Waiting for NATS at ${NATS_URL}..."
+sleep 2
+
+# --- Clean up previous state ---
+
+nats -s "${NATS_URL}" stream rm EVENTS -f 2>/dev/null || true
+nats -s "${NATS_URL}" stream rm WORK -f 2>/dev/null || true
+nats -s "${NATS_URL}" stream rm STATES -f 2>/dev/null || true
+nats -s "${NATS_URL}" kv rm OUTBOX_LOCK -f 2>/dev/null || true
+
+# --- Streams ---
+
+# Event log — durable, per-task subject.
+nats -s "${NATS_URL}" stream add EVENTS \
+ --subjects="events.>" \
+ --retention=limits --storage=file --discard=old \
+ --defaults
+
+# Work queue — per-node-type subjects with filtered consumers.
+nats -s "${NATS_URL}" stream add WORK \
+ --subjects="work.>" \
+ --retention=work --storage=file --discard=old \
+ --defaults
+
+# Push notifications — ephemeral signaling stream with TTL.
+nats -s "${NATS_URL}" stream add STATES \
+ --subjects="states.>" \
+ --retention=limits --max-age=24h --storage=memory --discard=old \
+ --defaults
+
+# --- Consumers (one per agent type, filtered by subject) ---
+
+nats -s "${NATS_URL}" consumer add WORK orchestrator \
+ --filter="work.orchestrator" \
+ --ack=explicit --deliver=all --replay=instant --pull \
+ --defaults
+
+nats -s "${NATS_URL}" consumer add WORK researcher \
+ --filter="work.researcher" \
+ --ack=explicit --deliver=all --replay=instant --pull \
+ --defaults
+
+nats -s "${NATS_URL}" consumer add WORK analyzer \
+ --filter="work.analyzer" \
+ --ack=explicit --deliver=all --replay=instant --pull \
+ --defaults
+
+nats -s "${NATS_URL}" consumer add WORK synthesizer \
+ --filter="work.synthesizer" \
+ --ack=explicit --deliver=all --replay=instant --pull \
+ --defaults
+
+# --- KV buckets ---
+
+# Outbox leader election.
+nats -s "${NATS_URL}" kv add OUTBOX_LOCK \
+ --ttl=10s --storage=memory
+
+echo "NATS streams, consumers, and KV buckets ready."
diff --git a/samples/go/agents/deepresearch/infra/nats.conf b/samples/go/agents/deepresearch/infra/nats.conf
new file mode 100644
index 000000000..ae7fd45f1
--- /dev/null
+++ b/samples/go/agents/deepresearch/infra/nats.conf
@@ -0,0 +1 @@
+jetstream {}
diff --git a/samples/go/agents/deepresearch/infra/nginx.conf b/samples/go/agents/deepresearch/infra/nginx.conf
new file mode 100644
index 000000000..3b86e5828
--- /dev/null
+++ b/samples/go/agents/deepresearch/infra/nginx.conf
@@ -0,0 +1,75 @@
+resolver 127.0.0.11 valid=10s;
+
+# Entry point for local external clients. Routes all traffic to orchestrator replicas.
+
+server {
+ listen 8080;
+
+ location / {
+ set $backend orchestrator;
+ proxy_pass http://$backend:8080;
+ proxy_http_version 1.1;
+ proxy_set_header Host $host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header Connection "";
+ }
+}
+
+# --- Internal load balancer (port 80) ---
+# Routes inter-service traffic by Host header to the appropriate replicas.
+
+server {
+ listen 80;
+ server_name orchestrator-svc;
+
+ location / {
+ set $backend orchestrator;
+ proxy_pass http://$backend:8080;
+ proxy_http_version 1.1;
+ proxy_set_header Host $host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header Connection "";
+ }
+}
+
+server {
+ listen 80;
+ server_name researcher-svc;
+
+ location / {
+ set $backend researcher;
+ proxy_pass http://$backend:8080;
+ proxy_http_version 1.1;
+ proxy_set_header Host $host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header Connection "";
+ }
+}
+
+server {
+ listen 80;
+ server_name analyzer-svc;
+
+ location / {
+ set $backend analyzer;
+ proxy_pass http://$backend:8080;
+ proxy_http_version 1.1;
+ proxy_set_header Host $host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header Connection "";
+ }
+}
+
+server {
+ listen 80;
+ server_name synthesizer-svc;
+
+ location / {
+ set $backend synthesizer;
+ proxy_pass http://$backend:8080;
+ proxy_http_version 1.1;
+ proxy_set_header Host $host;
+ proxy_set_header X-Real-IP $remote_addr;
+ proxy_set_header Connection "";
+ }
+}
diff --git a/samples/go/agents/deepresearch/internal/agents/analyzer.go b/samples/go/agents/deepresearch/internal/agents/analyzer.go
new file mode 100644
index 000000000..c426092d4
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/agents/analyzer.go
@@ -0,0 +1,30 @@
+package agents
+
+import (
+ "github.com/a2aproject/a2a-go/v2/a2asrv"
+
+ "google.golang.org/adk/agent/llmagent"
+ "google.golang.org/adk/model"
+)
+
+// NewAnalyzer creates an analyzer agent that reviews research findings for contradictions and gaps.
+func NewAnalyzer(tl TaskLoader, model model.LLM) (a2asrv.AgentExecutor, error) {
+ a, err := llmagent.New(llmagent.Config{
+ Name: "analyzer",
+ Model: model,
+ Description: "Analyzes research findings for contradictions, gaps, and areas needing follow-up.",
+ Instruction: `You are a critical research analyst. You will receive a set of research findings from multiple independent research tasks.
+
+Your job is to:
+1. Identify contradictions — places where sources or findings disagree on facts, figures, or conclusions.
+2. Find gaps — important aspects of the topic that the research did not cover or only mentioned superficially.
+3. Assess source quality — note where findings rely on weak, outdated, or potentially biased sources.
+4. Suggest follow-up questions — for each gap or contradiction, propose a specific research question that would resolve it.
+
+Be specific: reference the exact claims that conflict, not vague generalities. Output a structured analysis with sections for contradictions, gaps, and follow-up questions.`,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return &referencedTaskLoader{loader: tl, AgentExecutor: newExecutorFrom(a)}, nil
+}
diff --git a/samples/go/agents/deepresearch/internal/agents/common.go b/samples/go/agents/deepresearch/internal/agents/common.go
new file mode 100644
index 000000000..8f07a8b77
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/agents/common.go
@@ -0,0 +1,80 @@
+package agents
+
+import (
+ "context"
+ "fmt"
+ "iter"
+
+ "github.com/a2aproject/a2a-go/v2/a2a"
+ "github.com/a2aproject/a2a-go/v2/a2asrv"
+ "github.com/a2aproject/a2a-go/v2/log"
+
+ "google.golang.org/adk/agent"
+ "google.golang.org/adk/runner"
+ "google.golang.org/adk/server/adka2a/v2"
+ "google.golang.org/adk/session"
+ "google.golang.org/genai"
+)
+
+func newExecutorFrom(a agent.Agent) a2asrv.AgentExecutor {
+ return adka2a.NewExecutor(adka2a.ExecutorConfig{
+ RunnerConfig: runner.Config{Agent: a, AppName: a.Name(), SessionService: session.InMemoryService()},
+ RunConfig: agent.RunConfig{StreamingMode: agent.StreamingModeSSE},
+ BeforeExecuteCallback: func(ctx context.Context, _ *a2asrv.ExecutorContext) (context.Context, error) {
+ log.Info(ctx, "agent invoked", "name", a.Name())
+ return ctx, nil
+ },
+ AfterEventCallback: func(ctx adka2a.ExecutorContext, _ *session.Event, processed *a2a.TaskArtifactUpdateEvent) error {
+ if processed.LastChunk && len(ctx.RequestContext().Message.Parts) > 0 {
+ description := fmt.Sprintf("Research results: %q", ctx.RequestContext().Message.Parts[0].Text())
+ processed.Artifact.Description = description
+ }
+ return nil
+ },
+ AfterExecuteCallback: func(ctx adka2a.ExecutorContext, finalEvent *a2a.TaskStatusUpdateEvent, _ error) error {
+ log.Info(ctx, "agent finished", "name", a.Name(), "status", finalEvent.Status.State)
+ return nil
+ },
+ GenAIPartConverter: func(_ context.Context, adkEvent *session.Event, part *genai.Part) (*a2a.Part, error) {
+ if part.Text == "" || part.Thought { // only expose text outputs
+ return nil, nil
+ }
+ return adka2a.ToA2APart(part, adkEvent.LongRunningToolIDs)
+ },
+ })
+}
+
+// TaskLoader loads completed tasks by their IDs. Used to inject referenced task content into agent prompts.
+type TaskLoader interface {
+ Load(context.Context, []a2a.TaskID) ([]*a2a.Task, error)
+}
+
+type referencedTaskLoader struct {
+ a2asrv.AgentExecutor
+ loader TaskLoader
+}
+
+// Execute implements [a2asrv.AgentExecutor.Execute]. It preloads referenced tasks and add the data
+// to [a2a.Message] contents before delegating to the actual [a2asrv.AgentExecutor].
+func (e *referencedTaskLoader) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
+ return func(yield func(a2a.Event, error) bool) {
+ log.Info(ctx, "task loader", "tasks", len(execCtx.Message.ReferenceTasks))
+
+ tasks, err := e.loader.Load(ctx, execCtx.Message.ReferenceTasks)
+ if err != nil {
+ yield(nil, err)
+ return
+ }
+ for _, task := range tasks {
+ for _, artifact := range task.Artifacts {
+ execCtx.Message.Parts = append(execCtx.Message.Parts, a2a.NewTextPart(artifact.Description+":\n"))
+ execCtx.Message.Parts = append(execCtx.Message.Parts, artifact.Parts...)
+ }
+ }
+ for ev, err := range e.AgentExecutor.Execute(ctx, execCtx) {
+ if !yield(ev, err) {
+ break
+ }
+ }
+ }
+}
diff --git a/samples/go/agents/deepresearch/internal/agents/orchestrator.go b/samples/go/agents/deepresearch/internal/agents/orchestrator.go
new file mode 100644
index 000000000..4aa1601db
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/agents/orchestrator.go
@@ -0,0 +1,288 @@
+package agents
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "iter"
+
+ "github.com/a2aproject/a2a-go/v2/a2a"
+ "github.com/a2aproject/a2a-go/v2/a2asrv"
+ "github.com/a2aproject/a2a-go/v2/log"
+ "github.com/google/uuid"
+ "github.com/nats-io/nats.go/jetstream"
+
+ "github.com/a2aproject/a2a-samples/samples/go/agents/deepresearch/internal/cluster"
+ "github.com/a2aproject/a2a-samples/samples/go/agents/deepresearch/internal/msgstream"
+ "github.com/a2aproject/a2a-samples/samples/go/agents/deepresearch/internal/statemachine"
+
+ "google.golang.org/adk/model"
+ "google.golang.org/adk/server/adka2a/v2"
+ "google.golang.org/genai"
+)
+
+const (
+ statesStream = "STATES"
+ statesSubject = "states"
+)
+
+// OrchestratorConfig configures an orchestrator agent.
+type OrchestratorConfig struct {
+ JS jetstream.JetStream
+ ReportStore a2a.URL
+ Researcher *cluster.Client
+ Analyzer *cluster.Client
+ Synthesizer *cluster.Client
+ Model model.LLM
+}
+
+// CreateOrchestrator creates and returns the orchestrator agent executor.
+func CreateOrchestrator(ctx context.Context, cfg OrchestratorConfig) (a2asrv.AgentExecutor, error) {
+ stream, err := cfg.JS.Stream(ctx, statesStream)
+ if err != nil {
+ return nil, fmt.Errorf("states stream init failed: %w", err)
+ }
+ return &orchestrator{OrchestratorConfig: cfg, stream: stream}, nil
+}
+
+type orchestrator struct {
+ OrchestratorConfig
+ stream jetstream.Stream
+}
+
+// Execute implements [a2asrv.AgentExecutor.Execute].
+func (o *orchestrator) Execute(ctx context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
+ return func(yield func(a2a.Event, error) bool) {
+ exec := &orchestratorRun{
+ execCtx: execCtx,
+ model: o.Model,
+ jetstream: o.JS,
+ subject: statesSubject + "." + string(execCtx.TaskID),
+ state: &orchestratorState{},
+ yield: func(e a2a.Event, err error) bool {
+ log.Info(ctx, "orcherstrator yield", "type", fmt.Sprintf("%T", e), "error", err)
+ return yield(e, err)
+ },
+ }
+
+ if !exec.yield(a2a.NewSubmittedTask(execCtx, execCtx.Message), nil) {
+ return
+ }
+
+ err := statemachine.Run(ctx, o.stream, statemachine.Spec[*orchestratorEvent, *orchestratorState]{
+ Subject: exec.subject,
+ State: exec.state,
+ Decode: parseOrchestratorEvent,
+ Evolve: evolveOrchestratorState,
+ Act: func(ctx context.Context, s *orchestratorState, _ []*orchestratorEvent) error {
+ stage := s.activeStage()
+ switch {
+ case stage == nil: // initial state
+ return o.research(ctx, exec, nil)
+
+ case stage.messageCommit == nil: // crash recovery
+ return o.recoverFromFailure(ctx, exec, stage)
+
+ case !stage.finished(): // wait for tasks to complete
+ return nil
+
+ default:
+ return o.runNextStage(ctx, exec, stage)
+ }
+ },
+ })
+ if err != nil {
+ if !errors.Is(err, statemachine.ErrStopped) {
+ exec.yield(nil, err)
+ }
+ }
+ }
+}
+
+// Cancel implements [a2asrv.AgentExecutor.Cancel].
+func (o *orchestrator) Cancel(_ context.Context, execCtx *a2asrv.ExecutorContext) iter.Seq2[a2a.Event, error] {
+ return func(yield func(a2a.Event, error) bool) {
+ yield(a2a.NewStatusUpdateEvent(
+ execCtx,
+ a2a.TaskStateCanceled,
+ a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart("cancelled")),
+ ), nil)
+ }
+}
+
+// recoverFromFailure is called for a stage with messageCommit equal to nil. This can happen when either
+// a message sneding or messageCommit publishing failed and executor got restarted.
+func (o *orchestrator) recoverFromFailure(ctx context.Context, exec *orchestratorRun, stage *deepresearchStage) error {
+ if sendMaybeFailed := len(stage.tasks) < len(stage.message.Messages); sendMaybeFailed {
+ return o.runNextStage(ctx, exec, exec.state.previousStage(stage))
+ }
+ allDone := false
+ for _, state := range stage.tasks {
+ if !state.Terminal() {
+ allDone = false
+ break
+ }
+ }
+ if allDone {
+ return o.runNextStage(ctx, exec, stage)
+ }
+ return nil // wait for task completions
+}
+
+func (o *orchestrator) runNextStage(ctx context.Context, exec *orchestratorRun, stage *deepresearchStage) error {
+ switch stage.message.Type {
+ case stageResearch:
+ if stage.message.PrevStageID == "" { // analyze initial research findings
+ return o.analyze(ctx, exec, stage)
+ }
+ // follow-up research finished, synthesize all findings
+ return o.synthesize(ctx, exec)
+ case stageAnalysis: // start follow-up research
+ return o.research(ctx, exec, stage)
+ case stageSynthesiz: // deliver final result
+ return o.complete(ctx, exec, stage)
+ default:
+ return fmt.Errorf("unknown uncommitted stage %q", stage.message.Type)
+ }
+}
+
+func (o *orchestrator) research(ctx context.Context, e *orchestratorRun, prevStage *deepresearchStage) error {
+ if !e.updateStatus("Planning research...") {
+ return statemachine.ErrStopped
+ }
+
+ var parts []*a2a.Part
+ if prevStage == nil {
+ parts = e.execCtx.Message.Parts
+ } else {
+ prevTaskID, err := prevStage.taskID()
+ if err != nil {
+ return err
+ }
+ aParts, err := o.Analyzer.GetArtifactParts(ctx, prevTaskID)
+ if err != nil {
+ return err
+ }
+ parts = aParts
+ }
+
+ converted, err := adka2a.ToGenAIParts(parts)
+ if err != nil {
+ return fmt.Errorf("plan input conversion: %w", err)
+ }
+
+ plan, err := runPlanner(ctx, e.model, genai.NewContentFromParts(converted, genai.RoleUser))
+ if err != nil {
+ return fmt.Errorf("planner: %w", err)
+ }
+ if len(plan.Subtasks) == 0 {
+ return fmt.Errorf("planner returned 0 subtasks")
+ }
+
+ if !e.yieldArtifact(a2a.NewTextPart(plan.Summary)) {
+ return statemachine.ErrStopped
+ }
+
+ messages := make([]*a2a.Message, len(plan.Subtasks))
+ for i, st := range plan.Subtasks {
+ messages[i] = a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart(st))
+ }
+
+ var prevStageID string
+ if prevStage != nil {
+ prevStageID = prevStage.id
+ }
+ if err := e.send(ctx, o.Researcher, stageResearch, prevStageID, messages); err != nil {
+ return fmt.Errorf("research failed: %w", err)
+ }
+ for _, subtask := range plan.Subtasks {
+ if !e.updateStatus(fmt.Sprintf("Researching %q", subtask)) {
+ return statemachine.ErrStopped
+ }
+ }
+ return nil
+}
+
+func (o *orchestrator) analyze(ctx context.Context, e *orchestratorRun, prevStage *deepresearchStage) error {
+ if !e.updateStatus("Analyzing findings...") {
+ return statemachine.ErrStopped
+ }
+ message := a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("Find contradictions and controversial parts in these research findings."))
+ for tid := range prevStage.tasks {
+ message.ReferenceTasks = append(message.ReferenceTasks, tid)
+ }
+ return e.send(ctx, o.Analyzer, stageAnalysis, prevStage.id, []*a2a.Message{message})
+}
+
+func (o *orchestrator) synthesize(ctx context.Context, e *orchestratorRun) error {
+ if !e.updateStatus("Synthesizing final report...") {
+ return statemachine.ErrStopped
+ }
+ message := a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("Synthesize all the research findings into the final report."))
+ for stage := e.state.activeStage(); stage != nil; stage = e.state.previousStage(stage) {
+ if stage.message.Type == stageResearch {
+ for tid := range stage.tasks {
+ message.ReferenceTasks = append(message.ReferenceTasks, tid)
+ }
+ }
+ }
+ return e.send(ctx, o.Synthesizer, stageSynthesiz, e.state.activeStage().id, []*a2a.Message{message})
+}
+
+func (o *orchestrator) complete(_ context.Context, e *orchestratorRun, stage *deepresearchStage) error {
+ reportID, err := stage.taskID()
+ if err != nil {
+ return fmt.Errorf("report not ready: %w", err)
+ }
+ url := o.ReportStore + a2a.URL("/reports/"+reportID)
+ if !e.yieldArtifact(a2a.NewTextPart("Your report is ready for review."), a2a.NewFileURLPart(url, "text/html")) {
+ return statemachine.ErrStopped
+ }
+ e.complete()
+ return statemachine.ErrStopped
+}
+
+type orchestratorRun struct {
+ model model.LLM
+
+ execCtx *a2asrv.ExecutorContext
+ yield func(a2a.Event, error) bool
+
+ jetstream jetstream.JetStream
+ subject string
+
+ state *orchestratorState
+}
+
+func (r *orchestratorRun) send(ctx context.Context, client *cluster.Client, st stageType, prevStageID string, messages []*a2a.Message) error {
+ stageID := uuid.NewString()
+ prepareEvent := &orchestratorEvent{StageID: stageID, MessagePrepare: &messagePrepare{Type: st, Messages: messages, PrevStageID: prevStageID}}
+ if err := msgstream.PublishJSON(ctx, r.jetstream, r.subject, prepareEvent); err != nil {
+ return fmt.Errorf("prepare publish: %w", err)
+ }
+ taskIDs, err := client.SendAll(ctx, r.execCtx, messages, msgstream.NewPushConfig(r.subject, stageID))
+ if err != nil {
+ return fmt.Errorf("send all: %w", err)
+ }
+ commitEvent := &orchestratorEvent{StageID: stageID, MessageCommit: &messageCommit{TaskIDs: taskIDs}}
+ if err := msgstream.PublishJSON(ctx, r.jetstream, r.subject, commitEvent); err != nil {
+ return fmt.Errorf("commit publish: %w", err)
+ }
+ return nil
+}
+
+func (r *orchestratorRun) updateStatus(text string) bool {
+ return r.yield(a2a.NewStatusUpdateEvent(
+ r.execCtx, a2a.TaskStateWorking, a2a.NewMessage(a2a.MessageRoleAgent, a2a.NewTextPart(text)),
+ ), nil)
+}
+
+func (r *orchestratorRun) yieldArtifact(parts ...*a2a.Part) bool {
+ artifact := a2a.NewArtifactEvent(r.execCtx, parts...)
+ artifact.LastChunk = true
+ return r.yield(artifact, nil)
+}
+
+func (r *orchestratorRun) complete() {
+ _ = r.yield(a2a.NewStatusUpdateEvent(r.execCtx, a2a.TaskStateCompleted, nil), nil)
+}
diff --git a/samples/go/agents/deepresearch/internal/agents/orchestrator_state.go b/samples/go/agents/deepresearch/internal/agents/orchestrator_state.go
new file mode 100644
index 000000000..c64cc796d
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/agents/orchestrator_state.go
@@ -0,0 +1,128 @@
+package agents
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "slices"
+
+ "github.com/a2aproject/a2a-go/v2/a2a"
+ "github.com/nats-io/nats.go/jetstream"
+)
+
+type stageType string
+
+const (
+ stageResearch stageType = "planning"
+ stageAnalysis stageType = "analysis"
+ stageSynthesiz stageType = "synthesiz"
+)
+
+type messagePrepare struct {
+ Type stageType `json:"type"`
+ Messages []*a2a.Message `json:"messages"`
+ PrevStageID string `json:"prevStageId"`
+}
+
+type messageCommit struct {
+ TaskIDs []a2a.TaskID `json:"taskIds"`
+}
+
+type orchestratorEvent struct {
+ StageID string `json:"stageId"`
+ StatusUpdate *a2a.TaskStatusUpdateEvent `json:"statusUpdate,omitempty"`
+ MessagePrepare *messagePrepare `json:"messagePrepare,omitempty"`
+ MessageCommit *messageCommit `json:"messageCommit,omitempty"`
+}
+
+func parseOrchestratorEvent(_ context.Context, msg jetstream.Msg) (*orchestratorEvent, error) {
+ var event orchestratorEvent
+ if err := json.Unmarshal(msg.Data(), &event); err != nil {
+ return nil, err
+ }
+ if event.StatusUpdate != nil {
+ event.StageID = msg.Headers().Get("A2A-Token")
+ }
+ return &event, nil
+}
+
+type deepresearchStage struct {
+ id string
+ message *messagePrepare
+ messageCommit *messageCommit
+ tasks map[a2a.TaskID]a2a.TaskState
+}
+
+func (p *deepresearchStage) taskID() (a2a.TaskID, error) {
+ if len(p.tasks) != 1 {
+ return "", fmt.Errorf("taskID() is only valid for single-task stages (analysis, synthesis)")
+ }
+ for tid := range p.tasks {
+ return tid, nil
+ }
+ return "", nil
+}
+
+func (p *deepresearchStage) finished() bool {
+ if len(p.tasks) < len(p.message.Messages) {
+ return false
+ }
+ for _, state := range p.tasks {
+ if !state.Terminal() {
+ return false
+ }
+ }
+ return true
+}
+
+type orchestratorState struct {
+ stages []*deepresearchStage
+}
+
+func (s *orchestratorState) previousStage(stage *deepresearchStage) *deepresearchStage {
+ if stage == nil || stage.message.PrevStageID == "" {
+ return nil
+ }
+ si := slices.IndexFunc(s.stages, func(ds *deepresearchStage) bool {
+ return ds.id == stage.message.PrevStageID
+ })
+ return s.stages[si]
+}
+
+func (s *orchestratorState) activeStage() *deepresearchStage {
+ if len(s.stages) == 0 {
+ return nil
+ }
+ return s.stages[len(s.stages)-1]
+}
+
+func evolveOrchestratorState(_ context.Context, s *orchestratorState, event *orchestratorEvent) error {
+ if event.MessagePrepare != nil {
+ s.stages = append(s.stages, &deepresearchStage{
+ id: event.StageID,
+ message: event.MessagePrepare,
+ tasks: make(map[a2a.TaskID]a2a.TaskState),
+ })
+ return nil
+ }
+
+ stageIndex := slices.IndexFunc(s.stages, func(stage *deepresearchStage) bool {
+ return stage.id == event.StageID
+ })
+ if stageIndex < 0 {
+ return fmt.Errorf("event for unknown stage %q", event.StageID)
+ }
+
+ if event.MessageCommit != nil {
+ s.stages[stageIndex].messageCommit = event.MessageCommit
+ return nil
+ }
+
+ tu := event.StatusUpdate
+ stage := s.stages[stageIndex]
+ if _, ok := stage.tasks[tu.TaskID]; !ok && len(stage.tasks) >= len(stage.message.Messages) {
+ return fmt.Errorf("more tasks than messages for stage %q", event.StageID)
+ }
+ stage.tasks[tu.TaskID] = tu.Status.State
+ return nil
+}
diff --git a/samples/go/agents/deepresearch/internal/agents/planner.go b/samples/go/agents/deepresearch/internal/agents/planner.go
new file mode 100644
index 000000000..5a5a482b4
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/agents/planner.go
@@ -0,0 +1,87 @@
+package agents
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+
+ "google.golang.org/adk/agent"
+ "google.golang.org/adk/agent/llmagent"
+ "google.golang.org/adk/model"
+ "google.golang.org/adk/runner"
+ "google.golang.org/adk/session"
+ "google.golang.org/genai"
+)
+
+type researchPlan struct {
+ Summary string `json:"summary"`
+ Subtasks []string `json:"subtasks"`
+}
+
+var researchPlanSchema = &genai.Schema{
+ Type: genai.TypeObject,
+ Properties: map[string]*genai.Schema{
+ "summary": {Type: genai.TypeString, Description: "Brief summary of the research plan."},
+ "subtasks": {
+ Type: genai.TypeArray,
+ Items: &genai.Schema{Type: genai.TypeString},
+ Description: "Focused subtasks that can be researched independently.",
+ },
+ },
+ Required: []string{"summary", "subtasks"},
+}
+
+func runPlanner(ctx context.Context, model model.LLM, content *genai.Content) (*researchPlan, error) {
+ a, err := llmagent.New(llmagent.Config{
+ Name: "planner",
+ Description: "Deep research planner agent.",
+ Model: model,
+ Instruction: `You are the planning component of a multi-agent deep research system.
+
+## How the system works
+- Each subtask you produce is sent to a separate researcher agent that uses Google Search.
+- Researchers work independently and cannot see each other's subtasks or results.
+- After initial research, an analyzer checks for contradictions and gaps, and you may be called again with the analysis to plan targeted follow-up research.
+
+## Planning rules
+1. Produce 3–5 subtasks. Fewer for narrow topics, more for broad ones.
+2. Each subtask must be self-contained — include enough context that a researcher can investigate it without seeing the original question or other subtasks.
+3. Subtasks must not overlap — do not assign the same ground to multiple researchers.
+4. Frame each subtask as a clear, specific research question or directive, not a vague topic label.
+5. Prefer subtasks that target publicly available, searchable information.
+
+## When handling follow-up research
+If the input contains analysis of prior findings (contradictions, gaps, or open questions), focus subtasks on resolving those specific issues. Do not re-research topics already well-covered.
+
+## Summary
+Write a brief, user-facing summary (1–2 sentences) describing what the plan covers.`,
+ OutputSchema: researchPlanSchema,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("llm create failed: %w", err)
+ }
+ sessionSvc := session.InMemoryService()
+ r, err := runner.New(runner.Config{AppName: a.Name(), Agent: a, SessionService: sessionSvc})
+ if err != nil {
+ return nil, fmt.Errorf("runner create failed: %w", err)
+ }
+ sess, err := sessionSvc.Create(ctx, &session.CreateRequest{AppName: a.Name(), UserID: "user"})
+ if err != nil {
+ return nil, fmt.Errorf("session create failed: %w", err)
+ }
+ var event *session.Event
+ for ev, err := range r.Run(ctx, "user", sess.Session.ID(), content, agent.RunConfig{}) {
+ if err != nil {
+ return nil, err
+ }
+ event = ev
+ }
+ if event == nil || event.Content == nil || len(event.Content.Parts) == 0 {
+ return nil, fmt.Errorf("no content returned from planner")
+ }
+ var plan researchPlan
+ if err := json.Unmarshal([]byte(event.Content.Parts[0].Text), &plan); err != nil {
+ return nil, err
+ }
+ return &plan, nil
+}
diff --git a/samples/go/agents/deepresearch/internal/agents/researcher.go b/samples/go/agents/deepresearch/internal/agents/researcher.go
new file mode 100644
index 000000000..cae393d2a
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/agents/researcher.go
@@ -0,0 +1,32 @@
+package agents
+
+import (
+ "github.com/a2aproject/a2a-go/v2/a2asrv"
+
+ "google.golang.org/adk/agent/llmagent"
+ "google.golang.org/adk/model"
+ "google.golang.org/adk/tool"
+ "google.golang.org/adk/tool/geminitool"
+)
+
+// NewResearcher creates a researcher agent that uses Google Search to investigate subtopics.
+func NewResearcher(model model.LLM) (a2asrv.AgentExecutor, error) {
+ a, err := llmagent.New(llmagent.Config{
+ Name: "researcher",
+ Model: model,
+ Description: "Researches a focused subtopic using Google Search and produces a detailed, sourced report.",
+ Instruction: `You are an expert research analyst. You will receive a focused research subtask.
+
+Use Google Search to find authoritative, up-to-date information. For each claim or finding:
+- Cite the source URL.
+- Note the publication date when available.
+- Prefer primary sources (official reports, peer-reviewed papers, authoritative organizations) over secondary coverage.
+
+Structure your output as a coherent report with clear sections. Flag any conflicting information you encounter across sources. If a subtask is ambiguous, state your interpretation before proceeding.`,
+ Tools: []tool.Tool{geminitool.GoogleSearch{}},
+ })
+ if err != nil {
+ return nil, err
+ }
+ return newExecutorFrom(a), nil
+}
diff --git a/samples/go/agents/deepresearch/internal/agents/synthesizer.go b/samples/go/agents/deepresearch/internal/agents/synthesizer.go
new file mode 100644
index 000000000..3c49d2c44
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/agents/synthesizer.go
@@ -0,0 +1,31 @@
+package agents
+
+import (
+ "github.com/a2aproject/a2a-go/v2/a2asrv"
+
+ "google.golang.org/adk/agent/llmagent"
+ "google.golang.org/adk/model"
+)
+
+// NewSynthesizer creates a synthesizer agent that merges research findings into a final report.
+func NewSynthesizer(tl TaskLoader, model model.LLM) (a2asrv.AgentExecutor, error) {
+ a, err := llmagent.New(llmagent.Config{
+ Name: "synthesizer",
+ Model: model,
+ Description: "Synthesizes research findings into a comprehensive, well-structured final report.",
+ Instruction: `You are an expert research writer. You will receive findings from multiple research tasks covering different aspects of a topic.
+
+Produce a single, comprehensive report that:
+1. Opens with an executive summary of key findings.
+2. Organizes the body into logical thematic sections, not by source.
+3. Reconciles conflicting information — when sources disagree, present both sides and state which is better supported and why.
+4. Cites sources inline using the URLs from the research findings.
+5. Closes with a conclusion that highlights the most important takeaways and any remaining open questions.
+
+Write in a clear, professional tone. Avoid redundancy — do not repeat the same finding from multiple sources. Prefer depth over breadth.`,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return &referencedTaskLoader{loader: tl, AgentExecutor: newExecutorFrom(a)}, nil
+}
diff --git a/samples/go/agents/deepresearch/internal/cluster/client.go b/samples/go/agents/deepresearch/internal/cluster/client.go
new file mode 100644
index 000000000..896953b05
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/cluster/client.go
@@ -0,0 +1,88 @@
+package cluster
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+
+ "github.com/a2aproject/a2a-go/v2/a2a"
+ "github.com/a2aproject/a2a-go/v2/a2aclient"
+ "github.com/a2aproject/a2a-go/v2/a2asrv"
+ "github.com/a2aproject/a2a-go/v2/log"
+)
+
+// Client wraps [a2aclient.Client] to simplify app-specific operations.
+type Client struct {
+ Client *a2aclient.Client
+ card *a2a.AgentCard
+}
+
+// CreateClient creates a new [Client] client from the given agent endpoint URL.
+func CreateClient(ctx context.Context, endpoint string) (*Client, error) {
+ iface := a2a.NewAgentInterface(endpoint, a2a.TransportProtocolHTTPJSON)
+ client, err := a2aclient.NewFromEndpoints(ctx, []*a2a.AgentInterface{iface})
+ if err != nil {
+ return nil, err
+ }
+ card, err := client.GetExtendedAgentCard(ctx, &a2a.GetExtendedAgentCardRequest{})
+ if err != nil {
+ return nil, err
+ }
+ return &Client{Client: client, card: card}, nil
+}
+
+// GetArtifactParts fetches a task and returns all of its artifacts as a single [a2a.Part] slice.
+func (s *Client) GetArtifactParts(ctx context.Context, tid a2a.TaskID) ([]*a2a.Part, error) {
+ task, err := s.Client.GetTask(ctx, &a2a.GetTaskRequest{ID: tid})
+ if err != nil {
+ return nil, err
+ }
+ parts := []*a2a.Part{}
+ for _, a := range task.Artifacts {
+ for _, p := range a.Parts {
+ parts = append(parts, p)
+ }
+ }
+ return parts, nil
+}
+
+// SendAll dispatches messages concurrently to the downstream service in non-blocking mode. Returns the IDs of all created tasks.
+func (s *Client) SendAll(ctx context.Context, execCtx *a2asrv.ExecutorContext, messages []*a2a.Message, cfg *a2a.PushConfig) ([]a2a.TaskID, error) {
+ var mu sync.Mutex
+ var errs error
+ var subtasks []a2a.TaskID
+
+ var group sync.WaitGroup
+ for _, msg := range messages {
+ group.Go(func() {
+ log.Info(ctx, "dispatching subtask", "parent_id", execCtx.TaskID)
+
+ result, err := s.Client.SendMessage(ctx, &a2a.SendMessageRequest{
+ Message: msg,
+ Config: &a2a.SendMessageConfig{ReturnImmediately: true, PushConfig: cfg},
+ Metadata: map[string]any{"parent_task_id": string(execCtx.TaskID)},
+ })
+ if err != nil {
+ mu.Lock()
+ errs = errors.Join(errs, fmt.Errorf("swarm message send: %w", err))
+ mu.Unlock()
+ return
+ }
+
+ taskID := result.TaskInfo().TaskID
+
+ mu.Lock()
+ subtasks = append(subtasks, taskID)
+ mu.Unlock()
+
+ log.Info(ctx, "subtask dispatched", "task_id", taskID, "target", s.card.Name)
+ })
+ }
+ group.Wait()
+
+ if errs != nil {
+ return nil, errs
+ }
+ return subtasks, nil
+}
diff --git a/samples/go/agents/deepresearch/internal/domain/domain.go b/samples/go/agents/deepresearch/internal/domain/domain.go
new file mode 100644
index 000000000..5838128a0
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/domain/domain.go
@@ -0,0 +1,66 @@
+// Package domain defines shared types used across all agent packages.
+package domain
+
+import (
+ "context"
+
+ "github.com/a2aproject/a2a-go/v2/a2asrv"
+ "github.com/a2aproject/a2a-samples/samples/go/agents/deepresearch/internal/utils"
+)
+
+// AgentType identifies the role of an agent in the deep research system.
+type AgentType string
+
+const (
+ AgentOrchestrator AgentType = "orchestrator"
+ AgentResearcher AgentType = "researcher"
+ AgentAnalyzer AgentType = "analyzer"
+ AgentSynthesizer AgentType = "synthesizer"
+)
+
+type nodeInfoKeyType struct{}
+
+// NodeInfo holds node metadata.
+type NodeInfo struct {
+ Agent AgentType
+}
+
+// NodeInfoFrom extracts [NodeInfo] from the given context.
+func NodeInfoFrom(ctx context.Context) NodeInfo {
+ if ni, ok := ctx.Value(nodeInfoKeyType{}).(NodeInfo); ok {
+ return ni
+ }
+ return NodeInfo{}
+}
+
+// NodeInfoFrom extracts [Info] from the given context.
+func WithNodeInfo(ni NodeInfo) a2asrv.RequestHandlerOption {
+ return a2asrv.WithCallInterceptors(&interceptor{nodeInfo: ni})
+}
+
+type interceptor struct {
+ nodeInfo NodeInfo
+ a2asrv.PassthroughCallInterceptor
+}
+
+// Before implements [a2asrv.CallInterceptor.Before].
+func (i *interceptor) Before(ctx context.Context, _ *a2asrv.CallContext, _ *a2asrv.Request) (context.Context, any, error) {
+ return context.WithValue(ctx, nodeInfoKeyType{}, i.nodeInfo), nil, nil
+}
+
+// ContextCodec implements [a2asrv.ContextCodec].
+type ContextCodec struct{}
+
+// Encode implements [a2asrv.ContextCodec.Encode].
+func (cc *ContextCodec) Encode(ctx context.Context) (map[string]any, error) {
+ return utils.ToMapStruct(NodeInfoFrom(ctx))
+}
+
+// Decode implements [a2asrv.ContextCodec.Decode].
+func (cc *ContextCodec) Decode(ctx context.Context, encoded map[string]any) (context.Context, error) {
+ nodeInfoMap, err := utils.FromMapStruct[NodeInfo](encoded)
+ if err != nil {
+ return nil, err
+ }
+ return context.WithValue(ctx, nodeInfoKeyType{}, nodeInfoMap), nil
+}
diff --git a/samples/go/agents/deepresearch/internal/lease/lease.go b/samples/go/agents/deepresearch/internal/lease/lease.go
new file mode 100644
index 000000000..5613b705b
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/lease/lease.go
@@ -0,0 +1,131 @@
+// Package lease implements distributed leader election using NATS KV.
+package lease
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "slices"
+ "sync"
+
+ "github.com/a2aproject/a2a-go/v2/log"
+ "github.com/nats-io/nats.go/jetstream"
+)
+
+// Manager creates and tracks distributed leases backed by a NATS KV bucket.
+type Manager struct {
+ kv jetstream.KeyValue
+
+ mu sync.Mutex
+ active []*Lease
+}
+
+// CreateManager creates a [Manager] using the given KV bucket configuration.
+func CreateManager(ctx context.Context, js jetstream.JetStream, cfg jetstream.KeyValueConfig) (*Manager, error) {
+ kv, err := js.CreateOrUpdateKeyValue(ctx, cfg)
+ if err != nil {
+ if !errors.Is(err, jetstream.ErrBucketExists) {
+ return nil, fmt.Errorf("create kv bucket error: %w", err)
+ }
+ existingKV, err := js.KeyValue(ctx, cfg.Bucket)
+ if err != nil {
+ return nil, fmt.Errorf("get kv bucket error: %w", err)
+ }
+ kv = existingKV
+ }
+ return &Manager{kv: kv}, nil
+}
+
+// Acquire blocks until the lease for key is obtained. It retries on contention.
+func (lp *Manager) Acquire(ctx context.Context, key string, value string) (*Lease, error) {
+ rawVal := []byte(value)
+ for {
+ rev, err := lp.kv.Create(ctx, key, rawVal)
+ if err != nil && !errors.Is(err, jetstream.ErrKeyExists) {
+ return nil, err
+ }
+
+ if err != nil {
+ log.Info(ctx, "leader key exists, waiting for release")
+ if err := lp.waitKeyDeleted(ctx, key); err != nil {
+ return nil, err
+ }
+ continue
+ }
+
+ lease := &Lease{manager: lp, kv: lp.kv, key: key, rev: rev, value: rawVal}
+
+ lp.mu.Lock()
+ lp.active = append(lp.active, lease)
+ lp.mu.Unlock()
+
+ log.Info(ctx, "lease acquired")
+
+ return lease, nil
+ }
+}
+
+func (lp *Manager) waitKeyDeleted(ctx context.Context, key string) error {
+ watcher, err := lp.kv.Watch(ctx, key)
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if err := watcher.Stop(); err != nil {
+ log.Warn(ctx, "watcher stop failed", "cause", err)
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+
+ case entry, ok := <-watcher.Updates():
+ if !ok {
+ return fmt.Errorf("unexpected watcher stop")
+ }
+ if entry == nil {
+ continue
+ }
+ if entry.Operation() == jetstream.KeyValueDelete || entry.Operation() == jetstream.KeyValuePurge {
+ return nil
+ }
+ }
+ }
+}
+
+// ReleaseAll releases all active leases held by this manager.
+func (lp *Manager) ReleaseAll(ctx context.Context) {
+ lp.mu.Lock()
+ defer lp.mu.Unlock()
+ cleanupCtx := context.WithoutCancel(ctx)
+ for _, lease := range lp.active {
+ if err := lp.kv.Delete(cleanupCtx, lease.key, jetstream.LastRevision(lease.rev)); err != nil {
+ log.Warn(ctx, "lease release failed", err, "node", string(lease.value))
+ }
+ }
+ lp.active = nil
+}
+
+// Lease represents a single acquired distributed lock in NATS KV.
+type Lease struct {
+ manager *Manager
+ kv jetstream.KeyValue
+ key string
+ value []byte
+ rev uint64
+}
+
+// Renew extends the lease by updating the KV entry. Returns an error if the lease was lost.
+func (l *Lease) Renew(ctx context.Context) error {
+ newRev, err := l.kv.Update(ctx, l.key, l.value, l.rev)
+ if err != nil {
+ l.manager.mu.Lock()
+ l.manager.active = slices.DeleteFunc(l.manager.active, func(another *Lease) bool { return l == another })
+ l.manager.mu.Unlock()
+ return fmt.Errorf("lease renewal: %w", err)
+ }
+ l.rev = newRev
+ return nil
+}
diff --git a/samples/go/agents/deepresearch/internal/msgstream/eventqueue.go b/samples/go/agents/deepresearch/internal/msgstream/eventqueue.go
new file mode 100644
index 000000000..a9a3d1ef5
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/msgstream/eventqueue.go
@@ -0,0 +1,136 @@
+// Package msgstream provides NATS JetStream-backed event queues, work queues, and push notification senders.
+package msgstream
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+
+ "github.com/a2aproject/a2a-go/v2/a2a"
+ "github.com/a2aproject/a2a-go/v2/a2asrv/eventqueue"
+ "github.com/nats-io/nats.go/jetstream"
+)
+
+const (
+ eventsStream = "EVENTS"
+)
+
+type natsEventQueueManager struct {
+ stream jetstream.Stream
+ policy jetstream.DeliverPolicy
+}
+
+var _ eventqueue.Manager = (*natsEventQueueManager)(nil)
+
+// CreateEventReplayManager returns an [eventqueue.Manager] that replays all events from the beginning of the stream.
+func CreateEventReplayManager(ctx context.Context, js jetstream.JetStream) (eventqueue.Manager, error) {
+ stream, err := js.Stream(ctx, eventsStream)
+ if err != nil {
+ return nil, fmt.Errorf("nats events stream: %v", err)
+ }
+ return &natsEventQueueManager{stream, jetstream.DeliverAllPolicy}, nil
+}
+
+// CreateEventQueueManager returns an [eventqueue.Manager] that delivers only new events.
+func CreateEventQueueManager(ctx context.Context, js jetstream.JetStream) (eventqueue.Manager, error) {
+ stream, err := js.Stream(ctx, eventsStream)
+ if err != nil {
+ return nil, fmt.Errorf("nats events stream: %v", err)
+ }
+ return &natsEventQueueManager{stream, jetstream.DeliverNewPolicy}, nil
+}
+
+// CreateReader implements [eventqueue.Manager.CreateReader].
+func (m *natsEventQueueManager) CreateReader(ctx context.Context, taskID a2a.TaskID) (eventqueue.Reader, error) {
+ cons, err := m.stream.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{
+ FilterSubjects: []string{eventsSubject(taskID)},
+ DeliverPolicy: m.policy,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("ordered consumer for %s: %w", taskID, err)
+ }
+
+ msgChan := make(chan jetstream.Msg, 64)
+ var cc jetstream.ConsumeContext
+ cc, err = cons.Consume(func(msg jetstream.Msg) {
+ select {
+ case msgChan <- msg:
+ case <-cc.Closed():
+ }
+ })
+ if err != nil {
+ return nil, fmt.Errorf("consume %s: %w", taskID, err)
+ }
+ return &natsEventReader{msgChan: msgChan, cc: cc}, nil
+}
+
+// CreateWriter implements [eventqueue.Manager.CreateWriter].
+func (m *natsEventQueueManager) CreateWriter(_ context.Context, _ a2a.TaskID) (eventqueue.Writer, error) {
+ return natsNoOpWriter{}, nil
+}
+
+// Destroy implements [eventqueue.Manager.Destroy].
+func (m *natsEventQueueManager) Destroy(_ context.Context, _ a2a.TaskID) error {
+ return nil
+}
+
+type natsEventReader struct {
+ msgChan chan jetstream.Msg
+ cc jetstream.ConsumeContext
+}
+
+// Read implements [eventqueue.Reader.Read].
+func (r *natsEventReader) Read(ctx context.Context) (*eventqueue.Message, error) {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+
+ case natsMsg, ok := <-r.msgChan:
+ if !ok {
+ return nil, eventqueue.ErrQueueClosed
+ }
+ var msg eventqueue.Message
+ if err := json.Unmarshal(natsMsg.Data(), &msg); err != nil {
+ return nil, fmt.Errorf("message parsing: %w", err)
+ }
+ return &msg, nil
+ }
+}
+
+// Close implements [eventqueue.Reader.Close].
+func (r *natsEventReader) Close() error {
+ r.cc.Stop()
+ return nil
+}
+
+// natsNoOpWriter is a no-op because events are written through the task store.
+type natsNoOpWriter struct{}
+
+// Write implements [eventqueue.Writer.Write].
+func (natsNoOpWriter) Write(context.Context, *eventqueue.Message) error { return nil }
+
+// Close implements [eventqueue.Writer.Close].
+func (natsNoOpWriter) Close() error { return nil }
+
+// natsEventWriter publishes events from the outbox relay to the EVENTS stream.
+type natsEventWriter struct {
+ js jetstream.JetStream
+}
+
+// NewEventWriter creates an [eventqueue.Writer] that publishes to the EVENTS stream.
+func NewEventWriter(js jetstream.JetStream) eventqueue.Writer {
+ return &natsEventWriter{js: js}
+}
+
+func (w *natsEventWriter) Write(ctx context.Context, msg *eventqueue.Message) error {
+ if err := PublishJSON(ctx, w.js, eventsSubject(msg.Event.TaskInfo().TaskID), msg); err != nil {
+ return fmt.Errorf("nats publish: %w", err)
+ }
+ return nil
+}
+
+func (w *natsEventWriter) Close() error { return nil }
+
+func eventsSubject(tid a2a.TaskID) string {
+ return "events." + string(tid)
+}
diff --git a/samples/go/agents/deepresearch/internal/msgstream/pushsender.go b/samples/go/agents/deepresearch/internal/msgstream/pushsender.go
new file mode 100644
index 000000000..328afadb8
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/msgstream/pushsender.go
@@ -0,0 +1,69 @@
+package msgstream
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ "github.com/a2aproject/a2a-go/v2/a2a"
+ "github.com/a2aproject/a2a-go/v2/a2asrv/push"
+ "github.com/a2aproject/a2a-go/v2/log"
+ "github.com/nats-io/nats.go"
+ "github.com/nats-io/nats.go/jetstream"
+)
+
+const natsURLScheme = "nats://"
+
+// PushSenderConfig configures a NATS-backed [push.Sender].
+type PushSenderConfig struct {
+ Jetstream jetstream.JetStream
+}
+
+// NewPushSender creates a [push.Sender] that publishes status updates to NATS subjects.
+func NewPushSender(cfg PushSenderConfig) push.Sender {
+ return &natsPushSender{cfg}
+}
+
+// NewPushConfig creates a [a2a.PushConfig] that routes push notifications to the given NATS subject.
+func NewPushConfig(subject string, token string) *a2a.PushConfig {
+ return &a2a.PushConfig{URL: natsURLScheme + subject, Token: token}
+}
+
+// natsPushSender implements push.Sender by publishing status update events
+// to a NATS JetStream subject. Only handles PushConfig URLs with the [natsURLScheme].
+type natsPushSender struct {
+ PushSenderConfig
+}
+
+var _ push.Sender = (*natsPushSender)(nil)
+
+// SendPush implements [push.Sender.SendPush].
+func (s *natsPushSender) SendPush(ctx context.Context, config *a2a.PushConfig, event a2a.Event) error {
+ su, ok := event.(*a2a.TaskStatusUpdateEvent)
+ if !ok {
+ return nil
+ }
+
+ subject, ok := strings.CutPrefix(config.URL, natsURLScheme)
+ if !ok {
+ return nil
+ }
+
+ data, err := json.Marshal(a2a.StreamResponse{Event: event})
+ if err != nil {
+ return fmt.Errorf("marshal push event: %w", err)
+ }
+
+ if _, err := s.Jetstream.PublishMsg(ctx, &nats.Msg{
+ Subject: subject,
+ Header: nats.Header{"A2A-Token": []string{config.Token}},
+ Data: data,
+ }); err != nil {
+ return fmt.Errorf("nats publish push to %s: %w", subject, err)
+ }
+
+ log.Debug(ctx, "push notification sent", "subject", subject, "status", su.Status.State, "task_id", su.TaskID)
+
+ return nil
+}
diff --git a/samples/go/agents/deepresearch/internal/msgstream/util.go b/samples/go/agents/deepresearch/internal/msgstream/util.go
new file mode 100644
index 000000000..61877c7e4
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/msgstream/util.go
@@ -0,0 +1,20 @@
+package msgstream
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/nats-io/nats.go/jetstream"
+)
+
+// PublishJSON serializes value to JSON and publishes it on the provided subject.
+func PublishJSON(ctx context.Context, js jetstream.JetStream, subject string, value any) error {
+ msgJSON, err := json.Marshal(value)
+ if err != nil {
+ return err
+ }
+ if _, err := js.Publish(ctx, subject, msgJSON); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/samples/go/agents/deepresearch/internal/msgstream/workqueue.go b/samples/go/agents/deepresearch/internal/msgstream/workqueue.go
new file mode 100644
index 000000000..509bdcb98
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/msgstream/workqueue.go
@@ -0,0 +1,108 @@
+package msgstream
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "time"
+
+ "github.com/a2aproject/a2a-go/v2/a2a"
+ "github.com/a2aproject/a2a-go/v2/a2asrv/workqueue"
+ "github.com/a2aproject/a2a-go/v2/log"
+ "github.com/a2aproject/a2a-samples/samples/go/agents/deepresearch/internal/domain"
+ "github.com/nats-io/nats.go/jetstream"
+)
+
+const (
+ workStream = "WORK"
+ workSubject = "work"
+)
+
+type natsWorkReadWriter struct {
+ jetsteam jetstream.JetStream
+ consumer jetstream.Consumer
+ subject string
+}
+
+var _ workqueue.ReadWriter = (*natsWorkReadWriter)(nil)
+
+// CreateWorkQueue creates a NATS-backed pull work queue for the given agent type.
+func CreateWorkQueue(ctx context.Context, js jetstream.JetStream, agentType domain.AgentType) (workqueue.Queue, error) {
+ ws, err := js.Stream(ctx, workStream)
+ if err != nil {
+ return nil, fmt.Errorf("nats %q stream: %w", workStream, err)
+ }
+ cons, err := ws.Consumer(ctx, string(agentType))
+ if err != nil {
+ return nil, fmt.Errorf("nats %q stream %q consumer: %w", workStream, agentType, err)
+ }
+ subject := workSubject + "." + string(agentType)
+ readWriter := &natsWorkReadWriter{jetsteam: js, consumer: cons, subject: subject}
+ return workqueue.NewPullQueue(readWriter, nil), nil
+}
+
+func (rw *natsWorkReadWriter) Write(ctx context.Context, p *workqueue.Payload) (a2a.TaskID, error) {
+ if err := PublishJSON(ctx, rw.jetsteam, rw.subject, p); err != nil {
+ return "", fmt.Errorf("js publish: %w", err)
+ }
+ log.Info(ctx, "work item published", "task_id", p.TaskID, "type", p.Type, "subject", rw.subject)
+ return p.TaskID, nil
+}
+
+// Read blocks until a work item is available. It polls NATS Fetch in a loop
+// because Fetch returns an empty batch (no error) on timeout.
+func (rw *natsWorkReadWriter) Read(ctx context.Context) (workqueue.Message, error) {
+ for ctx.Err() == nil {
+ batch, err := rw.consumer.Fetch(1, jetstream.FetchMaxWait(1*time.Minute))
+ if err != nil {
+ return nil, fmt.Errorf("js fetch: %w", err)
+ }
+
+ for msg := range batch.Messages() {
+ var p workqueue.Payload
+ if err := json.Unmarshal(msg.Data(), &p); err != nil {
+ msg.Nak() //nolint:errcheck // best-effort nak before returning the unmarshal error
+ return nil, fmt.Errorf("unmarshal payload: %w", err)
+ }
+
+ log.Info(ctx, "work item dequeued", "task_id", p.TaskID, "type", p.Type)
+
+ return &natsWorkMsg{payload: &p, msg: msg}, nil
+ }
+
+ if err := batch.Error(); err != nil {
+ return nil, err
+ }
+ // Empty batch, retry.
+ }
+ return nil, ctx.Err()
+}
+
+type natsWorkMsg struct {
+ payload *workqueue.Payload
+ msg jetstream.Msg
+}
+
+var (
+ _ workqueue.Message = (*natsWorkMsg)(nil)
+ _ workqueue.Heartbeater = (*natsWorkMsg)(nil)
+)
+
+// Payload implements [workqueue.Message.Payload].
+func (m *natsWorkMsg) Payload() *workqueue.Payload { return m.payload }
+
+// Complete implements [workqueue.Message.Complete].
+func (m *natsWorkMsg) Complete(_ context.Context) error { return m.msg.Ack() }
+
+// Return implements [workqueue.Message.Return].
+func (m *natsWorkMsg) Return(ctx context.Context, cause error) error {
+ log.Warn(ctx, "work item returned (nak)", "task_id", m.payload.TaskID, "cause", cause)
+ return m.msg.Nak()
+}
+
+// HeartbeatInterval returns the interval at which InProgress signals are sent
+// to NATS, preventing the ack timeout from expiring during long-running tasks.
+func (m *natsWorkMsg) HeartbeatInterval() time.Duration { return time.Second }
+
+// Heartbeat signals NATS that this message is still being processed.
+func (m *natsWorkMsg) Heartbeat(_ context.Context) error { return m.msg.InProgress() }
diff --git a/samples/go/agents/deepresearch/internal/report/page.go b/samples/go/agents/deepresearch/internal/report/page.go
new file mode 100644
index 000000000..21e970e48
--- /dev/null
+++ b/samples/go/agents/deepresearch/internal/report/page.go
@@ -0,0 +1,283 @@
+// Package report serves synthesizer task artifacts as styled HTML pages.
+package report
+
+import (
+ "encoding/json"
+ "errors"
+ "html/template"
+ "net/http"
+ "strings"
+
+ "github.com/a2aproject/a2a-go/v2/a2a"
+ "github.com/a2aproject/a2a-go/v2/log"
+ "github.com/a2aproject/a2a-samples/samples/go/agents/deepresearch/internal/domain"
+ "github.com/a2aproject/a2a-samples/samples/go/agents/deepresearch/internal/store"
+)
+
+// reportData holds the template data for rendering a report page.
+type reportData struct {
+ TaskID string
+ State string
+ BadgeClass string
+ Content template.JS // JSON-encoded markdown string, safe for JS embedding.
+ HasContent bool
+ IsWorking bool
+}
+
+// NewServer returns an HTTP handler that serves synthesizer reports as HTML pages.
+// It expects the request path to contain an {id} segment matching a synthesizer task ID.
+func NewServer(s *store.Store) http.Handler {
+ tmpl := template.Must(template.New("report").Parse(pageTemplate))
+
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ id := r.PathValue("id")
+
+ taskMeta, err := s.GetIndexed(ctx, a2a.TaskID(id))
+ if errors.Is(err, a2a.ErrTaskNotFound) {
+ log.Info(ctx, "report source does not exist")
+ http.Error(w, "Report not found", http.StatusNotFound)
+ return
+ }
+ if err != nil {
+ log.Warn(ctx, "report source query failed", "cause", err)
+ http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
+ return
+ }
+ if taskMeta.Agent != domain.AgentSynthesizer {
+ log.Warn(ctx, "task is not a synthesizer report", "author", taskMeta.Agent)
+ http.Error(w, "Report not found", http.StatusNotFound)
+ return
+ }
+
+ task, err := s.Get(ctx, a2a.TaskID(id))
+ if err != nil {
+ log.Warn(ctx, "report source read failed", "cause", err)
+ http.Error(w, "Service unavailable", http.StatusServiceUnavailable)
+ return
+ }
+
+ markdown := extractMarkdown(task.Task)
+ contentJSON, err := json.Marshal(markdown)
+ if err != nil {
+ log.Warn(ctx, "json marshal failed", "cause", err)
+ http.Error(w, "Internal error", http.StatusInternalServerError)
+ return
+ }
+ state := task.Task.Status.State
+ data := reportData{
+ TaskID: id,
+ State: stateLabel(state),
+ BadgeClass: badgeClass(state),
+ Content: template.JS(contentJSON), //nolint:gosec // contentJSON is JSON-encoded markdown, safe for JS embedding
+ HasContent: markdown != "",
+ IsWorking: !state.Terminal(),
+ }
+
+ w.Header().Set("Content-Type", "text/html; charset=utf-8")
+ if data.IsWorking {
+ w.Header().Set("Refresh", "5")
+ }
+ if err := tmpl.Execute(w, data); err != nil {
+ log.Warn(ctx, "template execution failed", "cause", err)
+ }
+ })
+}
+
+// extractMarkdown collects all text parts from the task's artifacts.
+func extractMarkdown(task *a2a.Task) string {
+ var parts []string
+ for _, artifact := range task.Artifacts {
+ for _, part := range artifact.Parts {
+ if text := part.Text(); text != "" {
+ parts = append(parts, text)
+ }
+ }
+ }
+ return strings.Join(parts, "\n\n")
+}
+
+func stateLabel(s a2a.TaskState) string {
+ switch s {
+ case a2a.TaskStateCompleted:
+ return "Completed"
+ case a2a.TaskStateWorking:
+ return "In Progress"
+ case a2a.TaskStateFailed:
+ return "Failed"
+ case a2a.TaskStateCanceled:
+ return "Canceled"
+ case a2a.TaskStateSubmitted:
+ return "Submitted"
+ default:
+ return string(s)
+ }
+}
+
+func badgeClass(s a2a.TaskState) string {
+ switch s {
+ case a2a.TaskStateCompleted:
+ return "badge-completed"
+ case a2a.TaskStateWorking, a2a.TaskStateSubmitted:
+ return "badge-working"
+ case a2a.TaskStateFailed:
+ return "badge-failed"
+ default:
+ return "badge-default"
+ }
+}
+
+const pageTemplate = `
+
+
This report is being generated. The page will refresh automatically.
+No content available for this report.