diff --git a/README_INCREMENTAL.md b/README_INCREMENTAL.md
new file mode 100644
index 0000000..e29b711
--- /dev/null
+++ b/README_INCREMENTAL.md
@@ -0,0 +1,684 @@
+# Absinthe Relay Incremental Delivery
+
+Relay connection support for GraphQL `@defer` and `@stream` directives.
+
+## Overview
+
+This package extends `absinthe_relay` to support incremental delivery with Relay-style connections. Stream edges incrementally while maintaining cursor consistency and proper connection structure throughout the streaming process.
+
+## Features
+
+- ✅ **Relay Specification**: Full compliance with Relay Cursor Connection spec
+- ✅ **Cursor Consistency**: Maintains proper cursor ordering during streaming
+- ✅ **Connection Structure**: Preserves `pageInfo` and connection metadata
+- ✅ **Bidirectional Pagination**: Supports forward and backward streaming
+- ✅ **Error Resilience**: Graceful handling of partial failures
+
+## Installation
+
+This functionality is included when using both `absinthe_relay` and incremental delivery:
+
+```elixir
+def deps do
+ [
+ {:absinthe, "~> 1.8"},
+ {:absinthe_relay, "~> 1.5"}
+ ]
+end
+```
+
+## Basic Usage
+
+### Schema Definition
+
+```elixir
+defmodule MyApp.Schema do
+ use Absinthe.Schema
+ use Absinthe.Relay.Schema, :modern
+
+ import_types Absinthe.Type.BuiltIns
+
+ connection node_type: :post
+ connection node_type: :user
+
+ query do
+ connection field :posts, node_type: :post do
+ arg :category, :string
+
+ resolve fn args, _ ->
+ # Your existing connection resolver
+ MyApp.Resolvers.list_posts(args)
+ end
+ end
+
+ field :user, :user do
+ arg :id, non_null(:id)
+ resolve &MyApp.Resolvers.get_user/2
+ end
+ end
+
+ object :user do
+ field :id, non_null(:id)
+ field :name, non_null(:string)
+
+ connection field :posts, node_type: :post do
+ resolve fn user, args, _ ->
+ MyApp.Resolvers.user_posts(user, args)
+ end
+ end
+ end
+
+ node object :post do
+ field :id, non_null(:id)
+ field :title, non_null(:string)
+ field :content, :string
+ field :published_at, :datetime
+ end
+end
+```
+
+### Streaming Connections
+
+#### Basic Streaming
+
+```graphql
+query GetPosts($first: Int!, $after: String) {
+ posts(first: $first, after: $after) @stream(initialCount: 2, label: "posts") {
+ pageInfo {
+ hasNextPage
+ hasPreviousPage
+ startCursor
+ endCursor
+ }
+ edges {
+ cursor
+ node {
+ id
+ title
+ publishedAt
+ }
+ }
+ }
+}
+```
+
+#### Streaming with Deferred Node Data
+
+```graphql
+query GetPostsWithDetails($first: Int!) {
+ posts(first: $first) @stream(initialCount: 3, label: "posts") {
+ pageInfo {
+ hasNextPage
+ endCursor
+ }
+ edges {
+ cursor
+ node {
+ id
+ title
+ # Defer expensive content loading
+ ... @defer(label: "content") {
+ content
+ author {
+ name
+ avatar
+ }
+ }
+ }
+ }
+ }
+}
+```
+
+#### Nested Connection Streaming
+
+```graphql
+query GetUsersWithPosts($first: Int!) {
+ users(first: $first) @stream(initialCount: 2, label: "users") {
+ edges {
+ node {
+ id
+ name
+ # Stream user's posts independently
+ posts(first: 5) @stream(initialCount: 2, label: "userPosts") {
+ edges {
+ node {
+ id
+ title
+ }
+ }
+ }
+ }
+ }
+ }
+}
+```
+
+## Response Format
+
+### Initial Response
+
+```json
+{
+ "data": {
+ "posts": {
+ "pageInfo": {
+ "hasNextPage": true,
+ "hasPreviousPage": false,
+ "startCursor": "Y3Vyc29yMQ==",
+ "endCursor": "Y3Vyc29yMg=="
+ },
+ "edges": [
+ {
+ "cursor": "Y3Vyc29yMQ==",
+ "node": {"id": "1", "title": "First Post"}
+ },
+ {
+ "cursor": "Y3Vyc29yMg==",
+ "node": {"id": "2", "title": "Second Post"}
+ }
+ ]
+ }
+ },
+ "pending": [
+ {"label": "posts", "path": ["posts"]}
+ ]
+}
+```
+
+### Incremental Response
+
+```json
+{
+ "incremental": [{
+ "label": "posts",
+ "path": ["posts"],
+ "items": [
+ {
+ "cursor": "Y3Vyc29yMw==",
+ "node": {"id": "3", "title": "Third Post"}
+ },
+ {
+ "cursor": "Y3Vyc29yNA==",
+ "node": {"id": "4", "title": "Fourth Post"}
+ }
+ ]
+ }]
+}
+```
+
+### Updated PageInfo
+
+```json
+{
+ "incremental": [{
+ "label": "posts",
+ "path": ["posts", "pageInfo"],
+ "data": {
+ "endCursor": "Y3Vyc29yNA==",
+ "hasNextPage": true
+ }
+ }]
+}
+```
+
+## Advanced Features
+
+### Cursor Management
+
+The system automatically:
+- Maintains cursor ordering during streaming
+- Updates `pageInfo` as new edges arrive
+- Ensures cursor consistency across batches
+
+```elixir
+# Custom cursor generation
+defmodule MyApp.Resolvers do
+ def list_posts(args) do
+ # Ensure stable cursor generation for streaming
+ posts =
+ Post
+ |> order_by([p], [desc: p.inserted_at, asc: p.id]) # Stable ordering
+ |> Connection.from_query(&Repo.all/1, args)
+
+ {:ok, posts}
+ end
+end
+```
+
+### Pagination Direction Support
+
+#### Forward Pagination with Streaming
+
+```graphql
+query GetMorePosts($first: Int!, $after: String) {
+ posts(first: $first, after: $after) @stream(initialCount: 5) {
+ pageInfo {
+ hasNextPage
+ endCursor
+ }
+ edges {
+ cursor
+ node {
+ id
+ title
+ }
+ }
+ }
+}
+```
+
+#### Backward Pagination with Streaming
+
+```graphql
+query GetPreviousPosts($last: Int!, $before: String) {
+ posts(last: $last, before: $before) @stream(initialCount: 5) {
+ pageInfo {
+ hasPreviousPage
+ startCursor
+ }
+ edges {
+ cursor
+ node {
+ id
+ title
+ }
+ }
+ }
+}
+```
+
+### Conditional Streaming
+
+```graphql
+query GetPosts($first: Int!, $shouldStream: Boolean!) {
+ posts(first: $first) @stream(if: $shouldStream, initialCount: 3) {
+ pageInfo {
+ hasNextPage
+ endCursor
+ }
+ edges {
+ cursor
+ node {
+ id
+ title
+ publishedAt
+ }
+ }
+ }
+}
+```
+
+## Client Integration
+
+### JavaScript/React Example
+
+```javascript
+import { useLazyQuery } from '@apollo/client';
+
+function PostList() {
+ const [loadPosts, { data, loading }] = useLazyQuery(GET_POSTS_QUERY, {
+ fetchPolicy: 'cache-and-network',
+ notifyOnNetworkStatusChange: true
+ });
+
+ const [posts, setPosts] = useState([]);
+ const [pageInfo, setPageInfo] = useState({});
+
+ useEffect(() => {
+ if (data?.posts) {
+ // Initial data
+ if (data.posts.edges) {
+ setPosts(data.posts.edges);
+ setPageInfo(data.posts.pageInfo);
+ }
+
+ // Incremental data
+ if (data.incremental) {
+ data.incremental.forEach(increment => {
+ if (increment.label === 'posts' && increment.items) {
+ setPosts(prev => [...prev, ...increment.items]);
+ }
+ if (increment.path?.includes('pageInfo')) {
+ setPageInfo(prev => ({ ...prev, ...increment.data }));
+ }
+ });
+ }
+ }
+ }, [data]);
+
+ const loadMore = () => {
+ if (pageInfo.hasNextPage) {
+ loadPosts({
+ variables: {
+ first: 10,
+ after: pageInfo.endCursor,
+ shouldStream: true
+ }
+ });
+ }
+ };
+
+ return (
+
+ {posts.map(edge => (
+
+ ))}
+
+ {pageInfo.hasNextPage && (
+
+ )}
+
+ );
+}
+```
+
+### Relay Modern Example
+
+```javascript
+import { graphql, usePaginationFragment } from 'react-relay';
+
+const PostListPaginationFragment = graphql`
+ fragment PostList_posts on Query
+ @refetchable(queryName: "PostListPaginationQuery")
+ @argumentDefinitions(
+ first: { type: "Int", defaultValue: 10 }
+ after: { type: "String" }
+ shouldStream: { type: "Boolean", defaultValue: true }
+ ) {
+ posts(first: $first, after: $after)
+ @stream(if: $shouldStream, initialCount: 3, label: "posts")
+ @connection(key: "PostList_posts") {
+ pageInfo {
+ hasNextPage
+ endCursor
+ }
+ edges {
+ cursor
+ node {
+ id
+ title
+ publishedAt
+ }
+ }
+ }
+ }
+`;
+
+function PostList({ query }) {
+ const {
+ data,
+ loadNext,
+ hasNext,
+ isLoadingNext
+ } = usePaginationFragment(PostListPaginationFragment, query);
+
+ return (
+
+ {data.posts.edges.map(edge => (
+
+ ))}
+
+ {hasNext && (
+
+ )}
+
+ );
+}
+```
+
+## Performance Optimization
+
+### Batch Size Configuration
+
+```elixir
+# Configure optimal batch sizes per connection type
+connection field :posts, node_type: :post do
+ meta incremental: [
+ stream_batch_size: 10, # Good for small post objects
+ defer_fragments: true # Allow fragment deferral
+ ]
+
+ resolve &Resolvers.list_posts/2
+end
+
+connection field :large_items, node_type: :large_item do
+ meta incremental: [
+ stream_batch_size: 3, # Smaller batches for large objects
+ defer_fragments: true
+ ]
+
+ resolve &Resolvers.list_large_items/2
+end
+```
+
+### Dataloader Optimization
+
+```elixir
+# Maintain efficient batching across streaming
+defmodule MyApp.Resolvers do
+ def list_posts(args) do
+ # Dataloader continues to batch efficiently
+ posts = Connection.from_query(Post, &Repo.all/1, args)
+ {:ok, posts}
+ end
+
+ def post_author(post, _, %{context: %{loader: loader}}) do
+ # Batched loading works across streaming boundaries
+ loader
+ |> Dataloader.load(User, :user, post.author_id)
+ |> on_load(fn loader ->
+ {:ok, Dataloader.get(loader, User, :user, post.author_id)}
+ end)
+ end
+end
+```
+
+### Memory Management
+
+```elixir
+# Configure connection limits
+config :absinthe_relay, :incremental,
+ max_connection_size: 1000,
+ stream_buffer_size: 100,
+ cleanup_interval: 60_000
+```
+
+## Error Handling
+
+### Partial Failure Recovery
+
+```json
+{
+ "incremental": [{
+ "label": "posts",
+ "path": ["posts"],
+ "items": [
+ {"cursor": "Y3Vyc29yMw==", "node": {"id": "3", "title": "Post 3"}},
+ null // Failed to load
+ ],
+ "errors": [{
+ "message": "Post not found",
+ "path": ["posts", "edges", 1, "node"]
+ }]
+ }]
+}
+```
+
+### Connection State Recovery
+
+The system ensures:
+- Cursor consistency despite errors
+- Proper `pageInfo` updates
+- Graceful degradation on failures
+
+## Testing
+
+### Unit Tests
+
+```elixir
+defmodule MyApp.Schema.IncrementalConnectionTest do
+ use ExUnit.Case, async: true
+ use Absinthe.Test, schema: MyApp.Schema
+
+ test "streams connection edges incrementally" do
+ query = """
+ query GetPosts($first: Int!) {
+ posts(first: $first) @stream(initialCount: 2, label: "posts") {
+ pageInfo {
+ hasNextPage
+ endCursor
+ }
+ edges {
+ cursor
+ node {
+ id
+ title
+ }
+ }
+ }
+ }
+ """
+
+ result = run_streaming_query(query, %{"first" => 10})
+
+ # Initial response has 2 edges
+ assert length(result.initial.data["posts"]["edges"]) == 2
+ assert result.initial.data["posts"]["pageInfo"]["hasNextPage"] == true
+
+ # Incremental responses have remaining edges
+ streamed_items = collect_streamed_items(result.incremental, "posts")
+ assert length(streamed_items) == 8
+
+ # Cursors are properly ordered
+ all_cursors = extract_cursors(result)
+ assert cursors_ordered?(all_cursors)
+ end
+
+ test "handles pagination with streaming" do
+ # Test forward/backward pagination
+ # Test cursor consistency
+ # Test pageInfo updates
+ end
+end
+```
+
+### Integration Tests
+
+```elixir
+defmodule MyApp.IncrementalConnectionIntegrationTest do
+ use ExUnit.Case, async: false
+ use Phoenix.ChannelTest
+
+ test "WebSocket connection streaming" do
+ # Test complete WebSocket flow
+ # Test connection lifecycle
+ # Test error recovery
+ end
+
+ test "SSE connection streaming" do
+ # Test Server-Sent Events
+ # Test client reconnection
+ # Test partial failures
+ end
+end
+```
+
+## Monitoring
+
+### Connection Metrics
+
+```elixir
+:telemetry.attach_many(
+ "relay-incremental-metrics",
+ [
+ [:absinthe_relay, :incremental, :connection, :start],
+ [:absinthe_relay, :incremental, :connection, :stream],
+ [:absinthe_relay, :incremental, :connection, :complete]
+ ],
+ &MyApp.Telemetry.handle_relay_event/4,
+ %{}
+)
+
+def handle_relay_event([:absinthe_relay, :incremental, :connection, :stream], measurements, metadata, _config) do
+ # Track streaming metrics
+ :telemetry.execute(
+ [:myapp, :relay, :connection_stream],
+ %{
+ batch_size: measurements.batch_size,
+ total_edges: measurements.total_edges,
+ cursor_position: measurements.cursor_position
+ },
+ metadata
+ )
+end
+```
+
+### Performance Tracking
+
+Key metrics to monitor:
+- Connection streaming latency
+- Cursor consistency validation
+- Edge batch sizes and timing
+- Memory usage per connection
+- Error rates per connection type
+
+## Troubleshooting
+
+### Common Issues
+
+1. **Cursor ordering problems**
+ - Ensure stable sorting in resolvers
+ - Check cursor generation consistency
+ - Verify database ordering guarantees
+
+2. **PageInfo inconsistencies**
+ - Monitor pageInfo updates during streaming
+ - Validate hasNextPage/hasPreviousPage logic
+ - Check endCursor/startCursor updates
+
+3. **Performance degradation**
+ - Profile batch size effectiveness
+ - Monitor dataloader batching efficiency
+ - Check memory usage patterns
+
+### Debug Utilities
+
+```elixir
+# Debug cursor consistency
+defmodule MyApp.Debug.CursorValidator do
+ def validate_stream_cursors(streaming_result) do
+ all_cursors = extract_all_cursors(streaming_result)
+
+ case validate_ordering(all_cursors) do
+ :ok -> :ok
+ {:error, reason} ->
+ Logger.error("Cursor ordering violation: #{reason}")
+ {:error, reason}
+ end
+ end
+end
+```
+
+## Examples
+
+See [examples/](examples/) for:
+- Complete Relay Modern integration
+- Real-time comment streaming
+- Infinite scroll implementation
+- Performance benchmarks
+
+## Contributing
+
+Priority areas for contribution:
+- Relay Modern compatibility testing
+- Performance optimization
+- Cursor consistency edge cases
+- Documentation improvements
\ No newline at end of file
diff --git a/lib/absinthe/relay/incremental/connection.ex b/lib/absinthe/relay/incremental/connection.ex
new file mode 100644
index 0000000..6646e1f
--- /dev/null
+++ b/lib/absinthe/relay/incremental/connection.ex
@@ -0,0 +1,349 @@
+defmodule Absinthe.Relay.Incremental.Connection do
+ @moduledoc """
+ Streaming support for Relay connections.
+
+ This module enables @stream directive to work correctly with Relay's
+ connection pattern, ensuring proper cursor handling and pagination
+ with incremental delivery.
+ """
+
+ alias Absinthe.Relay.Connection
+
+ @type stream_config :: %{
+ initial_count: non_neg_integer(),
+ label: String.t() | nil,
+ path: list()
+ }
+
+ @type streaming_connection :: %{
+ initial: Connection.t(),
+ stream_plan: list(stream_batch()),
+ total_count: non_neg_integer()
+ }
+
+ @type stream_batch :: %{
+ edges: list(Connection.Edge.t()),
+ path: list(),
+ label: String.t() | nil,
+ start_cursor: String.t(),
+ end_cursor: String.t()
+ }
+
+ @doc """
+ Convert a Relay connection to support streaming.
+
+ This splits the connection into an initial response and a streaming plan
+ for the remaining edges.
+ """
+ @spec stream_connection(Connection.t(), stream_config()) ::
+ {:ok, streaming_connection()} | {:error, term()}
+ def stream_connection(connection, stream_config) do
+ initial_count = Map.get(stream_config, :initial_count, 0)
+
+ # Split edges into initial and remaining
+ {initial_edges, remaining_edges} =
+ split_edges(connection.edges, initial_count)
+
+ # Build initial connection with updated page info
+ initial_connection = %{connection |
+ edges: initial_edges,
+ page_info: update_page_info_for_streaming(
+ connection.page_info,
+ initial_edges,
+ remaining_edges,
+ connection
+ )
+ }
+
+ # Create streaming plan for remaining edges
+ stream_plan =
+ if Enum.empty?(remaining_edges) do
+ []
+ else
+ plan_edge_streaming(remaining_edges, stream_config)
+ end
+
+ {:ok, %{
+ initial: initial_connection,
+ stream_plan: stream_plan,
+ total_count: length(connection.edges)
+ }}
+ end
+
+ @doc """
+ Process a streamed batch of edges.
+
+ Returns the edges formatted for incremental delivery with proper
+ cursor continuity.
+ """
+ @spec process_stream_batch(stream_batch()) :: map()
+ def process_stream_batch(batch) do
+ %{
+ edges: Enum.map(batch.edges, &format_edge/1),
+ path: batch.path,
+ label: batch.label,
+ pageInfo: %{
+ startCursor: batch.start_cursor,
+ endCursor: batch.end_cursor
+ }
+ }
+ end
+
+ @doc """
+ Validate cursor continuity across streamed batches.
+
+ Ensures that cursors maintain proper ordering when edges are
+ delivered incrementally.
+ """
+ @spec validate_cursor_continuity(list(Connection.Edge.t()), list(Connection.Edge.t())) ::
+ :ok | {:error, term()}
+ def validate_cursor_continuity([], _), do: :ok
+ def validate_cursor_continuity(_, []), do: :ok
+
+ def validate_cursor_continuity(previous_edges, new_edges) do
+ last_cursor = get_last_cursor(previous_edges)
+ first_cursor = get_first_cursor(new_edges)
+
+ if follows_cursor?(first_cursor, last_cursor) do
+ :ok
+ else
+ {:error, "Cursor discontinuity detected in streamed connection"}
+ end
+ end
+
+ @doc """
+ Create a connection that supports streaming from a list of items.
+
+ This is a streaming-aware version of Relay.Connection.from_list.
+ """
+ @spec from_list(list(), map(), Keyword.t()) :: {:ok, Connection.t()} | {:error, term()}
+ def from_list(items, args, opts \\ []) do
+ # Check if streaming is requested
+ case Map.get(args, :stream) do
+ nil ->
+ # Standard connection without streaming
+ Connection.from_list(items, args, opts)
+
+ stream_args ->
+ # Create streaming connection
+ build_streaming_connection(items, args, stream_args, opts)
+ end
+ end
+
+ @doc """
+ Apply @stream directive to a connection field.
+
+ This is used by the schema to mark connection fields for streaming.
+ """
+ @spec stream_field(atom(), Keyword.t()) :: Absinthe.Schema.Notation.field_result()
+ defmacro stream_field(field_name, opts \\ []) do
+ quote do
+ field unquote(field_name), :connection do
+ # Add streaming metadata
+ meta :streaming_enabled, true
+
+ # Apply options
+ unquote(Keyword.get(opts, :do))
+
+ # Wrap resolver with streaming support
+ middleware Absinthe.Relay.Incremental.Connection.StreamingMiddleware
+ end
+ end
+ end
+
+ # Private functions
+
+ defp split_edges(edges, initial_count) when initial_count >= 0 do
+ {Enum.take(edges, initial_count), Enum.drop(edges, initial_count)}
+ end
+
+ defp update_page_info_for_streaming(page_info, initial_edges, remaining_edges, connection) do
+ has_more = not Enum.empty?(remaining_edges)
+
+ %{page_info |
+ # Indicate more edges are coming via streaming
+ has_next_page: page_info.has_next_page or has_more,
+ # Update end cursor to last initial edge if we have any
+ end_cursor: get_last_cursor(initial_edges) || page_info.end_cursor,
+ # Keep start cursor from first edge
+ start_cursor: get_first_cursor(initial_edges) || page_info.start_cursor
+ }
+ end
+
+ defp plan_edge_streaming(edges, config) do
+ batch_size = calculate_stream_batch_size(config)
+
+ edges
+ |> Enum.chunk_every(batch_size)
+ |> Enum.with_index()
+ |> Enum.map(fn {edge_batch, index} ->
+ %{
+ edges: edge_batch,
+ path: config.path ++ ["edges"],
+ label: build_batch_label(config.label, index),
+ start_cursor: get_first_cursor(edge_batch),
+ end_cursor: get_last_cursor(edge_batch)
+ }
+ end)
+ end
+
+ defp calculate_stream_batch_size(config) do
+ # Determine optimal batch size based on configuration
+ Map.get(config, :batch_size, 10)
+ end
+
+ defp format_edge(edge) do
+ %{
+ node: edge.node,
+ cursor: edge.cursor
+ }
+ end
+
+ defp get_first_cursor([]), do: nil
+ defp get_first_cursor([edge | _]), do: edge.cursor
+
+ defp get_last_cursor([]), do: nil
+ defp get_last_cursor(edges), do: List.last(edges).cursor
+
+ defp follows_cursor?(nil, _), do: true
+ defp follows_cursor?(_, nil), do: true
+ defp follows_cursor?(cursor1, cursor2) do
+ # Decode and compare cursors
+ with {:ok, pos1} <- decode_cursor(cursor1),
+ {:ok, pos2} <- decode_cursor(cursor2) do
+ pos1 > pos2
+ else
+ _ -> false
+ end
+ end
+
+ defp decode_cursor(cursor) do
+ case Base.decode64(cursor) do
+ {:ok, decoded} ->
+ # Parse the position from the cursor
+ case String.split(decoded, ":") do
+ ["cursor", position] -> {:ok, String.to_integer(position)}
+ _ -> {:error, :invalid_cursor}
+ end
+ error -> error
+ end
+ end
+
+ defp build_batch_label(nil, index), do: "batch_#{index}"
+ defp build_batch_label(label, index), do: "#{label}_batch_#{index}"
+
+ defp build_streaming_connection(items, args, stream_args, opts) do
+ # First build standard connection
+ case Connection.from_list(items, Map.delete(args, :stream), opts) do
+ {:ok, connection} ->
+ # Then apply streaming
+ stream_config = %{
+ initial_count: Map.get(stream_args, :initial_count, 0),
+ label: Map.get(stream_args, :label),
+ path: Keyword.get(opts, :path, [])
+ }
+
+ stream_connection(connection, stream_config)
+
+ error ->
+ error
+ end
+ end
+
+ @doc """
+ Generate a streaming cursor for an item.
+
+ Ensures cursor stability across incremental deliveries.
+ """
+ @spec generate_streaming_cursor(any(), non_neg_integer(), map()) :: String.t()
+ def generate_streaming_cursor(item, index, context) do
+ # Generate a stable cursor that includes:
+ # - Query ID for uniqueness
+ # - Index for ordering
+ # - Item ID if available
+
+ query_id = Map.get(context, :query_id, "default")
+ item_id = get_item_id(item)
+
+ cursor_data = "cursor:#{query_id}:#{index}:#{item_id}"
+ Base.encode64(cursor_data)
+ end
+
+ defp get_item_id(item) do
+ case item do
+ %{id: id} -> id
+ _ -> :erlang.phash2(item)
+ end
+ end
+end
+
+defmodule Absinthe.Relay.Incremental.Connection.StreamingMiddleware do
+ @moduledoc """
+ Middleware that adds streaming support to Relay connections.
+ """
+
+ @behaviour Absinthe.Middleware
+
+ alias Absinthe.Resolution
+ alias Absinthe.Relay.Incremental.Connection
+
+ def call(resolution, _opts) do
+ # Check if streaming is enabled for this field
+ if streaming_enabled?(resolution) do
+ wrap_with_streaming(resolution)
+ else
+ resolution
+ end
+ end
+
+ defp streaming_enabled?(resolution) do
+ # Check field metadata for streaming flag
+ get_in(resolution.definition, [:meta, :streaming_enabled]) == true
+ end
+
+ defp wrap_with_streaming(resolution) do
+ # Wrap the resolver to handle streaming
+ Resolution.put_result(
+ resolution,
+ resolve_with_streaming(resolution)
+ )
+ end
+
+ defp resolve_with_streaming(resolution) do
+ case resolution.value do
+ {:ok, %{edges: _} = connection} ->
+ # Check if @stream directive is present
+ case get_stream_directive(resolution) do
+ nil ->
+ {:ok, connection}
+
+ stream_args ->
+ # Apply streaming to the connection
+ stream_config = %{
+ initial_count: Map.get(stream_args, :initialCount, 0),
+ label: Map.get(stream_args, :label),
+ path: Resolution.path(resolution)
+ }
+
+ Connection.stream_connection(connection, stream_config)
+ end
+
+ other ->
+ other
+ end
+ end
+
+ defp get_stream_directive(resolution) do
+ # Extract @stream directive arguments from the field
+ resolution.definition
+ |> Map.get(:directives, [])
+ |> Enum.find(fn
+ %{name: "stream"} -> true
+ _ -> false
+ end)
+ |> case do
+ %{arguments: args} -> args
+ _ -> nil
+ end
+ end
+end
\ No newline at end of file
diff --git a/mix.exs b/mix.exs
index 693ddaa..ceb3cff 100644
--- a/mix.exs
+++ b/mix.exs
@@ -54,7 +54,7 @@ defmodule AbsintheRelay.Mixfile do
defp deps do
[
- {:absinthe, ">= 1.7.10"},
+ {:absinthe, git: "https://github.com/gigsmart/absinthe.git", branch: "gigmart/defer-stream-incremental"},
{:ecto, "~> 2.0 or ~> 3.0", optional: true},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:mix_audit, "~> 2.1", only: [:dev, :test], runtime: false}