Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .claude/architecture/EVENTQUEUE.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,13 @@ Single background thread "MainEventBusProcessor" that processes events in order:
### EventConsumer & ResultAggregator
**Locations**: `server-common/.../events/EventConsumer.java`, `server-common/.../tasks/ResultAggregator.java`

**EventConsumer**: Polls queue, returns `Flow.Publisher<Event>`, closes queue on final event
**EventConsumer**: Polls queue, returns `Flow.Publisher<EventQueueItem>`, closes queue on final event
- `consumeOne()` - Returns single `Event`, non-blocking (throws if queue is empty)
- `consumeAll()` - Returns `Flow.Publisher<EventQueueItem>` for reactive consumption

**ResultAggregator** bridges EventConsumer and DefaultRequestHandler:
- `consumeAndBreakOnInterrupt()` - Non-streaming (polls until terminal/AUTH_REQUIRED)
- `consumeAndEmit()` - Streaming (returns Flow.Publisher immediately)
- `consumeAll()` - Simple consumption
- `consumeAndBreakOnInterrupt(consumer, blocking)` - Non-streaming (polls until terminal/AUTH_REQUIRED)
- `consumeAndEmit(consumer)` - Streaming (returns Flow.Publisher immediately)

---

Expand Down
20 changes: 7 additions & 13 deletions .claude/architecture/compatibility_0.3.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,12 @@ compat-0.3/
│ └── [generated classes] # No _v0_3 suffix (generated code)
├── http-client/ # HTTP client abstraction for v0.3
│ └── pom.xml
├── server-conversion/ # ⭐ Core conversion layer (NEW)
├── server-conversion/ # ⭐ Core conversion layer
│ ├── pom.xml
│ └── src/main/java/org/a2aproject/sdk/compat03/conversion/
│ ├── Convert_v0_3_To10RequestHandler.java # Main adapter
│ ├── ErrorConverter_v0_3.java # Error conversion
│ ├── PushNotificationPayloadFormatter_v0_3.java # v0.3 push notification format
│ └── mappers/
│ ├── config/
│ │ ├── A03ToV10MapperConfig.java # MapStruct config
Expand All @@ -228,6 +229,8 @@ compat-0.3/
│ │ └── ...
│ └── result/ # Response result mappers
│ └── ListTaskPushNotificationConfigsResultMapper_v0_3.java
├── tests/ # Test infrastructure
│ └── server-common/ # Shared test base classes (AgentExecutorProducer_v0_3)
├── client/ # v0.3-compatible client
│ ├── base/ # Client_v0_3 — dedicated 0.3 API
│ │ └── pom.xml
Expand Down Expand Up @@ -295,7 +298,7 @@ All compat-0.3 code uses the `org.a2aproject.sdk.compat03` package root:
- `org.a2aproject.sdk.compat03.server.{apps,grpc,rest}.quarkus` — reference servers
- `org.a2aproject.sdk.compat03.tck` — conformance tests

**Note**: During this implementation, the main codebase was migrated from `io.github.a2asdk` (groupId) and `io.a2a` (package) to `org.a2aproject.sdk` (both groupId and package) via PRs #750 and #786.
**Note**: The main codebase uses `org.a2aproject.sdk` for both groupId and package root.

---

Expand Down Expand Up @@ -416,10 +419,7 @@ The `server-conversion` module produces a test-jar containing shared test infras
- Error mapping tests
- Task state conversion tests
- Reference server integration tests

🔲 **Deferred:**
- Push notification tests (depends on TestHttpClient porting)
- Test metadata classes (classpath scanning)
- Push notification payload formatter tests

---

Expand Down Expand Up @@ -548,10 +548,4 @@ For JSON-RPC and REST, multi-version convenience modules are also available that

## Status

The v0.3 compatibility layer is fully implemented: spec types, gRPC generation, conversion layer, all three transport handlers (JSON-RPC, gRPC, REST), client API and transports, reference servers, multi-version deployment, version-aware push notifications, test infrastructure, 125+ integration tests, and TCK module are all in place.

🔲 **Outstanding:**
- Push notification test porting (requires TestHttpClient)
- Test metadata classes (classpath scanning)
- Replace FQNs with imports (97 occurrences in 34 files)
- Unify AgentCard producers across reference modules
The v0.3 compatibility layer is fully implemented: spec types, gRPC generation, conversion layer, all three transport handlers (JSON-RPC, gRPC, REST), client API and transports, reference servers, multi-version deployment, version-aware push notifications, test infrastructure, integration tests, and TCK module are all in place.
37 changes: 15 additions & 22 deletions .claude/architecture/eventqueue/FLOWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,17 @@ Events that cause polling loop exit:
**Purpose**: Consumes events from EventQueue and exposes as reactive stream

**Key Methods**:
- `consume()` → Returns `Flow.Publisher<Event>`
- `consumeOne()` → Returns single `Event` (non-blocking, throws if queue is empty)
- `consumeAll()` → Returns `Flow.Publisher<EventQueueItem>` (reactive)
- Polls queue with 500ms timeout
- Closes queue on final event
- Thread-safe concurrent consumption
- Handles thread offloading internally

**Usage**:
```java
EventConsumer consumer = new EventConsumer(eventQueue);
Flow.Publisher<Event> publisher = consumer.consume();
EventConsumer consumer = new EventConsumer(eventQueue, executor);
Flow.Publisher<EventQueueItem> publisher = consumer.consumeAll();
// Subscribe to receive events as they arrive
```

Expand All @@ -111,50 +113,40 @@ Flow.Publisher<Event> publisher = consumer.consume();

Bridges EventConsumer and DefaultRequestHandler with three consumption modes:

### 1. consumeAndBreakOnInterrupt()
### 1. consumeAndBreakOnInterrupt(consumer, blocking)

**Used by**: `onMessageSend()` (non-streaming)

**Behavior**:
- Subscribes to `EventConsumer.consumeAll()` on `@EventConsumerExecutor` thread pool
- Polls queue until terminal event or AUTH_REQUIRED
- Returns `EventTypeAndInterrupt(event, interrupted)`
- Blocking operation
- Returns `EventTypeAndInterrupt(eventType, interrupted, consumptionFuture)`
- `consumptionFuture` tracks background consumption completion for cleanup coordination
- Exits early on AUTH_REQUIRED (interrupted = true)
- For blocking calls: interrupts to free Vert.x thread, continues background consumption

**Use Case**: Non-streaming requests that need single final response

### 2. consumeAndEmit()
### 2. consumeAndEmit(consumer)

**Used by**: `onMessageSendStream()` (streaming)

**Behavior**:
- Returns all events as `Flow.Publisher<Event>`
- Returns all events as `Flow.Publisher<EventQueueItem>`
- Non-blocking, immediate return
- Client subscribes to stream
- Events delivered as they arrive

**Use Case**: Streaming requests where client wants all events in real-time

### 3. consumeAll()

**Used by**: `onCancelTask()`

**Behavior**:
- Consumes all events from queue
- Returns first `Message` or final `Task` found
- Simple consumption without streaming
- Blocks until queue exhausted

**Use Case**: Task cancellation where final state matters

---

## Flow Comparison Table

| Aspect | Non-Streaming | Streaming |
|--------|---------------|-----------|
| **ResultAggregator Mode** | consumeAndBreakOnInterrupt | consumeAndEmit |
| **Return Type** | Task/Message | Flow.Publisher |
| **Return Type** | Task/Message | Flow.Publisher\<EventQueueItem\> |
| **Blocking** | Yes (until terminal event) | No (immediate return) |
| **Cleanup** | Immediate or async | Always async |
| **AUTH_REQUIRED** | Early exit, return task | Continue streaming |
Expand Down Expand Up @@ -211,7 +203,8 @@ cleanup(queue, task, true); // ALWAYS async for streaming
- Persists to TaskStore, distributes to ChildQueues

### Consumer Thread
- Non-streaming: Request handler thread (blocking)
- `@EventConsumerExecutor` cached thread pool (avoids deadlock under high concurrency)
- Non-streaming: Background thread via `CompletableFuture.runAsync()` on `@EventConsumerExecutor`
- Streaming: Subscriber thread (reactive)
- Polls ChildQueue for events

Expand Down
19 changes: 16 additions & 3 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mvn clean install

## Project Structure

- `spec/` — A2A specification types (Java POJOs for the protocol)
- `spec/` — A2A specification types (Java records for the protocol)
- `spec-grpc/` — gRPC protobuf definitions and generated classes
- `common/` — Shared utilities used across modules
- `client/` — Client SDK
Expand All @@ -29,10 +29,22 @@ mvn clean install
- `jsonrpc-common/` — Shared JSON-RPC utilities
- `reference/` — Reference server implementations built on Quarkus
- `common/`, `jsonrpc/`, `grpc/`, `rest/`
- `multiversion-jsonrpc/`, `multiversion-rest/` — Multi-version dispatching (v1.0 + v0.3)
- `tck/` — Technology Compatibility Kit (protocol conformance tests)
- `tests/` — Integration tests
- `extras/` — Optional add-ons (OpenTelemetry, JPA task/notification stores, replicated queue manager, Vert.x HTTP client)
- `tests/` — Integration and server-common tests
- `server-common/` — Server-common integration tests
- `multiversion/` — Multi-version integration tests (jsonrpc, rest, grpc)
- `extras/` — Optional add-ons
- `common/` — Shared classes across extras modules
- `opentelemetry/` — OpenTelemetry integration
- `task-store-database-jpa/` — JPA-backed task store
- `push-notification-config-store-database-jpa/` — JPA-backed push notification config store
- `queue-manager-replicated/` — Kafka-based replicated queue manager
- `http-client-vertx/` — Vert.x HTTP client
- `http-client-android/` — Android HTTP client
- `compat-0.3/` — Backward compatibility layer for A2A protocol v0.3
- `integrations/` — Runtime integrations (e.g., MicroProfile Config)
- `test-utils-docker/` — Test utilities for conditional Docker-based test execution
- `boms/` — Bill of Materials POMs (sdk, extras, reference, test-utils)
- `examples/` — Sample applications (helloworld, cloud-deployment)

Expand Down Expand Up @@ -68,6 +80,7 @@ mvn clean install
### Skills

- [update-a2a-proto](.agents/skills/update-a2a-proto/SKILL.md) — Update the gRPC proto file `a2a.proto` from upstream and regenerate Java sources
- [fix-tck-issue](.agents/skills/fix-tck-issue/SKILL.md) — Analyze and fix A2A TCK compatibility issues across transports

### Commands

Expand Down
Loading