From b773efdb6e9afcbbe934e4f2e8d161e56aba0e6b Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 8 Jun 2026 17:44:06 +0200 Subject: [PATCH 1/2] chore: Update AGENTS.md and architecture docs --- .claude/architecture/EVENTQUEUE.md | 9 +++--- .claude/architecture/compatibility_0.3.md | 20 +++++------- .claude/architecture/eventqueue/FLOWS.md | 37 +++++++++-------------- AGENTS.md | 19 ++++++++++-- 4 files changed, 43 insertions(+), 42 deletions(-) diff --git a/.claude/architecture/EVENTQUEUE.md b/.claude/architecture/EVENTQUEUE.md index a72ece048..660141f31 100644 --- a/.claude/architecture/EVENTQUEUE.md +++ b/.claude/architecture/EVENTQUEUE.md @@ -73,12 +73,13 @@ Single background thread "MainEventBusProcessor" that processes events in order: ### EventConsumer & ResultAggregator **Locations**: `server-common/.../events/EventConsumer.java`, `server-common/.../tasks/ResultAggregator.java` -**EventConsumer**: Polls queue, returns `Flow.Publisher`, closes queue on final event +**EventConsumer**: Polls queue, returns `Flow.Publisher`, closes queue on final event +- `consumeOne()` - Returns single `Event`, blocking +- `consumeAll()` - Returns `Flow.Publisher` for reactive consumption **ResultAggregator** bridges EventConsumer and DefaultRequestHandler: -- `consumeAndBreakOnInterrupt()` - Non-streaming (polls until terminal/AUTH_REQUIRED) -- `consumeAndEmit()` - Streaming (returns Flow.Publisher immediately) -- `consumeAll()` - Simple consumption +- `consumeAndBreakOnInterrupt(consumer, blocking)` - Non-streaming (polls until terminal/AUTH_REQUIRED) +- `consumeAndEmit(consumer)` - Streaming (returns Flow.Publisher immediately) --- diff --git a/.claude/architecture/compatibility_0.3.md b/.claude/architecture/compatibility_0.3.md index 8347a2769..f9dfcccdf 100644 --- a/.claude/architecture/compatibility_0.3.md +++ b/.claude/architecture/compatibility_0.3.md @@ -206,11 +206,12 @@ compat-0.3/ │ └── [generated classes] # No _v0_3 suffix (generated code) ├── http-client/ # HTTP client abstraction for v0.3 │ └── pom.xml -├── server-conversion/ # ⭐ Core conversion layer (NEW) +├── server-conversion/ # ⭐ Core conversion layer │ ├── pom.xml │ └── src/main/java/org/a2aproject/sdk/compat03/conversion/ │ ├── Convert_v0_3_To10RequestHandler.java # Main adapter │ ├── ErrorConverter_v0_3.java # Error conversion +│ ├── PushNotificationPayloadFormatter_v0_3.java # v0.3 push notification format │ └── mappers/ │ ├── config/ │ │ ├── A03ToV10MapperConfig.java # MapStruct config @@ -228,6 +229,8 @@ compat-0.3/ │ │ └── ... │ └── result/ # Response result mappers │ └── ListTaskPushNotificationConfigsResultMapper_v0_3.java +├── tests/ # Test infrastructure +│ └── server-common/ # Shared test base classes (AgentExecutorProducer_v0_3) ├── client/ # v0.3-compatible client │ ├── base/ # Client_v0_3 — dedicated 0.3 API │ │ └── pom.xml @@ -295,7 +298,7 @@ All compat-0.3 code uses the `org.a2aproject.sdk.compat03` package root: - `org.a2aproject.sdk.compat03.server.{apps,grpc,rest}.quarkus` — reference servers - `org.a2aproject.sdk.compat03.tck` — conformance tests -**Note**: During this implementation, the main codebase was migrated from `io.github.a2asdk` (groupId) and `io.a2a` (package) to `org.a2aproject.sdk` (both groupId and package) via PRs #750 and #786. +**Note**: The main codebase uses `org.a2aproject.sdk` for both groupId and package root. --- @@ -416,10 +419,7 @@ The `server-conversion` module produces a test-jar containing shared test infras - Error mapping tests - Task state conversion tests - Reference server integration tests - -🔲 **Deferred:** -- Push notification tests (depends on TestHttpClient porting) -- Test metadata classes (classpath scanning) +- Push notification payload formatter tests --- @@ -548,10 +548,4 @@ For JSON-RPC and REST, multi-version convenience modules are also available that ## Status -The v0.3 compatibility layer is fully implemented: spec types, gRPC generation, conversion layer, all three transport handlers (JSON-RPC, gRPC, REST), client API and transports, reference servers, multi-version deployment, version-aware push notifications, test infrastructure, 125+ integration tests, and TCK module are all in place. - -🔲 **Outstanding:** -- Push notification test porting (requires TestHttpClient) -- Test metadata classes (classpath scanning) -- Replace FQNs with imports (97 occurrences in 34 files) -- Unify AgentCard producers across reference modules +The v0.3 compatibility layer is fully implemented: spec types, gRPC generation, conversion layer, all three transport handlers (JSON-RPC, gRPC, REST), client API and transports, reference servers, multi-version deployment, version-aware push notifications, test infrastructure, integration tests, and TCK module are all in place. diff --git a/.claude/architecture/eventqueue/FLOWS.md b/.claude/architecture/eventqueue/FLOWS.md index d2906a1f5..f19af8d6f 100644 --- a/.claude/architecture/eventqueue/FLOWS.md +++ b/.claude/architecture/eventqueue/FLOWS.md @@ -91,15 +91,17 @@ Events that cause polling loop exit: **Purpose**: Consumes events from EventQueue and exposes as reactive stream **Key Methods**: -- `consume()` → Returns `Flow.Publisher` +- `consumeOne()` → Returns single `Event` (blocking) +- `consumeAll()` → Returns `Flow.Publisher` (reactive) - Polls queue with 500ms timeout - Closes queue on final event - Thread-safe concurrent consumption +- Handles thread offloading internally **Usage**: ```java -EventConsumer consumer = new EventConsumer(eventQueue); -Flow.Publisher publisher = consumer.consume(); +EventConsumer consumer = new EventConsumer(eventQueue, executor); +Flow.Publisher publisher = consumer.consumeAll(); // Subscribe to receive events as they arrive ``` @@ -111,42 +113,32 @@ Flow.Publisher publisher = consumer.consume(); Bridges EventConsumer and DefaultRequestHandler with three consumption modes: -### 1. consumeAndBreakOnInterrupt() +### 1. consumeAndBreakOnInterrupt(consumer, blocking) **Used by**: `onMessageSend()` (non-streaming) **Behavior**: +- Subscribes to `EventConsumer.consumeAll()` on `@EventConsumerExecutor` thread pool - Polls queue until terminal event or AUTH_REQUIRED -- Returns `EventTypeAndInterrupt(event, interrupted)` -- Blocking operation +- Returns `EventTypeAndInterrupt(eventType, interrupted, consumptionFuture)` +- `consumptionFuture` tracks background consumption completion for cleanup coordination - Exits early on AUTH_REQUIRED (interrupted = true) +- For blocking calls: interrupts to free Vert.x thread, continues background consumption **Use Case**: Non-streaming requests that need single final response -### 2. consumeAndEmit() +### 2. consumeAndEmit(consumer) **Used by**: `onMessageSendStream()` (streaming) **Behavior**: -- Returns all events as `Flow.Publisher` +- Returns all events as `Flow.Publisher` - Non-blocking, immediate return - Client subscribes to stream - Events delivered as they arrive **Use Case**: Streaming requests where client wants all events in real-time -### 3. consumeAll() - -**Used by**: `onCancelTask()` - -**Behavior**: -- Consumes all events from queue -- Returns first `Message` or final `Task` found -- Simple consumption without streaming -- Blocks until queue exhausted - -**Use Case**: Task cancellation where final state matters - --- ## Flow Comparison Table @@ -154,7 +146,7 @@ Bridges EventConsumer and DefaultRequestHandler with three consumption modes: | Aspect | Non-Streaming | Streaming | |--------|---------------|-----------| | **ResultAggregator Mode** | consumeAndBreakOnInterrupt | consumeAndEmit | -| **Return Type** | Task/Message | Flow.Publisher | +| **Return Type** | Task/Message | Flow.Publisher\ | | **Blocking** | Yes (until terminal event) | No (immediate return) | | **Cleanup** | Immediate or async | Always async | | **AUTH_REQUIRED** | Early exit, return task | Continue streaming | @@ -211,7 +203,8 @@ cleanup(queue, task, true); // ALWAYS async for streaming - Persists to TaskStore, distributes to ChildQueues ### Consumer Thread -- Non-streaming: Request handler thread (blocking) +- `@EventConsumerExecutor` cached thread pool (avoids deadlock under high concurrency) +- Non-streaming: Background thread via `CompletableFuture.runAsync()` on `@EventConsumerExecutor` - Streaming: Subscriber thread (reactive) - Polls ChildQueue for events diff --git a/AGENTS.md b/AGENTS.md index 9e325fb3f..a74ff8770 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -16,7 +16,7 @@ mvn clean install ## Project Structure -- `spec/` — A2A specification types (Java POJOs for the protocol) +- `spec/` — A2A specification types (Java records for the protocol) - `spec-grpc/` — gRPC protobuf definitions and generated classes - `common/` — Shared utilities used across modules - `client/` — Client SDK @@ -29,10 +29,22 @@ mvn clean install - `jsonrpc-common/` — Shared JSON-RPC utilities - `reference/` — Reference server implementations built on Quarkus - `common/`, `jsonrpc/`, `grpc/`, `rest/` + - `multiversion-jsonrpc/`, `multiversion-rest/` — Multi-version dispatching (v1.0 + v0.3) - `tck/` — Technology Compatibility Kit (protocol conformance tests) -- `tests/` — Integration tests -- `extras/` — Optional add-ons (OpenTelemetry, JPA task/notification stores, replicated queue manager, Vert.x HTTP client) +- `tests/` — Integration and server-common tests + - `server-common/` — Server-common integration tests + - `multiversion/` — Multi-version integration tests (jsonrpc, rest, grpc) +- `extras/` — Optional add-ons + - `common/` — Shared classes across extras modules + - `opentelemetry/` — OpenTelemetry integration + - `task-store-database-jpa/` — JPA-backed task store + - `push-notification-config-store-database-jpa/` — JPA-backed push notification config store + - `queue-manager-replicated/` — Kafka-based replicated queue manager + - `http-client-vertx/` — Vert.x HTTP client + - `http-client-android/` — Android HTTP client +- `compat-0.3/` — Backward compatibility layer for A2A protocol v0.3 - `integrations/` — Runtime integrations (e.g., MicroProfile Config) +- `test-utils-docker/` — Test utilities for conditional Docker-based test execution - `boms/` — Bill of Materials POMs (sdk, extras, reference, test-utils) - `examples/` — Sample applications (helloworld, cloud-deployment) @@ -68,6 +80,7 @@ mvn clean install ### Skills - [update-a2a-proto](.agents/skills/update-a2a-proto/SKILL.md) — Update the gRPC proto file `a2a.proto` from upstream and regenerate Java sources +- [fix-tck-issue](.agents/skills/fix-tck-issue/SKILL.md) — Analyze and fix A2A TCK compatibility issues across transports ### Commands From 4f154780aa69d02c399f9447c6bc8beba499495c Mon Sep 17 00:00:00 2001 From: Kabir Khan Date: Mon, 8 Jun 2026 17:55:23 +0200 Subject: [PATCH 2/2] Review fixes --- .claude/architecture/EVENTQUEUE.md | 2 +- .claude/architecture/eventqueue/FLOWS.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.claude/architecture/EVENTQUEUE.md b/.claude/architecture/EVENTQUEUE.md index 660141f31..e0980ab95 100644 --- a/.claude/architecture/EVENTQUEUE.md +++ b/.claude/architecture/EVENTQUEUE.md @@ -74,7 +74,7 @@ Single background thread "MainEventBusProcessor" that processes events in order: **Locations**: `server-common/.../events/EventConsumer.java`, `server-common/.../tasks/ResultAggregator.java` **EventConsumer**: Polls queue, returns `Flow.Publisher`, closes queue on final event -- `consumeOne()` - Returns single `Event`, blocking +- `consumeOne()` - Returns single `Event`, non-blocking (throws if queue is empty) - `consumeAll()` - Returns `Flow.Publisher` for reactive consumption **ResultAggregator** bridges EventConsumer and DefaultRequestHandler: diff --git a/.claude/architecture/eventqueue/FLOWS.md b/.claude/architecture/eventqueue/FLOWS.md index f19af8d6f..286e0d5c6 100644 --- a/.claude/architecture/eventqueue/FLOWS.md +++ b/.claude/architecture/eventqueue/FLOWS.md @@ -91,7 +91,7 @@ Events that cause polling loop exit: **Purpose**: Consumes events from EventQueue and exposes as reactive stream **Key Methods**: -- `consumeOne()` → Returns single `Event` (blocking) +- `consumeOne()` → Returns single `Event` (non-blocking, throws if queue is empty) - `consumeAll()` → Returns `Flow.Publisher` (reactive) - Polls queue with 500ms timeout - Closes queue on final event