diff --git a/rfcs/at-least-once-research.md b/rfcs/at-least-once-research.md new file mode 100644 index 0000000000..49c24ff0a0 --- /dev/null +++ b/rfcs/at-least-once-research.md @@ -0,0 +1,422 @@ +# At-Least-Once Delivery: Research & Prior Art + +**Date:** 2026-05-05 +**Author:** Research compilation +**Status:** Reference + +--- + +## 1. Delivery Guarantee Taxonomy + +| Guarantee | Meaning | Duplicate risk | Loss risk | +|---|---|---|---| +| At-most-once | Delivered zero or one time | No | Yes | +| At-least-once | Delivered one or more times | Yes | No | +| Exactly-once | Delivered exactly once | No | No | + +Exactly-once is the most expensive and, in distributed systems, always involves some form of distributed coordination or idempotent processing at the consumer. + +--- + +## 2. Apache Kafka + +### Core Model + +Kafka is a durable, ordered, replicated log. Messages are persisted to disk. Consumers track their progress via _offsets_ (an integer per partition). The broker does not delete messages on delivery — retention is time- or size-based. This decouples producers from consumers completely and enables replay. + +### At-Least-Once on the Producer Side + +**`acks`** +- `acks=0` — fire-and-forget, no durability. +- `acks=1` — leader acknowledges; replica lag = potential loss on leader failure. +- `acks=all` (default since Kafka 3.0) — all in-sync replicas must acknowledge before the producer call returns. Strongest durability guarantee. + +**`retries`** (default: `Integer.MAX_VALUE`) +Kafka retries transient errors automatically. Combined with `delivery.timeout.ms` (default: 2 minutes) this caps the total retry window. + +**`enable.idempotence`** (default: `true` since Kafka 3.0) +The broker assigns each producer a unique Producer ID (PID). Every record sent to a partition carries a monotonically increasing sequence number per (PID, partition). The broker rejects duplicate sequence numbers silently. 
This makes `send()` idempotent within a single producer session. + +> **Critical limit**: idempotence is per-session. A producer restart gets a new PID. Duplicate messages can occur across restarts. + +Requires: `acks=all`, `retries > 0`, `max.in.flight.requests.per.connection <= 5`. + +### At-Least-Once on the Consumer Side + +**`enable.auto.commit=false` (manual commit mode)** +Process messages first, then commit the offset. A crash between process and commit causes the next consumer to re-process — at-least-once. + +- `commitSync()` — blocks and retries until the commit succeeds, fails with an unrecoverable error, or times out (`default.api.timeout.ms`). Use before shutdown or in `onPartitionsRevoked()`. +- `commitAsync()` — non-blocking, higher throughput, no retry (to avoid ordering bugs). Use in the main poll loop. +- **Production pattern**: `commitAsync()` in the loop + `commitSync()` in `onPartitionsRevoked()` + shutdown hook. + +**`auto.offset.reset`** (`earliest` | `latest` | `none`): where to start when no committed offset exists for the consumer group (`none` raises an error instead of picking a position). + +**`consumer.seek(partition, offset)`**: manually position the consumer. Enables replay and recovery. + +### Rebalancing Edge Cases + +**Eager protocol** (classic): all consumers revoke all partitions, group restarts. Async commits in flight at this moment may fail, causing the new owner to re-process from the last committed offset. + +**Cooperative (incremental) protocol** (opt-in for the plain Java consumer: since Kafka 3.0 the default assignor list is `[RangeAssignor, CooperativeStickyAssignor]`, which still selects the eager `RangeAssignor` until it is removed from the list): only partitions that _need to move_ are revoked. Unaffected partitions continue processing without interruption, which shrinks the rebalance-induced duplication window. + +**Key rule**: always call `commitSync()` in `onPartitionsRevoked()` before handing back the partition. + +### Exactly-Once Semantics (Kafka Transactions) + +EOS = idempotent producer + transactions. + +**Flow**: +1. `initTransactions()` — registers `transactional.id`, receives PID + epoch. +2. `beginTransaction()` +3. Write records to N partitions (tagged with PID + epoch + seq). +4. 
`sendOffsetsToTransaction(offsets, groupId)` — atomically includes consumer offset commits in the transaction. +5. `commitTransaction()` — coordinator writes commit markers to all involved partitions. + +**Epoch fencing**: on producer restart with the same `transactional.id`, the coordinator bumps the epoch and aborts any pending transactions from the old instance. Old instances are rejected on any write attempt. + +**Consumer side**: set `isolation.level=read_committed`. Only records from committed transactions are visible. The Last Stable Offset (LSO) caps reads at the lowest open transaction. + +**Kafka Streams EOS**: `processing.guarantee=exactly_once_v2`. Wraps the consume-process-produce pipeline in a Kafka transaction per `commit.interval.ms`. + +> **Critical limit**: Kafka transactions are Kafka-internal only. There is no 2PC with external systems (databases, HTTP endpoints). For side effects outside Kafka, design idempotent consumers. + +--- + +## 3. NATS JetStream + +### Core NATS vs JetStream + +**Core NATS** is at-most-once. Messages are delivered only to currently connected subscribers. No persistence. + +**JetStream** adds a persistence layer: messages are written to _Streams_ (durable logs) and consumed by _Consumers_ (views into a stream with tracked delivery state). Enables temporal decoupling and replay. + +### Streams + +A stream captures messages on matching subjects. Retention policies: limits-based, work-queue, or interest-based. Storage: memory, file, or RAFT-replicated (replication factors 1, 2, 3, or 5). + +### Consumer Types + +**Push consumers**: server actively delivers messages to a specified delivery subject. Support queue groups for load balancing. Suitable for single-instance or simple setups. + +**Pull consumers** (recommended): client explicitly fetches batches using `Fetch(batchSize, timeout)` or `FetchNoWait(batchSize)`. Client controls processing rate. Preferred for scalable, reliable processing. 
+ +### Ack Policies + +| Policy | Behavior | Use case | +|---|---|---| +| `AckExplicit` | Every message must be individually acked | At-least-once (recommended) | +| `AckAll` | Acking message N implicitly acks 1..N-1 | Reduce ack volume | +| `AckNone` | No ack required; server assumes delivery | At-most-once / fire-and-forget | + +`AckExplicit` is the only policy supported for pull consumers. + +### Ack Types + +- `Ack()` — confirms successful processing. +- `AckSync()` — server acknowledges the ack receipt (double-ack). Closes the "ack lost in flight" race. Required for exactly-once. +- `InProgress()` — extends the `AckWait` timer. Use for long-running processing. +- `Nak()` — negative ack; requests immediate redelivery. `NakWithDelay()` (Go client) requests delayed redelivery and supports backoff. +- `Term()` — permanently terminates delivery of this message. No future redelivery. + +### Key Consumer Configuration + +**`AckWait`** (default: 30s): how long the server waits for an ack before scheduling redelivery. + +**`MaxDeliver`** (default: unlimited): maximum delivery attempts. On exhaustion, a JetStream Advisory is emitted (`$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.{stream}.{consumer}`). The message is _not_ automatically deleted — it must be manually acked or deleted. This advisory is the closest JetStream equivalent to a dead-letter mechanism. + +**`BackOff`**: a sequence of durations for redelivery delays (e.g., `[1s, 5s, 30s, 5m]`). Overrides `AckWait`. Length must be ≤ `MaxDeliver`. + +**`MaxAckPending`**: maximum outstanding unacknowledged messages. Acts as application-level flow control. + +### Sequence Numbers + +Each message carries two sequence numbers: +- **Stream sequence**: immutable position in the stream. Does not change across redeliveries. +- **Consumer sequence**: counts delivery attempts made by a specific consumer across all of its messages. Incremented on every delivery including redeliveries. + +If `ConsumerSeq=4, StreamSeq=3`, the consumer has made 4 delivery attempts in total by the time it delivers stream message 3. For an unfiltered consumer that started at sequence 1, that means one delivery so far was a redelivery; it does not mean message 3 was delivered 4 times. The per-message delivery count is reported separately (`NumDelivered`). 
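To make the relationship between the two counters concrete, here is a toy in-memory model (standard library only, not the nats.go API). The `delivery` and `consumer` types and their field names are hypothetical. The sketch assumes a consumer whose sequence increments once per delivery attempt across all messages, with a separate per-message delivery count.

```go
package main

import "fmt"

// delivery mirrors the counters attached to one delivered message.
// Illustrative names only; this is not the nats.go metadata type.
type delivery struct {
	StreamSeq    uint64 // fixed position of the message in the stream
	ConsumerSeq  uint64 // running count of all deliveries by this consumer
	NumDelivered uint64 // how many times this particular message was delivered
}

// consumer is a toy stand-in for a JetStream consumer's delivery state.
type consumer struct {
	consumerSeq uint64
	delivered   map[uint64]uint64 // stream sequence -> delivery count
}

func newConsumer() *consumer {
	return &consumer{delivered: make(map[uint64]uint64)}
}

// deliver records one delivery attempt, whether first delivery or redelivery.
func (c *consumer) deliver(streamSeq uint64) delivery {
	c.consumerSeq++
	c.delivered[streamSeq]++
	return delivery{
		StreamSeq:    streamSeq,
		ConsumerSeq:  c.consumerSeq,
		NumDelivered: c.delivered[streamSeq],
	}
}

func main() {
	c := newConsumer()
	// Messages 1 and 2 are delivered, message 2 is redelivered, then message 3.
	for _, seq := range []uint64{1, 2, 2, 3} {
		d := c.deliver(seq)
		fmt.Printf("stream=%d consumer=%d numDelivered=%d\n",
			d.StreamSeq, d.ConsumerSeq, d.NumDelivered)
	}
}
```

In this run the last delivery reports stream sequence 3 and consumer sequence 4, while its own delivery count is still 1. A gap between the two sequences signals that some redelivery happened earlier, not how often the current message was retried.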
+ +### Message Headers (JetStream metadata) + +| Header | Meaning | +|---|---| +| `Nats-Sequence` | stream sequence number | +| `Nats-Time-Stamp` | publication timestamp | +| `Nats-Subject` | original subject | +| `Nats-Stream` | stream name | +| `Nats-Consumer` | consumer name | +| `Nats-Num-Delivered` | delivery count (redelivery counter) | +| `Nats-Num-Pending` | remaining pending messages for consumer | +| `Nats-Msg-Id` | publisher-assigned deduplication ID | + +### Exactly-Once Semantics in JetStream + +Two mechanisms combined: + +**1. Publisher-side deduplication** (`Nats-Msg-Id`): publishers include a unique message ID. JetStream tracks seen IDs within a configurable window (default: 2 minutes). Duplicate IDs are silently discarded. + +**Per-subject infinite deduplication**: using `DiscardNewPerSubject` + `MaxMsgsPerSubject=1`, a subject can only hold one message. Publishing to an occupied subject fails immediately. Using the message ID as part of the subject name (e.g., `orders.{uuid}`) provides infinite, non-windowed deduplication — analogous to a database unique-key constraint. + +**2. Consumer-side double-ack** (`AckSync()`): the server acknowledges the consumer's ack. Once confirmed, the message will never be redelivered. Closes the "ack lost in network" race condition. + +**Full EOS recipe**: `js.Publish()` with `Nats-Msg-Id` + pull consumer with `AckExplicit` + `AckSync()`. + +### Durable vs Ephemeral Consumers + +- **Durable**: named, server-persisted, survives restarts. Multiple instances can share one durable pull consumer (competing consumers pattern). +- **Ephemeral**: unnamed, deleted when no client is connected. For one-shot replays or testing. + +--- + +## 4. At-Least-Once Delivery over WebSockets + +### The Fundamental Problem + +WebSockets (RFC 6455) run over TCP. TCP guarantees ordered, reliable delivery _within_ an active connection. It does not survive connection re-establishment. 
A new WebSocket connection is a fresh TCP connection with no shared state. Messages in flight at the moment of closure are lost. + +### Pattern A: Sequence IDs + Connection Recovery (Azure Web PubSub) + +Azure's reliable subprotocols implement a complete at-least-once protocol at the WebSocket level. + +**Connection recovery**: on connect, the server sends `connectionId` + `reconnectionToken`. On reconnect, the client includes these in the URL. If recovery succeeds within 1 minute, the session resumes. + +**Publisher side (`ackId`)**: each outgoing message carries a unique `ackId`. The server responds with `{"type":"ack","ackId":1,"success":true}`. On reconnect, the publisher resends with the same `ackId`. If the server already processed it, it responds with `"success":false,"error":{"name":"Duplicate"}`. Publisher stops resending. + +**Subscriber side (`sequenceId`)**: data messages carry a monotonically increasing `sequenceId` per session. The subscriber sends `{"type":"sequenceAck","sequenceId":5}`. On reconnect, the service replays all messages with `sequenceId > 5`. + +### Pattern B: MQTT over WebSockets (QoS 1 / QoS 2) + +MQTT runs as a WebSocket subprotocol (`mqtt`). All QoS levels are preserved over WebSockets. + +**QoS 1 (at-least-once)**: +1. Publisher sends `PUBLISH` with Packet ID, stores copy. +2. Broker sends `PUBACK` with same Packet ID. +3. Publisher deletes copy. +If no `PUBACK`, publisher retransmits with `DUP=1`. Broker always sends `PUBACK`. Duplicate delivery to subscribers is inherent — subscribers must be idempotent. + +Session persistence (`CleanStart=0`): broker queues QoS 1 messages for disconnected clients. Unacknowledged messages retransmitted on reconnect. + +**QoS 2 (exactly-once)**: four-step handshake — PUBLISH → PUBREC → PUBREL → PUBCOMP. The receiver's stored Packet ID prevents double-processing on duplicate PUBLISH. + +### Pattern C: STOMP over WebSockets + +STOMP is a text-based messaging protocol with native WebSocket support. 
+ +**Ack modes** (set on `SUBSCRIBE` frame): +- `ack:auto` — server assumes delivery on send. +- `ack:client` — cumulative; acking message N acks 1..N-1. +- `ack:client-individual` — non-cumulative; each message requires its own `ACK` or `NACK`. + +**`NACK`**: signals non-consumption; broker redelivers, discards, or sends to DLQ (broker-dependent). + +**`RECEIPT` frame**: add `receipt:id` header to any frame; server responds with `RECEIPT` confirming processing. + +**Transactions**: `BEGIN` / `COMMIT` / `ABORT`. `SEND` and `ACK` inside a transaction are applied atomically. Uncommitted transactions are implicitly aborted on disconnect. + +### Pattern D: Socket.IO Connection State Recovery + +Socket.IO v4.6+ optionally buffers missed packets server-side for up to 2 minutes. On reconnect with a session ID, missed events are replayed. Default is still at-most-once — state recovery is opt-in and bounded. + +**Manual at-least-once**: assign IDs to events, store in a database, client sends acks, track last received ID per client, replay on reconnect. + +### Pattern E: Ably (Proprietary SDK) + +Assigns every message a unique serial number (timestamp-based). Provides exactly-once delivery: +- Client provides its last received serial on reconnect. +- Ably replays missed messages and deduplicates. +- History persisted for 24–72 hours across three regions. +- Ping/pong every 15 seconds; reconnect window of 2 minutes. + +### Summary: WebSocket Reliability + +| Library / Protocol | Transport | At-Least-Once | Mechanism | +|---|---|---|---| +| Azure Web PubSub (reliable) | WebSocket | Yes | sequenceId + ackId + connectionId recovery | +| MQTT QoS 1 | WebSocket | Yes | PUBLISH/PUBACK + session persistence | +| MQTT QoS 2 | WebSocket | Exactly-once | 4-way handshake + session state | +| STOMP (client-individual) | WebSocket | Yes (broker-dep.) 
| ACK/NACK frames + broker redelivery | +| Socket.IO v4.6+ (state recovery) | WebSocket | Partial (2min) | Buffered events + session ID | +| Ably SDK | WebSocket (proprietary) | Exactly-once | Serial numbers + 72h history | +| `reconnecting-websocket` | WebSocket | No | Reconnect only, no replay | + +--- + +## 5. At-Least-Once Delivery in GraphQL Subscriptions + +### Current State of the Spec + +GraphQL subscriptions are defined in the spec as an operation type, but the spec does not define a transport protocol. The `graphql-over-http` working draft explicitly states: _"GraphQL Subscriptions are beyond the scope of this specification at this time."_ + +There is **no standardized delivery guarantee** in the GraphQL spec. graphql-spec issue #419 ("Acknowledgement of messages in Subscription") was raised requesting ack/nack and replay capability. It was closed without being incorporated. + +Reliability is entirely the responsibility of the transport layer and application implementation. + +### Protocol: `graphql-ws` (`graphql-transport-ws`) + +The de facto standard for GraphQL subscriptions over WebSockets. + +**Messages**: +- `ConnectionInit` (C→S): initiate connection, optional payload. +- `ConnectionAck` (S→C): accepted. +- `Subscribe` (C→S): `{id, payload: {query, variables, operationName, extensions}}`. +- `Next` (S→C): `{id, payload: ExecutionResult}`. +- `Error` (S→C): execution errors, terminates operation. +- `Complete` (bidirectional): terminates operation. +- `Ping` / `Pong` (bidirectional): heartbeat. + +Multiplexing: multiple operations with different IDs on one connection. + +**Delivery guarantee**: none. If the WebSocket closes, in-flight `Next` messages are lost. On reconnect, the client re-subscribes from `ConnectionInit` — it receives only messages published _after_ the new subscription starts. + +The `ConnectionInit` payload is free-form JSON, which makes it the natural extension point for passing resume cursors. 
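As an illustration of that extension point, the sketch below builds and parses a `connection_init` frame that carries a resume cursor. The payload key `resumeCursor` and the helpers `buildConnectionInit` and `resumeCursorFrom` are hypothetical; the `graphql-transport-ws` protocol itself defines no such field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// initMessage models the ConnectionInit frame with an optional object payload.
type initMessage struct {
	Type    string                     `json:"type"`
	Payload map[string]json.RawMessage `json:"payload,omitempty"`
}

// resumeKey is a hypothetical payload key; the protocol does not define one.
const resumeKey = "resumeCursor"

// buildConnectionInit returns the frame a client would send, including the
// last cursor it saw. An empty cursor produces a frame without a payload.
func buildConnectionInit(lastCursor string) ([]byte, error) {
	msg := initMessage{Type: "connection_init"}
	if lastCursor != "" {
		raw, err := json.Marshal(lastCursor)
		if err != nil {
			return nil, err
		}
		msg.Payload = map[string]json.RawMessage{resumeKey: raw}
	}
	return json.Marshal(msg)
}

// resumeCursorFrom extracts the cursor on the server side. ok is false when
// the frame is not a ConnectionInit or carries no usable cursor.
func resumeCursorFrom(frame []byte) (cursor string, ok bool) {
	var msg initMessage
	if err := json.Unmarshal(frame, &msg); err != nil || msg.Type != "connection_init" {
		return "", false
	}
	raw, present := msg.Payload[resumeKey]
	if !present {
		return "", false
	}
	if err := json.Unmarshal(raw, &cursor); err != nil {
		return "", false
	}
	return cursor, cursor != ""
}

func main() {
	frame, err := buildConnectionInit("42")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(frame))
	fmt.Println(resumeCursorFrom(frame))
}
```

A server that does not recognise the key ignores it, so clients that send a cursor stay compatible with servers that offer no replay.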
+ +### Protocol: `graphql-sse` + +Uses Server-Sent Events rather than WebSockets. + +- **Distinct Connections Mode** (HTTP/2+): each operation is its own SSE stream. +- **Single Connection Mode** (HTTP/1 safe): one persistent SSE stream for all events; operations multiplexed with a token. + +The SSE `Last-Event-ID` mechanism is available at the HTTP level, but the reference implementation does not implement server-side replay based on it. Replay depends entirely on the server implementation. + +### Commercial Implementations + +**AWS AppSync**: WebSocket-based subscriptions. Does not document at-least-once delivery. Explicitly states messages may not arrive in order. + +**Hasura subscriptions**: live queries, not event streams. The server polls the database and sends the current result when it changes. Missed state transitions between polls are never delivered. GitHub issue #3517 ("Not all mutations delivered to subscribed client") acknowledges this. Hasura _Event Triggers_ (separate HTTP webhooks) do guarantee at-least-once, but that is not a GraphQL subscription. + +**Hasura feature request #2317**: proposal for Kafka-backed event subscriptions providing true at-least-once with replay. Not yet implemented as of 2025. + +### Application-Level Cursor-Based Resumption (Platformatic Pattern) + +The emerging pattern for reliable GraphQL subscriptions: + +1. **Schema**: the subscription resolver accepts an optional `afterCursor` argument: + ```graphql + type Subscription { + onMessage(afterCursor: String): Message + } + ``` +2. **Server**: events are stored in a persistent store with sequential IDs. On subscribe with `afterCursor`, replay all stored events after that cursor, then transition to live delivery. +3. **Client**: track the `id` (or `cursor`) field of each received message. On reconnect, re-subscribe with `afterCursor: `. +4. **Heartbeat**: 30-second pings to detect stale connections. 
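The server-side replay step in the list above can be sketched with an in-memory log. This is a minimal illustration, assuming dense, 1-based sequential IDs and a cursor that is the decimal ID of the last received event. The `eventLog` type and its methods are hypothetical stand-ins for a persistent store.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

type event struct {
	ID   uint64
	Data string
}

// eventLog is an in-memory stand-in for the persistent event store.
type eventLog struct {
	mu     sync.Mutex
	events []event // ordered by ID; IDs are dense and start at 1
}

// publish appends an event and assigns the next sequential ID.
func (l *eventLog) publish(data string) event {
	l.mu.Lock()
	defer l.mu.Unlock()
	e := event{ID: uint64(len(l.events)) + 1, Data: data}
	l.events = append(l.events, e)
	return e
}

// replayAfter returns the stored events with an ID greater than the cursor.
// An empty cursor means live-only delivery, so nothing is replayed.
func (l *eventLog) replayAfter(afterCursor string) ([]event, error) {
	if afterCursor == "" {
		return nil, nil
	}
	last, err := strconv.ParseUint(afterCursor, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid cursor %q: %w", afterCursor, err)
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	if last >= uint64(len(l.events)) {
		return nil, nil
	}
	// With dense 1-based IDs, the event following ID n sits at index n.
	return append([]event(nil), l.events[last:]...), nil
}

func main() {
	l := &eventLog{}
	for _, d := range []string{"a", "b", "c"} {
		l.publish(d)
	}
	missed, err := l.replayAfter("1")
	if err != nil {
		panic(err)
	}
	for _, e := range missed {
		fmt.Println(e.ID, e.Data)
	}
}
```

A real resolver would also need to attach to the live feed before, or atomically with, reading the replay range. Otherwise events published between the two steps could be missed.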
+ +This shifts subscriptions from connection-stateful to _stateless and resumable_. Achieves 100% delivery across connection failures in testing, but requires explicit server and client implementation. + +--- + +## 6. Server-Sent Events with Last-Event-ID + +SSE (`text/event-stream`) has built-in reconnection and replay support in the HTML spec. + +``` +id: 42 +event: order-update +data: {"orderId":"abc","status":"shipped"} +retry: 3000 +``` + +**`id` field**: sets the browser's `lastEventId`. Persisted across reconnects. + +**`Last-Event-ID` header**: on reconnect, the browser sends this header. The server replays all events with ID > lastEventId. + +**At-least-once via SSE**: +1. Server assigns sequential IDs to events. +2. Events stored in a persistent log (database, Kafka, Redis Streams, JetStream). +3. On reconnect with `Last-Event-ID`, server queries the log and replays. +4. Client is idempotent (GraphQL provides a deterministic view, but order matters for events). + +**Caveats**: +- HTTP/1.1 6-connection-per-domain limit. HTTP/2 resolves this (100 concurrent streams default). +- SSE is unidirectional (server → client). No client acks; `Last-Event-ID` is the only feedback. +- Events without `id` do not update `lastEventId` — server must always set IDs. + +--- + +## 7. 
How Cosmo Streams Works Today + +Based on the codebase at `/home/user/cosmo`: + +### Architecture + +``` +Client (WebSocket: graphql-transport-ws or subscriptions-transport-ws) + → Router WebSocket Handler (core/websocket.go) + → Protocol layer (internal/wsproto/) + → Operation planner → DataSource selection + → Provider (NATS / Kafka / Redis / Engine subgraph) + ↕ Events (batched) + → Hook pipeline (SubscriptionOnStart, OnReceiveEvents, OnPublishEvents) + → SubscriptionUpdater → write Next message to WebSocket +``` + +### Delivery Guarantees by Provider + +| Provider | Broker → Router | Router → Client | Notes | +|---|---|---|---| +| NATS JetStream | ✅ At-least-once | ❌ None | Manual `msg.Ack()` after delivery to subscriber (`adapter.go:154`) | +| NATS Core | ❌ At-most-once | ❌ None | Fire-and-forget pub/sub | +| Kafka | ❌ At-most-once | ❌ None | Polls from `time.Now()`, no offset commits | +| Redis | ❌ At-most-once | ❌ None | Pattern-based pub/sub | + +### Current NATS JetStream Consumer Configuration + +From `router/pkg/pubsub/nats/adapter.go`: +- Durable consumers: named `{durable_name}-{xxhash(hostname+listenAddr+subjects)}` +- `FetchNoWait(300)` — pull consumer, non-blocking batch fetch +- `msg.Ack()` called _after_ event is processed and dispatched to subscribers +- Consumer inactivity threshold for automatic cleanup +- `NatsStreamConfiguration` proto: `consumer_name`, `stream_name`, `consumer_inactive_threshold` + +### The Delivery Gap + +The critical gap is the **router → client** leg. When `msg.Ack()` is called in the NATS adapter, the message has been dispatched to the `SubscriptionUpdater`, but the WebSocket write to the client may not yet have occurred (or may fail). If the WebSocket write fails (connection drop), the JetStream message has already been acked — it is gone. + +Additionally, Kafka currently starts from the current offset (`time.Now().UnixMilli()`), so any events published during a client disconnect are permanently lost. 
+ +### Existing Extension Points + +The `SubscriptionOnStart`, `StreamReceiveEventHandler`, and `StreamPublishEventHandler` hooks (Cosmo Streams v1) provide customization points. At-least-once implementations can leverage these hooks without core changes — but the hooks run _after_ the broker ack, so the gap remains at the transport layer. + +--- + +## 8. General At-Least-Once Implementation Primitives + +Regardless of transport, at-least-once delivery requires: + +| Component | Purpose | +|---|---| +| Message ID / Sequence number | Unique identity for each message; consumer tracks last processed ID | +| Server-side persistence | Messages stored until acked; survives server restarts | +| Visibility timeout / AckWait | Lock message to one consumer; redelivery if not acked in time | +| Exponential backoff + jitter | Space out retries; prevent thundering herd | +| Dead Letter Queue (DLQ) | Isolate poison messages after `maxRetries` | +| Idempotent consumer | At-least-once guarantees duplicates; consumer must handle them | +| Resume token | Client-side opaque cursor encoding last-received position; sent on reconnect | +| Session recovery | Server maps resume token to buffered or logged state | + +--- + +## 9. 
Key References + +- [Confluent: Message Delivery Guarantees for Apache Kafka](https://docs.confluent.io/kafka/design/delivery-semantics.html) +- [Apache Kafka: Producer Configs](https://kafka.apache.org/41/configuration/producer-configs/) +- [Confluent: Exactly-once Semantics are Possible](https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/) +- [NATS Docs: JetStream](https://docs.nats.io/nats-concepts/jetstream) +- [NATS Docs: Consumers](https://docs.nats.io/nats-concepts/jetstream/consumers) +- [NATS Docs: JetStream Model Deep Dive](https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive) +- [NATS Blog: Per-subject discard policy (infinite deduplication)](https://nats.io/blog/new-per-subject-discard-policy/) +- [Microsoft Learn: Create reliable WebSocket clients (Azure Web PubSub)](https://learn.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients) +- [Ably: WebSocket reliability in realtime](https://ably.com/topic/websocket-reliability-in-realtime-infrastructure) +- [Socket.IO: Delivery Guarantees](https://socket.io/docs/v4/delivery-guarantees) +- [HiveMQ: MQTT QoS Levels](https://www.hivemq.com/blog/mqtt-essentials-part-6-mqtt-quality-of-service-levels/) +- [STOMP Protocol Specification v1.2](https://stomp.github.io/stomp-specification-1.2.html) +- [graphql-ws PROTOCOL.md](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md) +- [graphql-sse PROTOCOL.md](https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md) +- [graphql-spec #419: Acknowledgement of messages in Subscription](https://github.com/graphql/graphql-spec/issues/419) +- [Platformatic: Resumable GraphQL Subscriptions](https://blog.platformatic.dev/resumable-graphql-subscriptions) +- [MDN: Using server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) +- [RFC 6202: Known Issues with Long Polling and 
Streaming](https://datatracker.ietf.org/doc/html/rfc6202) +- [Hasura #3517: Not all mutations delivered to subscribed client](https://github.com/hasura/graphql-engine/issues/3517) +- [AWS AppSync: Real-Time Data](https://docs.aws.amazon.com/appsync/latest/devguide/aws-appsync-real-time-data.html) diff --git a/rfcs/rfc-001-jetstream-cursor-resumption.md b/rfcs/rfc-001-jetstream-cursor-resumption.md new file mode 100644 index 0000000000..601d1e677c --- /dev/null +++ b/rfcs/rfc-001-jetstream-cursor-resumption.md @@ -0,0 +1,258 @@ +# RFC-001: JetStream Cursor-Based Subscription Resumption + +**Date:** 2026-05-05 +**Status:** Draft +**Author:** TBD +**Related:** [at-least-once-research.md](./at-least-once-research.md) + +--- + +## Abstract + +Expose JetStream stream sequence numbers as opaque _resume cursors_ in the GraphQL subscription protocol. When a client reconnects after a disconnect, it sends the last cursor it received. The router creates a new JetStream consumer starting from that sequence, replaying any missed messages before transitioning to live delivery. No external storage is required; JetStream itself is the replay log. + +--- + +## Motivation + +Today, when a WebSocket connection drops mid-subscription, the client re-subscribes and receives only events published _after_ the new connection is established. Events published during the outage are silently lost from the client's perspective. + +JetStream already persists every published event with a monotonic stream sequence number. The router already acks JetStream messages after dispatching them (`adapter.go:154`). The gap is narrow: we need to (a) surface the sequence number to the client as a cursor, and (b) honor that cursor on reconnect to reposition the JetStream consumer. + +This RFC proposes the minimum change required to close that gap for NATS JetStream-backed subscriptions. 
+ +--- + +## Scope + +**In scope**: +- NATS JetStream-backed subscriptions (`@edfs__natsSubscribe` with `streamConfiguration`). +- `graphql-transport-ws` (graphql-ws) and `graphql-sse` transports. +- At-least-once delivery guarantee (duplicates may occur; clients must be idempotent). + +**Out of scope**: +- Kafka-backed subscriptions (addressed in RFC-003). +- Exactly-once semantics. +- Client library implementation specifics. + +--- + +## Design + +### 1. Cursor Format + +A _cursor_ is an opaque, URL-safe base64-encoded JSON object: + +```json +{ + "v": 1, + "s": "", + "seq": 42 +} +``` + +- `v` — version, for future evolution. +- `s` — JetStream stream name (needed to create a consumer on the right stream). +- `seq` — the stream sequence number of the last event delivered to the client. + +Clients treat this as an opaque string. The encoding is an implementation detail. + +### 2. Cursor Delivery to Client + +Every `Next` message carries the cursor in the `extensions` field of the `ExecutionResult`: + +```json +{ + "type": "next", + "id": "sub-1", + "payload": { + "data": { "employeeUpdates": { "id": 1 } }, + "extensions": { + "x-cosmo-cursor": "eyJ2IjoxLCJzIjoiZW1wbG95ZWVzIiwic2VxIjo0Mn0=" + } + } +} +``` + +The cursor is only present when the underlying event originates from a JetStream source. Events from non-JetStream sources (e.g., Redis, engine-based subscriptions) do not include a cursor, and clients cannot resume those subscriptions. + +### 3. Resuming a Subscription + +On reconnect, the client passes the last received cursor in the `ConnectionInit` payload: + +```json +{ + "type": "connection_init", + "payload": { + "x-cosmo-resume-cursor": "eyJ2IjoxLCJzIjoiZW1wbG95ZWVzIiwic2VxIjo0Mn0=" + } +} +``` + +The router decodes the cursor and stores the resume position for the connection. 
When a `Subscribe` message arrives for a JetStream-backed field, the router creates a JetStream pull consumer with: + +``` +DeliverByStartSequence: cursor.seq + 1 +``` + +This replays all events after the last delivered sequence, then continues with live events. + +For SSE, the cursor is passed as a custom HTTP header: + +``` +X-Cosmo-Resume-Cursor: eyJ2IjoxLCJzIjoiZW1wbG95ZWVzIiwic2VxIjo0Mn0= +``` + +Or as a query parameter: + +``` +GET /graphql/stream?x-cosmo-resume-cursor=eyJ... +``` + +### 4. Router Changes + +#### 4.1 NATS Adapter: Expose Sequence Numbers + +`router/pkg/pubsub/nats/adapter.go` — when fetching from JetStream, extract the stream sequence from the message metadata and attach it to the `MutableEvent`: + +```go +meta, err := msg.Metadata() +if err == nil { + event.SetHeader("x-nats-stream-seq", strconv.FormatUint(meta.Sequence.Stream, 10)) + event.SetHeader("x-nats-stream-name", meta.Stream) +} +``` + +#### 4.2 Subscription Updater: Attach Cursor to Extensions + +`router/pkg/pubsub/datasource/subscription_event_updater.go` — before calling `UpdateSubscription`, encode the cursor from event headers and inject it into the response extensions. + +#### 4.3 WebSocket Handler: Read Resume Cursor from `ConnectionInit` + +`router/core/websocket.go` — parse `x-cosmo-resume-cursor` from `ConnectionInit.payload`. Store the decoded cursor on the connection context. + +#### 4.4 DataSource Factory: Start Consumer at Cursor Position + +`router/pkg/pubsub/nats/engine_datasource.go` — when a cursor is present in the context, create a consumer starting at `cursor.seq + 1` instead of `DeliverNew`. + +### 5. Consumer Lifecycle + +Because cursor-based consumers are created per-reconnect, they should be **ephemeral** by default (no durable name, auto-deleted by JetStream when the client disconnects). This avoids consumer accumulation from many reconnects. 
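The cursor format from §1 and the `cursor.seq + 1` start position from §3 and §4.4 could be implemented along the following lines. This is a sketch using only the standard library. The names `cursor`, `encodeCursor`, `decodeCursor`, and `startSequence` are hypothetical, and the validation rules (version must be 1, stream name must be non-empty) are assumptions rather than part of this RFC.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
)

// cursor is the decoded form of the opaque resume cursor.
type cursor struct {
	V   int    `json:"v"`   // format version
	S   string `json:"s"`   // JetStream stream name
	Seq uint64 `json:"seq"` // stream sequence of the last delivered event
}

// encodeCursor serializes the cursor as URL-safe base64 over JSON.
func encodeCursor(c cursor) (string, error) {
	raw, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return base64.URLEncoding.EncodeToString(raw), nil
}

// decodeCursor parses and validates a client-supplied cursor string.
func decodeCursor(s string) (cursor, error) {
	raw, err := base64.URLEncoding.DecodeString(s)
	if err != nil {
		return cursor{}, fmt.Errorf("cursor is not valid base64: %w", err)
	}
	var c cursor
	if err := json.Unmarshal(raw, &c); err != nil {
		return cursor{}, fmt.Errorf("cursor is not valid JSON: %w", err)
	}
	if c.V != 1 {
		return cursor{}, errors.New("unsupported cursor version")
	}
	if c.S == "" {
		return cursor{}, errors.New("cursor has no stream name")
	}
	return c, nil
}

// startSequence is the first stream sequence to replay after a resume.
func startSequence(c cursor) uint64 { return c.Seq + 1 }

func main() {
	enc, err := encodeCursor(cursor{V: 1, S: "employees", Seq: 42})
	if err != nil {
		panic(err)
	}
	fmt.Println(enc) // eyJ2IjoxLCJzIjoiZW1wbG95ZWVzIiwic2VxIjo0Mn0=

	dec, err := decodeCursor(enc)
	if err != nil {
		panic(err)
	}
	fmt.Println(dec.S, startSequence(dec)) // employees 43
}
```

Because the cursor arrives from the client, a decode failure is probably best treated like a missing cursor (live-only delivery) rather than as a fatal connection error. That choice is left open here.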
+ +If the subscription is expected to have very long disconnects (hours), the cursor consumer can optionally be made durable with a TTL driven by the stream's own retention window. + +### 6. Ack Timing Fix + +Currently `msg.Ack()` is called after dispatching to the `SubscriptionUpdater`, _before_ the WebSocket write completes. This means a network failure after ack but before write drops the message permanently. + +This RFC changes the ack timing: `msg.Ack()` is called only after the WebSocket write is confirmed (i.e., after `Flush()` returns without error). If the write fails, the message is _not_ acked, and JetStream redelivers it after `AckWait`. + +``` +Old: fetch → dispatch → ack → write +New: fetch → dispatch → write → ack (on write success) | nack (on write failure) +``` + +This introduces a small duplication window (if the write succeeds but the ack is lost in the network), which is acceptable for at-least-once semantics. + +### 7. Heartbeat and Cursor Consistency + +To keep the cursor up-to-date during periods of inactivity, the router periodically sends a `Ping` with the latest cursor in its payload. The client updates its stored cursor on receipt, even if no events have arrived. + +### 8. Client Responsibilities + +A conforming client must: +1. Store the most recent `x-cosmo-cursor` received for each active subscription. +2. On WebSocket reconnect, include the cursor in `ConnectionInit.payload`. +3. Handle duplicate events (same data may arrive twice due to the at-least-once window). Deduplication can use the sequence number embedded in the cursor or a domain-level ID in the event payload. + +A non-conforming client (no cursor support) receives the same behavior as today: live-only delivery, no replay. + +--- + +## Failure Modes and Edge Cases + +### JetStream Stream Retention + +If the client is offline longer than the stream's retention window, `cursor.seq + 1` may no longer exist. 
JetStream will start the consumer from the earliest available sequence, potentially replaying a large number of events. The router signals this in the first resumed `Next` message via an extension field: + +```json +"extensions": { + "x-cosmo-cursor": "...", + "x-cosmo-cursor-gap": true +} +``` + +The client can use this flag to indicate to the user that a gap occurred. + +### Router Restart Between Disconnect and Reconnect + +The cursor is entirely client-side. A router restart has no effect — the new router instance decodes the cursor and creates a fresh ephemeral consumer at the specified sequence. + +### Multiple Router Instances + +Because the cursor encodes the stream name and sequence (not a router-local state), any router instance can serve a reconnect. Load balancer stickiness is _not_ required. + +### Clock Skew / Sequence Regression + +JetStream stream sequences are monotonically increasing and assigned by the NATS server, not the router. There is no clock skew risk. If a cursor arrives with a sequence higher than the current stream head (which should be impossible in normal operation), the consumer is started at the stream head and `x-cosmo-cursor-gap: true` is set. + +### Subscription Field Mismatch on Reconnect + +If the client reconnects with a cursor but sends a different subscription query or variables, the cursor is silently ignored and the subscription starts at `DeliverNew`. + +--- + +## Backward Compatibility + +- Clients that do not send a cursor receive no change in behavior (live-only delivery). +- Clients that do not read the `extensions.x-cosmo-cursor` field are unaffected. +- No breaking changes to the `graphql-transport-ws` or `graphql-sse` protocols; the cursor uses the existing `extensions` and payload extension points. + +--- + +## Tradeoffs + +| Factor | Assessment | +|---|---| +| Complexity | Low: leverages existing JetStream durability. No new storage. | +| Router statefulness | None: cursor is client-held. 
Any router instance can serve any reconnect. | +| JetStream-only | Kafka, Redis, and engine subscriptions are not covered. | +| Duplicate handling | Required: at-least-once guarantees duplicates. Client must be idempotent. | +| Ack timing | Minor change with measurable duplication window during write failures. | +| Stream retention dependency | If the client is offline longer than retention, gaps occur. | +| Client changes required | Minimal: read and re-send cursor; handle duplicates. | +| GraphQL ecosystem compatibility | Works with Apollo Client, urql, and others via `extensions` field. | + +--- + +## Alternatives Considered + +**Use durable consumers per client**: creates O(clients) durable consumers, straining JetStream. Ephemeral consumers per reconnect are preferred. + +**Encode cursor in subscription `id`**: the `id` is used for multiplexing and must be client-controlled. Overloading it would break existing protocol semantics. + +**Store cursor server-side (session store)**: requires shared state between router instances. Addressed in RFC-003. + +--- + +## Open Questions + +1. Should cursor validity be validated against the stream's known sequence range before starting the consumer, or fail-open (gap + `x-cosmo-cursor-gap`)? +2. What is the recommended `AckWait` value when cursor-based consumers are in use? The current default of 30s may be too short for slow clients. +3. Should the cursor include a signature/HMAC to prevent clients from forging arbitrary sequence positions? (Security vs. simplicity tradeoff.) +4. Should `x-cosmo-cursor` be opt-in (disabled by default, enabled per-subscription or per-connection) or opt-out? +5. How should this interact with the `StreamReceiveEventHandler` hook, which can filter or modify events? Replayed events will also pass through the hook. + +--- + +## Implementation Plan + +1. Add `SetHeader` / `GetHeader` to `MutableStreamEvent` / `StreamEvent` interfaces (if not already present). +2. 
Modify NATS adapter to attach stream sequence and stream name as event headers. +3. Add cursor encoding/decoding utilities to `router/pkg/pubsub/`. +4. Modify `subscription_event_updater.go` to inject cursor into response extensions. +5. Modify `websocket.go` to parse and store resume cursor from `ConnectionInit`. +6. Modify NATS `engine_datasource.go` to honor resume cursor when creating consumers. +7. Change ack timing in `adapter.go` to post-write. +8. Add integration tests: reconnect with cursor, gap detection, multi-router reconnect. +9. Document the cursor protocol for client implementors. diff --git a/rfcs/rfc-002-sse-last-event-id.md b/rfcs/rfc-002-sse-last-event-id.md new file mode 100644 index 0000000000..eb4c8e1167 --- /dev/null +++ b/rfcs/rfc-002-sse-last-event-id.md @@ -0,0 +1,322 @@ +# RFC-002: SSE Transport with Last-Event-ID Replay + +**Date:** 2026-05-05 +**Status:** Draft +**Author:** TBD +**Related:** [at-least-once-research.md](./at-least-once-research.md) + +--- + +## Abstract + +Add a Server-Sent Events (SSE) subscription transport to Cosmo that uses the browser's native `Last-Event-ID` reconnection mechanism for at-least-once delivery. Each event is assigned a sequential ID derived from the underlying broker's sequence number. On reconnect, the browser automatically sends `Last-Event-ID` and the router replays all events after that position from the broker's durable log. No client library changes are required beyond switching from WebSocket to `EventSource`. + +--- + +## Motivation + +The `graphql-transport-ws` protocol has no built-in replay mechanism. Any at-least-once guarantee over WebSockets requires application-level session tracking, which either requires shared router state (complex, stateful) or cursor management in the client (requires client library changes). + +SSE solves this at the HTTP level. 
The HTML living standard defines `Last-Event-ID` as a first-class mechanism for exactly this use case: the browser automatically retains the last received event ID across reconnects and sends it in the `Last-Event-ID` request header. The server does not need to track per-client session state — it only needs to replay events from the log. + +This RFC proposes implementing a new `graphql-sse`-compatible endpoint that: +- Uses `text/event-stream` as the content type. +- Sets the SSE `id:` field on every event from its broker sequence number. +- Replays events from JetStream or Kafka on reconnect based on `Last-Event-ID`. +- Works natively with `EventSource` (no custom client library required for basic use). + +--- + +## Scope + +**In scope**: +- New SSE subscription endpoint alongside the existing WebSocket endpoint. +- At-least-once delivery for JetStream-backed and Kafka-backed subscriptions. +- Native `EventSource` API compatibility (no required client library). +- `graphql-sse` protocol compatibility (enables existing `graphql-sse` clients to benefit). + +**Out of scope**: +- Bidirectional acks (SSE is server-to-client only). +- Exactly-once semantics. +- Mutations or queries over SSE. +- Redis-backed subscriptions (no durable log). + +--- + +## Background: SSE Reconnection Semantics + +``` +id: 42 +event: next +data: {"id":"sub-1","payload":{"data":{"employeeUpdates":{"id":1}}}} +retry: 3000 +``` + +When the EventSource connection drops: +1. The browser retains `lastEventId = 42`. +2. After `retry` milliseconds, the browser reconnects to the same URL. +3. The reconnect request includes `Last-Event-ID: 42`. +4. The server sees this header and replays events with id > 42 before resuming live delivery. + +This is entirely automatic — the browser handles step 1–3 with zero client code. The server only needs to implement step 4. + +--- + +## Design + +### 1. 
Endpoint + +New endpoint added to the Cosmo router: + +``` +GET /graphql/stream +``` + +Or alternatively, the existing `/graphql` endpoint is extended to return `text/event-stream` when: +- The request carries `Accept: text/event-stream`, or +- A `?stream=true` query parameter is present. + +The subscription query, variables, and operation name are passed as query parameters: + +``` +GET /graphql/stream?query=subscription%7BemployeeUpdates%7Bid%7D%7D +``` + +Or via a prior `POST` that returns a subscription ID (Single Connection Mode as defined by `graphql-sse`): + +``` +POST /graphql/stream → { "id": "sub-abc" } +GET /graphql/stream/sub-abc +``` + +For simplicity, this RFC focuses on the single-subscription distinct-connections mode. Multi-subscription single-connection mode can be added later. + +### 2. Event Format + +Events follow the `graphql-sse` protocol format: + +``` +id: +event: next +data: + +``` + +``` +id: +event: complete +data: + +``` + +``` +id: 0 +event: error +data: + +``` + +The `id` field is the broker sequence number: +- JetStream: `meta.Sequence.Stream` +- Kafka: `:` (e.g., `0:1042`) + +Heartbeat (keep-alive) events keep the connection open and update `lastEventId`: + +``` +id: +: keep-alive + +``` + +(A comment-only event, `:`-prefixed, is ignored by the EventSource parser but resets the connection timer and keeps `lastEventId` current.) + +### 3. Reconnection and Replay + +On an incoming request with `Last-Event-ID: `: + +**JetStream**: +1. Parse cursor as a stream sequence number. +2. Create an ephemeral pull consumer with `DeliverByStartSequence: cursor + 1`. +3. Drain buffered events (those with seq > cursor) through the normal hook pipeline. +4. Once caught up (pending = 0), transition to live delivery. + +**Kafka**: +1. Parse cursor as `:`. +2. Create a Kafka consumer for the topic, calling `consumer.Seek(partition, offset + 1)`. +3. Drain and transition to live delivery as above.
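
Step 1 of each list above (parsing the cursor and deriving the position to resume from) could be sketched roughly as follows. This is a minimal illustration, not the router's actual code: the function and type names (`ParseJetStreamCursor`, `ParseKafkaCursor`, `KafkaPosition`) are hypothetical, and the Kafka form assumes a single `partition:offset` pair such as `0:1042`.

```go
package main

import (
	"errors"
	"fmt"
	"math"
	"strconv"
	"strings"
)

// ErrNoCursor signals that the request carried no Last-Event-ID,
// so the subscription should start with live delivery only.
var ErrNoCursor = errors.New("no Last-Event-ID supplied")

// KafkaPosition is a hypothetical type holding the position to seek to.
type KafkaPosition struct {
	Partition int32
	Offset    int64
}

// ParseJetStreamCursor parses a decimal stream sequence and returns the
// sequence to resume from (cursor + 1).
func ParseJetStreamCursor(lastEventID string) (uint64, error) {
	if lastEventID == "" {
		return 0, ErrNoCursor
	}
	seq, err := strconv.ParseUint(lastEventID, 10, 64)
	if err != nil || seq == math.MaxUint64 {
		return 0, fmt.Errorf("invalid JetStream cursor %q", lastEventID)
	}
	return seq + 1, nil
}

// ParseKafkaCursor parses "partition:offset" and returns the position to
// seek to (offset + 1 on the same partition).
func ParseKafkaCursor(lastEventID string) (KafkaPosition, error) {
	if lastEventID == "" {
		return KafkaPosition{}, ErrNoCursor
	}
	p, o, ok := strings.Cut(lastEventID, ":")
	if !ok {
		return KafkaPosition{}, fmt.Errorf("invalid Kafka cursor %q", lastEventID)
	}
	partition, perr := strconv.ParseInt(p, 10, 32)
	offset, oerr := strconv.ParseInt(o, 10, 64)
	if perr != nil || oerr != nil || partition < 0 || offset < 0 || offset == math.MaxInt64 {
		return KafkaPosition{}, fmt.Errorf("invalid Kafka cursor %q", lastEventID)
	}
	return KafkaPosition{Partition: int32(partition), Offset: offset + 1}, nil
}

func main() {
	seq, err := ParseJetStreamCursor("42")
	fmt.Println(seq, err)

	pos, err := ParseKafkaCursor("0:1042")
	fmt.Println(pos, err)
}
```

Rejecting malformed cursors up front keeps the choice between failing the request and falling back to live-only delivery explicit in the handler, rather than leaving it to the broker client.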
+ +During the replay phase, the router marks the stream as "catch-up mode". The `StreamReceiveEventHandler` hook fires for replayed events just as it does for live events — so filtering, transformation, and authorization all apply consistently. + +### 4. Cursor Encoding + +For JetStream, the SSE `id` is the raw uint64 stream sequence number (decimal string): `"42"`. + +For Kafka, where messages are identified by partition + offset, the cursor is `":"`: `"0:1042"`. If the subscription covers multiple partitions, a cursor encodes the minimum offset per partition that the client has confirmed: `"0:100,1:55,2:220"`. + +### 5. Router Changes + +#### 5.1 New SSE Handler + +`router/core/sse.go` (new file) — implements `http.Handler`: + +```go +func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("X-Accel-Buffering", "no") // disable nginx buffering + flusher := w.(http.Flusher) + + lastEventID := r.Header.Get("Last-Event-ID") + // ... parse query, plan subscription, create consumer at lastEventID+1 + // ... stream events as SSE frames +} +``` + +#### 5.2 NATS Adapter: Replay Mode + +`router/pkg/pubsub/nats/adapter.go` — new `SubscribeFromSequence(ctx, seq, handler)` method that creates an ephemeral consumer at the given sequence. + +#### 5.3 Kafka Adapter: Seek Support + +`router/pkg/pubsub/kafka/adapter.go` — new `SubscribeFromOffset(ctx, partition, offset, handler)` method that creates a consumer group client and seeks to the specified offset. + +#### 5.4 Flush Strategy + +SSE requires immediate flushing after each event for low-latency delivery. The SSE handler calls `flusher.Flush()` after every event write. Batching multiple events (when the JetStream consumer returns multiple messages in one `Fetch()` call) can be done by writing all events first, then calling `Flush()` once per batch. + +### 6. 
No Client Acks Required + +SSE is unidirectional: the `Last-Event-ID` mechanism is the only feedback channel. This means: +- The server cannot distinguish "client received and processed" from "client received". +- The server acks the JetStream/Kafka message after writing the event to the SSE stream (after `Flush()` succeeds), not after client application processing. +- This is still a substantial improvement over the current behavior (no ack at all, events lost on disconnect). + +For applications that require application-level acknowledgment, clients should send a `POST /graphql` mutation with the event ID (see RFC-004 for a schema-level approach). + +### 7. HTTP/2 and Multiplexing + +Under HTTP/1.1, browsers allow only 6 connections per origin. Each `EventSource` consumes one connection. For applications with multiple subscriptions from a single page, HTTP/2 is strongly recommended — it allows up to 100 concurrent streams (configurable) over a single TCP connection. + +The router should explicitly document this constraint and recommend HTTP/2 in production. + +### 8. CORS and Proxy Considerations + +SSE with `Last-Event-ID` requires CORS pre-flight if the request is cross-origin. The router must include `Last-Event-ID` in the `Access-Control-Allow-Headers` response header. + +CDNs and reverse proxies often buffer SSE streams. The router must set: +- `X-Accel-Buffering: no` (nginx) +- `Cache-Control: no-cache` +- `Connection: keep-alive` + +### 9. Client Integration + +**Native EventSource (minimal)**: + +```js +const source = new EventSource( + `/graphql/stream?query=${encodeURIComponent('subscription { employeeUpdates { id } }')}` +); +source.addEventListener('next', (event) => { + const result = JSON.parse(event.data); + console.log(result.payload.data); +}); +source.addEventListener('error', (event) => { + // Browser automatically reconnects with Last-Event-ID +}); +``` + +The browser handles reconnection and `Last-Event-ID` automatically. 
Zero extra client code for at-least-once delivery. + +**graphql-sse client library**: + +```js +import { createClient } from 'graphql-sse'; +const client = createClient({ url: '/graphql/stream' }); +// graphql-sse handles SSE under the hood; same API as today +``` + +`graphql-sse` will automatically pass `Last-Event-ID` on reconnect because it uses the native `EventSource` (or a polyfill that does the same). + +**Apollo Client / urql via adapter**: + +Both Apollo Client and urql support custom link/exchange implementations. An SSE link/exchange wrapping the native `EventSource` or `graphql-sse` client would be a small community package. It does not require changes to Apollo Client or urql core. + +--- + +## Failure Modes and Edge Cases + +### Broker Retention Gap + +If the client has been offline longer than the broker's retention window, the cursor sequence is no longer available. The router detects this and: +- For JetStream: if `DeliverByStartSequence` points to a deleted sequence, JetStream starts from the earliest available. The router signals the gap via an `event: gap` SSE event before the first replayed `next`. +- For Kafka: `seek()` to a deleted offset falls back to `auto.offset.reset=earliest`. Same gap signaling. + +### Large Replay Backlog + +If a client reconnects after a long outage, there may be thousands of events to replay. The router should: +1. Start streaming replayed events immediately (do not buffer all of them in memory first). +2. Apply backpressure: use JetStream pull with `FetchNoWait(100)` in a loop; write and flush each batch before fetching the next. +3. The client's `EventSource` will buffer incoming events in order; no special client handling needed. + +### Duplicate Events During Reconnect + +The last event before disconnect may have been written to the SSE stream but `flusher.Flush()` may have failed (or the TCP connection may have been severed mid-frame). 
On reconnect with the previous `Last-Event-ID`, the router will replay that event. Clients must be idempotent with respect to event IDs. + +### Multiple Router Instances + +SSE reconnection can hit any router instance (no sticky sessions required). The cursor is broker-native (stream sequence or partition:offset) and is interpreted independently by each router instance. This works correctly as long as the underlying broker (NATS server cluster or Kafka broker cluster) is shared. + +### Connection Timeout at Load Balancer + +Many load balancers have default HTTP timeouts of 60–300 seconds. SSE connections are long-lived. The router should: +- Send a keep-alive comment (`:\n\n`) every 15–30 seconds. +- Document that load balancers need to have their timeout configured (or disabled) for the SSE endpoint. + +--- + +## Backward Compatibility + +- The existing WebSocket endpoint is unchanged. +- The SSE endpoint is additive; no existing functionality is modified. +- Clients using `graphql-sse` today (which already exist in the ecosystem) can be pointed to the new endpoint with at-least-once delivery automatically. + +--- + +## Tradeoffs + +| Factor | Assessment | +|---|---| +| Client changes required | Minimal: switch from WebSocket to `EventSource`. No library changes needed. | +| Browser native support | `EventSource` is supported in all major browsers. Automatic reconnection. | +| Unidirectionality | No client-to-server acks over the same connection. Application acks require a separate POST. | +| HTTP/1.1 connection limit | 6-per-origin limit. HTTP/2 required for multiple concurrent subscriptions. | +| Kafka support | Requires Kafka consumer seek + consumer group management (RFC-001 does not cover Kafka). | +| Proxy/CDN buffering | Requires specific proxy configuration; documented but still a deployment concern. | +| Replay backlog | Large backlogs can cause slow catch-up; need streaming replay, not in-memory buffering. 
| +| JetStream + Kafka parity | Cursor encoding differs between providers; clients need to handle opaque cursors. | +| Router statefulness | None: no per-client server state needed. | + +--- + +## Comparison with RFC-001 + +| | RFC-001 (WS + cursor) | RFC-002 (SSE) | +|---|---|---| +| Transport | WebSocket (existing) | SSE (new endpoint) | +| Kafka support | No | Yes | +| Client changes | Read cursor from extensions; send on reconnect | Switch to `EventSource`; browser handles rest | +| Browser native reconnect | No | Yes | +| Client acks | No | No (both are at-least-once; acks need RFC-004) | +| Router state | None | None | +| Multiple subscriptions per connection | Yes (WS multiplexing) | HTTP/2 stream multiplexing | + +--- + +## Open Questions + +1. Should the SSE endpoint be `/graphql/stream` (separate URL) or `/graphql` with `Accept: text/event-stream` content negotiation? +2. Should multi-subscription single-connection mode (token-based) be included in v1 or deferred? +3. How should the Kafka cursor be structured for multi-partition subscriptions? A compact encoding or a base64 JSON blob? +4. Should keep-alive events update `lastEventId`? (They must, to keep the cursor fresh even during quiet periods.) +5. What is the maximum replay window? Should the router enforce a maximum number of replayed events per reconnect to prevent abuse? +6. Should the SSE endpoint require authentication via a query parameter (for environments where cookies are not available)? diff --git a/rfcs/rfc-003-router-session-buffer.md b/rfcs/rfc-003-router-session-buffer.md new file mode 100644 index 0000000000..906aa49e13 --- /dev/null +++ b/rfcs/rfc-003-router-session-buffer.md @@ -0,0 +1,354 @@ +# RFC-003: Router-Managed Session Buffer + +**Date:** 2026-05-05 +**Status:** Draft +**Author:** TBD +**Related:** [at-least-once-research.md](./at-least-once-research.md) + +--- + +## Abstract + +The router maintains a per-client session buffer in shared storage (JetStream Key/Value or Redis). 
When events are delivered from a broker, the router writes them to the buffer before acking the broker message. The client receives events from this buffer. On reconnect, the client sends a session token; the router replays unacknowledged events from the buffer. This approach works with _all_ backend providers (JetStream, Kafka, Redis) and requires no client protocol changes beyond passing a session token in `ConnectionInit`. + +--- + +## Motivation + +RFC-001 and RFC-002 require either JetStream or Kafka as the underlying broker (each of which provides a replayable log). Subscriptions backed by Redis pub/sub, or by engine-based GraphQL subscriptions (subgraph WebSocket), have no durable log and cannot be replayed from the source. + +Additionally, both RFC-001 and RFC-002 require at least minor client library changes (reading a cursor extension or switching to SSE). This RFC is designed so that the only client change is passing a session token in the `ConnectionInit` payload — a change that can be added transparently to any GraphQL client using the existing `connectionParams` mechanism. + +--- + +## Scope + +**In scope**: +- At-least-once delivery for all provider types (JetStream, Kafka, Redis, engine subscriptions). +- `graphql-transport-ws` (graphql-ws) protocol. +- Client reconnection with session token replay. + +**Out of scope**: +- Exactly-once semantics (still requires client-side deduplication). +- SSE transport (see RFC-002). +- Cross-datacenter replication of the session buffer. + +--- + +## Design + +### 1. Session Lifecycle + +1. **Client connects** for the first time (no session token). Router generates a `sessionId` (UUID v7, time-ordered) and returns it in `ConnectionAck.payload`: + +```json +{ + "type": "connection_ack", + "payload": { + "x-cosmo-session-id": "01926d8e-6b3a-7e0a-b1c2-3d4e5f6a7b8c" + } +} +``` + +2. **Events are delivered** to the client. 
Before each event is written to the WebSocket, the router writes it to the session buffer with a monotonic sequence number. The event is marked `pending` in the buffer. + +3. **Client sends an ack** (see section 4) for a sequence number. The router marks events ≤ that sequence as `acknowledged` in the buffer. Acknowledged events are eligible for GC. + +4. **WebSocket disconnects** (client or network). Pending (unacknowledged) events remain in the buffer. The buffer has a TTL (default: 5 minutes, configurable). + +5. **Client reconnects** with the session token in `ConnectionInit`: + +```json +{ + "type": "connection_init", + "payload": { + "x-cosmo-session-id": "01926d8e-6b3a-7e0a-b1c2-3d4e5f6a7b8c" + } +} +``` + +6. Router looks up the session buffer. All pending events (in order) are replayed to the client over the new WebSocket connection. New events continue flowing after the replay is complete. + +7. **Session expires** (TTL exceeded, or client sends explicit `ConnectionComplete`). The buffer is deleted. + +### 2. Buffer Storage Options + +#### Option A: JetStream Key/Value + +Use a JetStream KV bucket as the session buffer. Each key is `session:{sessionId}:{seq}`, value is the serialized event. + +Pros: +- Already required by Cosmo for JetStream subscriptions. +- RAFT-replicated; survives individual NATS node failures. +- TTL natively supported via bucket `TTL` config. +- No additional infrastructure. + +Cons: +- Additional load on the NATS cluster. +- KV is optimized for individual value access, not range scans. Listing `session:{sessionId}:*` keys requires a watcher, which is efficient but adds subscription overhead. + +#### Option B: JetStream Stream per Session + +Each session gets its own ephemeral JetStream stream (`cosmo-session-{sessionId}`) with a retention policy and TTL. Events are published to the stream. On reconnect, a consumer is created from the last acknowledged sequence. 
+ +Pros: +- Clean semantics: the session buffer is itself a JetStream stream with all its reliability properties. +- Replay is exactly the JetStream consumer pattern from RFC-001, applied to a session stream rather than the source stream. +- Natural backpressure and flow control. + +Cons: +- Creates O(active-sessions) streams. JetStream has a practical limit on concurrent streams (~100k depending on hardware). For large deployments, this may be a bottleneck. +- Creating and deleting streams is more expensive than KV operations. + +#### Option C: Redis + +Use a Redis sorted set per session. Key: `cosmo:session:{sessionId}`, members: serialized events, scores: sequence numbers. + +Pros: +- Redis is already a Cosmo provider (pub/sub). +- Sorted sets support range queries (`ZRANGEBYSCORE`) for efficient replay. +- `EXPIRE` for TTL. +- Widely understood by operators. + +Cons: +- Redis is a separate infrastructure dependency (beyond NATS). +- Redis pub/sub is in-memory; durability depends on AOF/RDB config. +- If Redis is the source provider for the subscription, storing the buffer in the same Redis adds contention. + +**Recommendation**: Option B (JetStream Stream per Session) for deployments that already use JetStream as a provider. Option C (Redis) as an alternative for deployments that do not use JetStream. + +### 3. Event Sequencing + +The router assigns monotonic sequence numbers to events within a session. These are independent of the broker's sequence numbers. The session sequence: +- Starts at 1 for each new session. +- Increments by 1 for each event delivered (or buffered, regardless of whether delivery succeeds). +- Is included in every `Next` message extension: + +```json +{ + "type": "next", + "id": "sub-1", + "payload": { + "data": { "employeeUpdates": { "id": 1 } }, + "extensions": { + "x-cosmo-seq": 7 + } + } +} +``` + +### 4. 
Client Acknowledgment + +#### Option A: Cumulative Ack via `ConnectionInit`-Extension Message + +The client sends a dedicated ack message over the existing WebSocket connection (a custom message type): + +```json +{ + "type": "x-cosmo-ack", + "payload": { "seq": 7 } +} +``` + +This acks all events ≤ seq 7. The server removes them from the session buffer (marks for GC). + +The `x-cosmo-ack` message type is an extension to the `graphql-transport-ws` protocol. Servers that do not understand it must ignore it; clients that send it to non-supporting servers will see no effect. + +#### Option B: Implicit Ack by Reconnection + +When a client reconnects with a `sessionId` and provides a `lastSeq`, events ≤ `lastSeq` are implicitly acked: + +```json +{ + "type": "connection_init", + "payload": { + "x-cosmo-session-id": "...", + "x-cosmo-last-seq": 7 + } +} +``` + +No explicit ack message is needed. The tradeoff: events are only acked when the client reconnects, so the buffer may hold events longer than necessary. + +#### Option C: Periodic Ack Extension + +The client sends acks periodically (e.g., every 5 seconds or every 10 events), encoded as a heartbeat: + +```json +{ + "type": "ping", + "payload": { "x-cosmo-ack-seq": 7 } +} +``` + +This reuses the existing `Ping` message type's optional payload to avoid adding a new message type. + +**Recommendation**: Option A (explicit ack message) for maximum flexibility. Option B as a lower-effort alternative for clients that prefer not to send acks. + +### 5. 
Buffer Write Ordering + +The critical ordering is: + +``` +Broker message received + → Write event to session buffer (persist) + → Write event to WebSocket (deliver) + → On WebSocket success: mark buffer event as pending-ack-from-client + → On WebSocket failure: event remains in buffer, will be replayed on reconnect + → On client ack: mark buffer event as acknowledged + → GC acknowledged events +``` + +The broker message (JetStream or Kafka) must only be acked _after_ the event is successfully written to the session buffer. This ensures that if the router crashes before writing to the buffer, the broker will redeliver. + +``` +Old: broker → ack broker → write WebSocket +New: broker → write buffer → ack broker → write WebSocket +``` + +If writing to the buffer fails, the router should _not_ ack the broker message. JetStream will redeliver after `AckWait`. The router should log and potentially circuit-break if the buffer is consistently unavailable. + +### 6. Cross-Router Replay + +Because the session buffer is stored in shared infrastructure (JetStream or Redis), any router instance can serve a reconnect. The `sessionId` is the only key needed to locate the buffer. + +On reconnect: +1. Router A was serving the session; client reconnects to Router B. +2. Router B reads the `sessionId` from `ConnectionInit`. +3. Router B looks up the session buffer by key. +4. Router B replays pending events. +5. Router B re-establishes the live subscription from the broker (NATS/Kafka) from the position after the last buffered event. + +Step 5 requires knowing where to resume the live subscription. The session buffer entry for each event must include the broker cursor (JetStream sequence or Kafka partition:offset) alongside the session sequence number. The router uses the maximum broker cursor in the buffer to create a new consumer at `brokerCursor + 1`. + +### 7. Buffer Size Limits and Backpressure + +The session buffer must be bounded to prevent runaway memory/storage growth. 
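
A minimal in-memory sketch of such a bounded buffer, combining the per-session sequence numbers from section 3, the cumulative ack from section 4, and the hard and soft thresholds this section goes on to describe. All names here (`SessionBuffer`, `Append`, `Ack`, `Pressure`) are hypothetical, the type is not safe for concurrent use, and a real implementation would persist entries in the shared store (JetStream or Redis) rather than in a slice.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrBufferFull is returned when the hard limit is reached; the caller is
// expected to close the connection as described for the hard limit.
var ErrBufferFull = errors.New("session buffer full")

type bufferedEvent struct {
	Seq     uint64
	Payload []byte
}

// SessionBuffer holds unacknowledged events for one session, ordered by Seq.
type SessionBuffer struct {
	maxEvents int
	nextSeq   uint64
	pending   []bufferedEvent
}

func NewSessionBuffer(maxEvents int) *SessionBuffer {
	return &SessionBuffer{maxEvents: maxEvents, nextSeq: 1}
}

// Append stores an event and returns its session sequence number.
func (b *SessionBuffer) Append(payload []byte) (uint64, error) {
	if len(b.pending) >= b.maxEvents {
		return 0, ErrBufferFull
	}
	seq := b.nextSeq
	b.nextSeq++
	b.pending = append(b.pending, bufferedEvent{Seq: seq, Payload: payload})
	return seq, nil
}

// Ack is cumulative: it drops every pending event with Seq <= seq.
// Acks for unknown or already-removed sequences are a no-op.
func (b *SessionBuffer) Ack(seq uint64) {
	i := 0
	for i < len(b.pending) && b.pending[i].Seq <= seq {
		i++
	}
	b.pending = append(b.pending[:0], b.pending[i:]...)
}

// Pending returns a copy of the events to replay on reconnect, in order.
func (b *SessionBuffer) Pending() []bufferedEvent {
	return append([]bufferedEvent(nil), b.pending...)
}

// Pressure reports the fill ratio; the soft limit corresponds to 0.8.
func (b *SessionBuffer) Pressure() float64 {
	return float64(len(b.pending)) / float64(b.maxEvents)
}

func main() {
	buf := NewSessionBuffer(5)
	for i := 0; i < 4; i++ {
		buf.Append([]byte(fmt.Sprintf("event-%d", i)))
	}
	fmt.Println("pressure:", buf.Pressure())
	buf.Ack(2)
	fmt.Println("pending after ack:", len(buf.Pending()))
}
```

Keeping `Ack` cumulative means a single message from the client can release an arbitrary number of buffered events, which matters most for clients that ack on a timer rather than per event.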
+ +**Hard limit**: maximum N events in the buffer (default: 1000). If the buffer is full and a new event arrives, the router closes the WebSocket connection with a `4400 Session buffer full` close code. The client must reconnect; the router creates a new session (old session is discarded) and starts fresh. The client observes an event gap. + +**Soft limit**: when the buffer reaches 80% capacity, the router sends a `Ping` with `{"x-cosmo-buffer-pressure": 0.8}`. Clients that consume acks quickly will drain the buffer; clients that don't will hit the hard limit. + +### 8. Session Expiry + +Sessions expire after the TTL (default: 5 minutes from last disconnect). On expiry: +- The session buffer is deleted. +- If a client reconnects with an expired `sessionId`, the router creates a new session (no replay) and includes `"x-cosmo-session-expired": true` in `ConnectionAck`. + +The TTL is configurable per-router: +```yaml +subscriptions: + session_buffer: + enabled: true + ttl: 5m + max_events: 1000 + storage: jetstream # or redis +``` + +### 9. Client Integration + +**Minimal client change** (any framework, any language): + +```js +// Apollo Client +const wsLink = new GraphQLWsLink(createClient({ + url: 'ws://router/graphql', + connectionParams: () => { + const sessionId = sessionStorage.getItem('cosmo-session-id'); + return sessionId ? { 'x-cosmo-session-id': sessionId } : {}; + }, + on: { + connected: (socket, payload) => { + if (payload?.['x-cosmo-session-id']) { + sessionStorage.setItem('cosmo-session-id', payload['x-cosmo-session-id']); + } + }, + }, +})); +``` + +```js +// urql +import { createClient } from '@urql/core'; +import { subscriptionExchange } from '@urql/core'; +// same pattern: store session ID, pass on reconnect +``` + +Ack messages (Option A) require a slightly more involved client integration, but can be handled transparently in the `graphql-ws` client library by monkey-patching or wrapping the `next` callback. 
A thin wrapper package (`@wundergraph/cosmo-ws-client`) can handle this. + +--- + +## Failure Modes and Edge Cases + +### Buffer Storage Unavailable + +If the session buffer storage (JetStream or Redis) is unavailable when an event arrives: +- The router falls back to _best-effort delivery_ (deliver without buffering, do not ack broker). +- Alert/metric emitted: `cosmo_session_buffer_unavailable_total`. +- JetStream will redeliver the broker message after `AckWait`. When the buffer recovers, delivery resumes. + +### Router Crash Mid-Buffer-Write + +If the router crashes after writing the event to the session buffer but before acking the broker: +- The broker redelivers the message to another router instance (or the same, after restart). +- Without deduplication, the redelivered event would be written to the buffer again under a new session sequence number, and the client would see a duplicate. +- To avoid this, the router checks the broker sequence stored with each buffer entry and skips the write if the event is already buffered. Clients should still handle duplicates, because a re-buffered event carries a different `x-cosmo-seq`. + +### Client Sends Ack for Unknown Sequence + +The router ignores acks for sequences it has already GC'd or that are out of range. No error is returned. + +### Multiple Active Connections with Same Session + +If a client opens two WebSocket connections with the same `sessionId` (browser tab duplication), both connections receive events. The buffer is written once per event; acking from one connection acks for all. This is acceptable behavior. If strict isolation is needed, the `sessionId` should be treated as single-use (invalidated when a new connection uses it). + +--- + +## Backward Compatibility + +- Clients that do not send a `sessionId` receive the existing at-most-once behavior. +- `ConnectionAck.payload` is extended with `x-cosmo-session-id`. Clients that ignore `ConnectionAck.payload` are unaffected. +- The `x-cosmo-ack` message type is additive; servers that do not support it ignore it.
+ +--- + +## Tradeoffs + +| Factor | Assessment | +|---|---| +| Provider coverage | All providers (JetStream, Kafka, Redis, engine). | +| Router statefulness | Session buffer requires shared storage. Added operational dependency. | +| Client changes | Minimal: store and pass session token. No library changes for basic use. | +| Buffer storage cost | O(unacked events × active disconnected sessions). Bounded by TTL and max_events. | +| Cross-router reconnect | Works: session buffer is in shared storage. | +| Duplicate risk | Low: broker cursor deduplication prevents duplicate buffer writes. | +| Complexity | Highest of all RFCs: buffer management, GC, cross-router replay, backpressure. | +| GraphQL ecosystem fit | Works with `graphql-ws` via `connectionParams` (Apollo Client, urql, gql-ws). | + +--- + +## Comparison with Other RFCs + +| | RFC-001 (WS cursor) | RFC-002 (SSE) | RFC-003 (session buffer) | +|---|---|---|---| +| Provider coverage | JetStream only | JetStream + Kafka | All providers | +| Router state | None | None | Shared storage required | +| Client changes | Cursor in extensions | Switch to EventSource | Session token in connectionParams | +| Kafka support | No | Yes | Yes | +| Redis support | No | No | Yes | +| Engine sub support | No | No | Yes | +| Acks to router | No | No | Yes (optional) | +| Broker ack timing | Post-write | Post-flush | Post-buffer-write | +| Buffer TTL dependency | No | No | Yes (events lost after TTL) | +| Infrastructure | NATS only | NATS or Kafka | NATS or Kafka + buffer store | + +--- + +## Open Questions + +1. Should the session buffer be enabled by default (opt-out) or disabled by default (opt-in)? Default-on improves reliability for all users but adds storage overhead. +2. Should acks from the client be required, or should the router use implicit acks (events are acked once the WebSocket write is confirmed, treating disconnects as the trigger for replay)? 
The latter simplifies the client but is more conservative about storage. +3. What is the right default TTL? 5 minutes covers most mobile network reconnects. Long-haul reconnects (hours) would require increasing this or combining with RFC-001/RFC-002's broker-native replay. +4. Should the session buffer be per-subscription or per-connection? Per-connection is simpler; per-subscription allows different TTLs for different subscriptions. +5. How should the buffer storage backend be selected in multi-provider setups (some subscriptions use JetStream, some use Kafka)? +6. Should the `x-cosmo-seq` extension be stable across reconnects (i.e., continuous sequence across sessions) or reset per-session? diff --git a/rfcs/rfc-004-schema-delivery-directive.md b/rfcs/rfc-004-schema-delivery-directive.md new file mode 100644 index 0000000000..2a4b7a125b --- /dev/null +++ b/rfcs/rfc-004-schema-delivery-directive.md @@ -0,0 +1,416 @@ +# RFC-004: Schema-Level Delivery Guarantee Directive + +**Date:** 2026-05-05 +**Status:** Draft +**Author:** TBD +**Related:** [at-least-once-research.md](./at-least-once-research.md) + +--- + +## Abstract + +Introduce a `@stream` directive on GraphQL subscription fields that enables schema-first declaration of delivery guarantees. When applied, the router automatically wraps subscription responses in an envelope containing a cursor and event ID. Clients resume subscriptions by passing `afterCursor` as a subscription argument. An explicit `acknowledgeEvent` mutation closes the delivery loop. This approach is fully introspectable, works over any transport (WebSocket, SSE), and integrates cleanly with the existing GraphQL client ecosystem via standard variables and mutations. + +--- + +## Motivation + +The prior RFCs (001–003) add delivery guarantees at the transport or router layer, invisible in the schema. This creates a discoverability problem: developers cannot tell from the schema alone whether a subscription provides delivery guarantees. 
Client libraries need out-of-band documentation to know whether to send cursors, session tokens, or use SSE. + +This RFC takes a schema-first approach: delivery guarantees are part of the contract between API provider and consumer, visible in introspection, and described by schema documentation. The cursor-based resumption pattern (described in the research document) is a well-understood pattern in GraphQL already (Relay-style pagination cursors). + +--- + +## Scope + +**In scope**: +- New `@stream` directive (configurable name) for subscription fields in the event-driven subgraph schema. +- Automatic cursor/envelope injection by the router on decorated subscriptions. +- `afterCursor` argument automatically added to decorated subscription fields. +- `acknowledgeEvent` mutation (optional, for application-level acks). +- Works over `graphql-transport-ws` and SSE transports. +- Compatible with all backend providers (JetStream, Kafka, Redis). + +**Out of scope**: +- Exactly-once semantics. +- Non-subscription operations. +- Enforcement of idempotency at the schema level. + +--- + +## Design + +### 1. The `@stream` Directive + +The directive is added to the Cosmo event-driven subgraph SDL: + +```graphql +directive @stream( + guarantee: DeliveryGuarantee = AT_LEAST_ONCE + bufferTTL: Int = 300 # seconds; 0 = no buffering + maxReplay: Int = 10000 # max events to replay on cursor resume +) on FIELD_DEFINITION + +enum DeliveryGuarantee { + AT_MOST_ONCE # default behavior (no change) + AT_LEAST_ONCE # cursor + replay +} +``` + +Example usage in a subgraph schema: + +```graphql +type Subscription { + employeeUpdates: Employee! + @edfs__natsSubscribe(subjects: ["employeeUpdates"], providerId: "my-nats") + @stream(guarantee: AT_LEAST_ONCE, bufferTTL: 600) + + orderCreated: Order! + @edfs__kafkaSubscribe(topics: ["orders"], providerId: "my-kafka") + @stream(guarantee: AT_LEAST_ONCE) +} +``` + +### 2. 
Automatic Schema Transformation + +When the router compiles the federated schema, it detects `@stream(guarantee: AT_LEAST_ONCE)` on subscription fields and performs the following transformations: + +#### 2.1 Add `afterCursor` argument + +```graphql +# Original +type Subscription { + employeeUpdates: Employee! @stream(guarantee: AT_LEAST_ONCE) +} + +# After transformation +type Subscription { + employeeUpdates(afterCursor: String): EmployeeStreamEvent! +} +``` + +The `afterCursor` argument is optional. When absent, delivery starts from "now" (current behavior). When present, delivery starts from the event after the given cursor. + +#### 2.2 Wrap response in a stream envelope type + +```graphql +type EmployeeStreamEvent { + cursor: String! # opaque, encodes broker position + eventId: String! # unique event ID (deduplication key) + deliveredAt: String! # ISO 8601 timestamp + sequenceNumber: Int! # monotonic per-subscription sequence + data: Employee! # the original payload +} +``` + +The envelope is generated automatically per subscription field. The naming follows a configurable convention: `{FieldName}StreamEvent`. + +Clients select the envelope fields alongside the payload and declare the cursor as an operation variable: +```graphql +subscription EmployeeUpdates($cursor: String) { + employeeUpdates(afterCursor: $cursor) { + cursor + eventId + data { + id + name + } + } +} +``` + +After each event the client stores `cursor` and passes it as `$cursor` on the next connection. + +### 3. Cursor Semantics + +The `cursor` value is an opaque, base64url-encoded JSON structure: + +```json +{ + "v": 1, + "provider": "nats-jetstream", + "stream": "employees", + "seq": 42, + "ts": 1746000000000 +} +``` + +For Kafka: +```json +{ + "v": 1, + "provider": "kafka", + "topic": "orders", + "partitionOffsets": {"0": 1042, "1": 337} +} +``` + +Clients must treat this as opaque. The structure is versioned (`v`) for future evolution. + +### 4. 
The `acknowledgeEvent` Mutation (Optional) + +For use cases requiring application-level acknowledgment (e.g., the client must confirm it _processed_ the event, not just received it), an `acknowledgeEvent` mutation is automatically added to the schema when `@stream` is present: + +```graphql +type Mutation { + acknowledgeEvent( + subscriptionField: String! + cursor: String! + ): AcknowledgeEventResult! +} + +type AcknowledgeEventResult { + acknowledged: Boolean! + error: String +} +``` + +When called, the router marks events ≤ cursor as acknowledged in the session buffer. This signals to the router that replay up to that point is no longer needed, allowing buffer GC. + +This is analogous to NATS JetStream's `msg.Ack()`, but exposed as a GraphQL mutation — making it explicit in the API contract and callable by any GraphQL client with zero additional tooling. + +### 5. Router Behavior on `afterCursor` + +When a `Subscribe` message arrives with `afterCursor` set, the router: + +1. Decodes the cursor. +2. Depending on provider: + - **JetStream**: creates an ephemeral consumer at `seq + 1`. + - **Kafka**: seeks the consumer to `offset + 1` per partition. + - **Session buffer (RFC-003 hybrid)**: looks up the session buffer and replays from the cursor position. +3. Replays buffered events through the full hook pipeline (including `StreamReceiveEventHandler`). +4. Once caught up, transitions to live delivery. +5. Each delivered event is wrapped in the `StreamEvent` envelope with a fresh cursor. + +### 6. `eventId` for Client-Side Deduplication + +The `eventId` field provides a stable, unique identifier per event that the client can use for deduplication without inspecting the cursor: + +- For JetStream: `{stream-name}:{seq}`. +- For Kafka: `{topic}:{partition}:{offset}`. +- For Redis/engine: UUID generated by the router at dispatch time (no replay possible, but deduplication within a session is still useful). 
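
As a rough illustration of the three `eventId` formats listed above, the derivation could look like the sketch below. The `eventIdFor` helper, the shape of the `source` object, and the injected `newUuid` generator are all assumptions made for this example; the provider strings are borrowed from the cursor examples in section 3.

```javascript
// Hypothetical helper mirroring the eventId formats listed above.
// `source` describes where the event came from; its shape is assumed.
// `newUuid` is an injected generator used only when no broker position exists.
function eventIdFor(source, newUuid) {
  switch (source.provider) {
    case 'nats-jetstream':
      // {stream-name}:{seq}
      return `${source.stream}:${source.seq}`;
    case 'kafka':
      // {topic}:{partition}:{offset}
      return `${source.topic}:${source.partition}:${source.offset}`;
    default:
      // Redis/engine: router-generated ID, stable only within a session.
      return newUuid();
  }
}
```

For JetStream and Kafka the ID is a pure function of the broker position, so a redelivered event yields the same `eventId`. The UUID fallback cannot offer that, which is consistent with the note above that replay is not possible for those providers.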
+ +Client deduplication pseudocode: +```js +const seen = new Set(); +subscription.subscribe(event => { + if (seen.has(event.eventId)) return; // duplicate, skip + seen.add(event.eventId); + // prune seen set to last N entries to bound memory + processEvent(event.data); + setCursor(event.cursor); +}); +``` + +### 7. Client Integration + +#### With Apollo Client + +```js +import { ApolloClient, InMemoryCache, gql } from '@apollo/client'; +import { GraphQLWsLink } from '@apollo/client/link/subscriptions'; +import { createClient } from 'graphql-ws'; + +// WebSocket link for subscriptions (the URL is a placeholder) +const wsLink = new GraphQLWsLink(createClient({ url: 'ws://router/graphql' })); +const client = new ApolloClient({ link: wsLink, cache: new InMemoryCache() }); + +// Store cursor across reconnects (e.g., localStorage, sessionStorage) +let cursor = localStorage.getItem('employeeUpdates:cursor'); + +const observable = client.subscribe({ + query: gql` + subscription EmployeeUpdates($cursor: String) { + employeeUpdates(afterCursor: $cursor) { + cursor + eventId + data { id name } + } + } + `, + variables: { cursor }, +}); + +observable.subscribe(({ data }) => { + const event = data.employeeUpdates; + localStorage.setItem('employeeUpdates:cursor', event.cursor); + cursor = event.cursor; + // process event.data +}); +``` + +On page reload, `cursor` is retrieved from `localStorage` and passed to the subscription. No library changes are needed: the cursor is passed as a standard GraphQL variable.
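
The deduplication pseudocode in section 6 leaves pruning of the `seen` set as a comment. One possible way to bound it, sketched under the assumption that evicting the oldest IDs first is acceptable, is a small insertion-ordered set. `BoundedSeenSet` is a hypothetical helper, not part of Apollo Client or any other library.

```javascript
// Remembers at most `limit` event IDs, evicting the oldest first.
// Hypothetical helper for illustration; not part of any client library.
class BoundedSeenSet {
  constructor(limit) {
    this.limit = limit;
    this.ids = new Set(); // a Set iterates in insertion order
  }

  // Returns true if the ID was seen before; otherwise records it and returns false.
  checkAndAdd(eventId) {
    if (this.ids.has(eventId)) return true;
    this.ids.add(eventId);
    if (this.ids.size > this.limit) {
      // The first value in iteration order is the oldest recorded ID.
      const oldest = this.ids.values().next().value;
      this.ids.delete(oldest);
    }
    return false;
  }
}
```

In the Apollo handler above, a call such as `if (seen.checkAndAdd(event.eventId)) return;` placed before the cursor is stored would skip replayed duplicates. The limit is a tradeoff: a duplicate that arrives after its ID has been evicted will be processed again.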
+ +#### With urql + +```js +import { pipe, subscribe } from 'wonka'; + +// wonka's subscribe returns an object exposing unsubscribe() +const { unsubscribe } = pipe( + client.subscription(EMPLOYEE_UPDATES_QUERY, { cursor }), + subscribe(result => { + const event = result.data.employeeUpdates; + cursor = event.cursor; + sessionStorage.setItem('cursor', cursor); + }) +); +``` + +#### With graphql-ws (direct) + +```js +import { createClient } from 'graphql-ws'; + +const client = createClient({ url: 'ws://router/graphql' }); + +client.subscribe( + { query: EMPLOYEE_UPDATES_QUERY, variables: { cursor } }, + { + // next receives the full execution result ({ data, errors }) + next: ({ data }) => { + cursor = data.employeeUpdates.cursor; + // process data.employeeUpdates.data + }, + error: () => { + // reconnect with current cursor + reconnect(); + }, + complete: () => {} + } +); +``` + +#### With Relay + +Relay does not prescribe a subscription transport; a network layer backed by `graphql-ws` works as in the example above. The `cursor` field can be stored in the Relay Store as a field on a subscription root object. A custom subscription handler updates the stored cursor after each event. + +### 8. Schema Introspection and Documentation + +Because `@stream` is a schema directive, its effects are visible via introspection: + +```graphql +{ + __schema { + subscriptionType { + fields { + name + args { name type { name } } + type { name fields { name type { name } } } + } + } + } +} +``` + +Returns (simplified: real introspection reports non-null types as `NON_NULL` wrappers with a null `name`, which are flattened into the type names here): +```json +{ + "name": "employeeUpdates", + "args": [{"name": "afterCursor", "type": {"name": "String"}}], + "type": { + "name": "EmployeeStreamEvent", + "fields": [ + {"name": "cursor", "type": {"name": "String!"}}, + {"name": "eventId", "type": {"name": "String!"}}, + {"name": "data", "type": {"name": "Employee!"}} + ] + } +} +``` + +GraphQL code generators (graphql-codegen, Relay compiler) pick up the envelope type and the `afterCursor` argument automatically, so the cursor fields are typed in generated client code. + +### 9. Interaction with Cosmo Streams Hooks + +The `@stream` transformation occurs at the router layer, _after_ the `StreamReceiveEventHandler` hook pipeline. This means: +- The hook receives raw events as before. +- The envelope wrapping happens after hooks process the events.
+- Cursor generation happens after hooks (the hook can filter events; only delivered events get cursors). + +If a hook filters out an event, no cursor is generated for it. The next delivered event will have a cursor pointing past the filtered event. This is correct: the cursor encodes "last delivered position", not "last seen position". + +### 10. `@stream` on Non-JetStream Subscriptions + +For Redis and engine-based subscriptions, `@stream` provides a best-effort delivery improvement: +- The router generates `eventId` (UUID) for each event. +- The cursor encodes a session-scoped sequence number (router-generated, not broker-native). +- The `afterCursor` argument is accepted but triggers session buffer replay (RFC-003 behavior) rather than broker-native replay. +- If no session buffer is configured, `afterCursor` on Redis/engine subscriptions has no effect (events from the outage period are still lost, but the client can detect the gap via `sequenceNumber` discontinuity). + +This gives Redis/engine subscriptions partial delivery improvements (deduplication + gap detection) without requiring RFC-003's session buffer. + +--- + +## Failure Modes and Edge Cases + +### Missing `afterCursor` on Reconnect + +If the client loses its cursor (e.g., clears `localStorage`), it reconnects without `afterCursor` and receives events from "now". No gap detection from the schema layer; the client starts fresh. + +### Stale Cursor (Broker Retention Expired) + +If the cursor points to a sequence beyond the broker's retention window, the router replays from the earliest available sequence and includes `"x-cosmo-gap": true` in the first event's extensions, alongside the normal envelope. No schema change needed; this is a transport-layer extension. + +### Envelope Type Name Collision + +If the subgraph schema already defines a type named `EmployeeStreamEvent`, schema composition will fail with a clear error. 
The directive should accept a `wrapperType` parameter to override the generated name: + +```graphql +employeeUpdates: Employee! @stream(guarantee: AT_LEAST_ONCE, wrapperType: "EmployeeEvent") +``` + +### `acknowledgeEvent` Mutation Conflicts + +If the subgraph already has an `acknowledgeEvent` mutation, the router must namespace it: `acknowledgeStreamEvent`. Configuration should allow customizing the mutation name. + +### Large Replays and Slow Clients + +Replaying thousands of events to a slow client can exhaust router memory and cause back-pressure on the broker consumer. The `maxReplay` directive parameter caps the number of events replayed per reconnect. Events beyond this cap are skipped; `x-cosmo-gap: true` is set. + +--- + +## Backward Compatibility + +- The `@stream` directive is opt-in. Existing subscriptions without the directive are unchanged. +- The directive is a Cosmo extension and is stripped from the schema exposed to clients (non-functional types like federation directives are not part of the client-facing SDL). +- The generated `afterCursor` argument and `StreamEvent` wrapper types _are_ client-visible. This is a schema addition, not a breaking change. +- Existing clients that do not pass `afterCursor` receive current behavior. + +--- + +## Tradeoffs + +| Factor | Assessment | +|---|---| +| Schema discoverability | Excellent: cursor, eventId, afterCursor all visible via introspection. | +| Client changes required | Moderate: clients must handle the envelope type and store cursors. | +| Code generation | Strong: graphql-codegen generates typed cursor handling automatically. | +| Transport independence | Works over WebSocket and SSE. | +| Provider coverage | All providers (with degraded guarantees for Redis/engine without session buffer). | +| Breaking change | No: opt-in directive; new wrapper types are additive. | +| Envelope verbosity | Every event carries cursor + eventId overhead. For high-frequency subscriptions, this adds payload size. 
| +| Schema complexity | `StreamEvent` wrapper types are additional schema types. For schemas with many subscriptions, the type count grows. | +| Relay / pagination analogy | Cursor pattern is already familiar from Relay pagination. Reduces conceptual overhead. | +| Mutation-based ack | Explicit and introspectable, but requires an extra round-trip. Not suitable for high-frequency acks. | + +--- + +## Comparison with Other RFCs + +| | RFC-001 (WS cursor) | RFC-002 (SSE) | RFC-003 (session buffer) | RFC-004 (schema directive) | +|---|---|---|---|---| +| Schema visibility | No | No | No | Yes | +| Client library support | Extensions field handling | EventSource API | connectionParams | Standard GraphQL variables | +| Transport | WebSocket | SSE | WebSocket | Any | +| Provider coverage | JetStream | JetStream + Kafka | All | All | +| Ack mechanism | None | None | Explicit message or implicit | Mutation (optional) | +| Code generation friendly | Partial | No | No | Yes | +| Introspectable | No | No | No | Yes | +| Duplicate detection | Cursor comparison | SSE id | x-cosmo-seq | eventId field | +| Gap detection | x-cosmo-cursor-gap extension | event: gap SSE event | sequenceNumber discontinuity | x-cosmo-gap extension + sequenceNumber | + +--- + +## Open Questions + +1. Should the `@stream` directive be a Cosmo-proprietary directive (similar to `@edfs__natsSubscribe`) or should it be proposed to the broader GraphQL community as a standard extension? +2. Should cursor storage be the client's responsibility entirely (as proposed here), or should the router offer an optional server-side cursor store (session per client identity) as a complement? +3. Should the generated `StreamEvent` types be hidden in the introspection schema (to reduce noise) and only the `data` field be visible, with cursor handled as a special extension? This trades discoverability for cleanliness. +4. 
Is the `acknowledgeEvent` mutation ergonomic enough, or should acks be handled via a dedicated connection-level framing (mixing RFC-003's approach)? +5. For high-frequency subscriptions (> 100 events/second), the cursor + eventId overhead per event may be significant. Should there be a `@stream(compressed: true)` mode that batches events into a single `Next` message with one cursor for the batch? +6. How should the `sequenceNumber` in the envelope be defined across sessions — should it be global (per stream) or local (per connection)?