Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions lib/absinthe/blueprint/schema/enum_type_definition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ defmodule Absinthe.Blueprint.Schema.EnumTypeDefinition do
values: values_by(type_def, :identifier),
values_by_internal_value: values_by(type_def, :value),
values_by_name: values_by(type_def, :name),
applied_directives: Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
applied_directives:
Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
definition: type_def.module,
description: type_def.description
}
Expand All @@ -54,7 +55,8 @@ defmodule Absinthe.Blueprint.Schema.EnumTypeDefinition do
__private__: value_def.__private__,
description: value_def.description,
deprecation: value_def.deprecation,
applied_directives: Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(value_def.directives)
applied_directives:
Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(value_def.directives)
}

{Map.fetch!(value_def, key), value}
Expand Down
6 changes: 4 additions & 2 deletions lib/absinthe/blueprint/schema/input_object_type_definition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ defmodule Absinthe.Blueprint.Schema.InputObjectTypeDefinition do
name: type_def.name,
fields: build_fields(type_def, schema),
description: type_def.description,
applied_directives: Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
applied_directives:
Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
definition: type_def.module
}
end
Expand All @@ -50,7 +51,8 @@ defmodule Absinthe.Blueprint.Schema.InputObjectTypeDefinition do
description: field_def.description,
name: field_def.name,
type: Blueprint.TypeReference.to_type(field_def.type, schema),
applied_directives: Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(field_def.directives),
applied_directives:
Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(field_def.directives),
definition: type_def.module,
__reference__: field_def.__reference__,
__private__: field_def.__private__,
Expand Down
3 changes: 2 additions & 1 deletion lib/absinthe/blueprint/schema/interface_type_definition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ defmodule Absinthe.Blueprint.Schema.InterfaceTypeDefinition do
fields: Blueprint.Schema.ObjectTypeDefinition.build_fields(type_def, schema),
identifier: type_def.identifier,
resolve_type: type_def.resolve_type,
applied_directives: Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
applied_directives:
Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
definition: type_def.module,
interfaces: type_def.interfaces
}
Expand Down
47 changes: 32 additions & 15 deletions lib/absinthe/blueprint/schema/object_type_definition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,34 +103,51 @@ defmodule Absinthe.Blueprint.Schema.ObjectTypeDefinition do
Enum.map(directives, fn directive ->
%{
name: directive.name,
args: Enum.map(directive.arguments, fn arg ->
%{
name: arg.name,
value: serialize_argument_value(arg.input_value)
}
end)
args:
Enum.map(directive.arguments, fn arg ->
%{
name: arg.name,
value: serialize_argument_value(arg.input_value)
}
end)
}
end)
end

def build_applied_directives(_), do: []

defp serialize_argument_value(%Absinthe.Blueprint.Input.String{value: value}), do: inspect(value)
defp serialize_argument_value(%Absinthe.Blueprint.Input.Integer{value: value}), do: to_string(value)
defp serialize_argument_value(%Absinthe.Blueprint.Input.Float{value: value}), do: to_string(value)
defp serialize_argument_value(%Absinthe.Blueprint.Input.Boolean{value: value}), do: to_string(value)
defp serialize_argument_value(%Absinthe.Blueprint.Input.String{value: value}),
do: inspect(value)

defp serialize_argument_value(%Absinthe.Blueprint.Input.Integer{value: value}),
do: to_string(value)

defp serialize_argument_value(%Absinthe.Blueprint.Input.Float{value: value}),
do: to_string(value)

defp serialize_argument_value(%Absinthe.Blueprint.Input.Boolean{value: value}),
do: to_string(value)

defp serialize_argument_value(%Absinthe.Blueprint.Input.Null{}), do: "null"
defp serialize_argument_value(%Absinthe.Blueprint.Input.Enum{value: value}), do: value

defp serialize_argument_value(%Absinthe.Blueprint.Input.List{items: items}) do
"[" <> Enum.map_join(items, ", ", &serialize_argument_value/1) <> "]"
end

defp serialize_argument_value(%Absinthe.Blueprint.Input.Object{fields: fields}) do
"{" <> Enum.map_join(fields, ", ", fn field ->
"#{field.name}: #{serialize_argument_value(field.input_value)}"
end) <> "}"
"{" <>
Enum.map_join(fields, ", ", fn field ->
"#{field.name}: #{serialize_argument_value(field.input_value)}"
end) <> "}"
end
defp serialize_argument_value(%Absinthe.Blueprint.Input.RawValue{content: content}), do: serialize_argument_value(content)
defp serialize_argument_value(%Absinthe.Blueprint.Input.Value{raw: raw}), do: serialize_argument_value(raw)

defp serialize_argument_value(%Absinthe.Blueprint.Input.RawValue{content: content}),
do: serialize_argument_value(content)

defp serialize_argument_value(%Absinthe.Blueprint.Input.Value{raw: raw}),
do: serialize_argument_value(raw)

defp serialize_argument_value(value), do: inspect(value)

defimpl Inspect do
Expand Down
5 changes: 4 additions & 1 deletion lib/absinthe/blueprint/schema/scalar_type_definition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ defmodule Absinthe.Blueprint.Schema.ScalarTypeDefinition do
identifier: type_def.identifier,
name: type_def.name,
description: type_def.description,
applied_directives: Absinthe.Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
applied_directives:
Absinthe.Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(
type_def.directives
),
definition: type_def.module,
serialize: type_def.serialize,
parse: type_def.parse,
Expand Down
9 changes: 6 additions & 3 deletions lib/absinthe/blueprint/schema/union_type_definition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ defmodule Absinthe.Blueprint.Schema.UnionTypeDefinition do
identifier: type_def.identifier,
types: type_def.types |> atomize_types(schema),
fields: build_fields(type_def, schema),
applied_directives: Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
applied_directives:
Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(type_def.directives),
definition: type_def.module,
resolve_type: type_def.resolve_type
}
Expand All @@ -64,7 +65,8 @@ defmodule Absinthe.Blueprint.Schema.UnionTypeDefinition do
name: field_def.name,
type: Blueprint.TypeReference.to_type(field_def.type, schema),
args: build_args(field_def, schema),
applied_directives: Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(field_def.directives),
applied_directives:
Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(field_def.directives),
definition: field_def.module,
__reference__: field_def.__reference__,
__private__: field_def.__private__
Expand All @@ -83,7 +85,8 @@ defmodule Absinthe.Blueprint.Schema.UnionTypeDefinition do
type: Blueprint.TypeReference.to_type(arg_def.type, schema),
default_value: arg_def.default_value,
deprecation: arg_def.deprecation,
applied_directives: Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(arg_def.directives),
applied_directives:
Blueprint.Schema.ObjectTypeDefinition.build_applied_directives(arg_def.directives),
__reference__: arg_def.__reference__,
__private__: arg_def.__private__
}
Expand Down
8 changes: 7 additions & 1 deletion lib/absinthe/incremental/complexity.ex
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,13 @@ defmodule Absinthe.Incremental.Complexity do
end
end

defp analyze_node(%Blueprint.Document.Fragment.Spread{} = node, schema, config, analysis, depth) do
defp analyze_node(
%Blueprint.Document.Fragment.Spread{} = node,
_schema,
config,
analysis,
depth
) do
{analysis, _in_defer} = check_defer_directive(node, config, analysis, depth)
# Would need to look up the fragment definition for full analysis
analysis
Expand Down
6 changes: 4 additions & 2 deletions lib/absinthe/incremental/dataloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ defmodule Absinthe.Incremental.Dataloader do

This allows existing Dataloader resolvers to work with incremental delivery.
"""
@spec streaming_dataloader(atom(), any()) :: Resolution.resolver()
@spec streaming_dataloader(atom(), any()) ::
(Resolution.source(), Resolution.arguments(), Resolution.t() ->
{:ok, any()} | {:error, any()} | {:middleware, module(), any()})
def streaming_dataloader(source, batch_key \\ nil) do
fn parent, args, %{context: context} = resolution ->
# Check if we're in a streaming context
Expand Down Expand Up @@ -305,7 +307,7 @@ defmodule Absinthe.Incremental.Dataloader do
}

# Add to the batch queue in the resolution context
resolution =
_resolution =
update_in(
resolution.context[:__dataloader_batch_queue__],
&[batch_data | &1 || []]
Expand Down
12 changes: 2 additions & 10 deletions lib/absinthe/incremental/error_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ defmodule Absinthe.Incremental.ErrorHandler do
streaming operations, ensuring robust behavior even when things go wrong.
"""

alias Absinthe.Incremental.Response
require Logger

@type error_type ::
Expand Down Expand Up @@ -315,18 +314,11 @@ defmodule Absinthe.Incremental.ErrorHandler do
}
end

defp format_exception(exception, stacktrace \\ nil) do
formatted_stacktrace =
if stacktrace do
Exception.format_stacktrace(stacktrace)
else
"stacktrace not available"
end

defp format_exception(exception, stacktrace) do
%{
message: Exception.message(exception),
type: exception.__struct__,
stacktrace: formatted_stacktrace
stacktrace: Exception.format_stacktrace(stacktrace)
}
end

Expand Down
20 changes: 10 additions & 10 deletions lib/absinthe/incremental/resource_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,16 @@ defmodule Absinthe.Incremental.ResourceManager do
update_in(state.stream_stats.total_count, &(&1 + 1))
end

defp update_stats(state, :stream_timeout) do
state
|> update_in([:stream_stats, :timeout_count], &(&1 + 1))
|> update_in([:stream_stats, :failed_count], &(&1 + 1))
end

defp update_stats(state, :stream_crashed) do
update_in(state.stream_stats.failed_count, &(&1 + 1))
end

defp update_stats(state, :stream_released, duration) do
state
|> update_in([:stream_stats, :completed_count], &(&1 + 1))
Expand All @@ -269,16 +279,6 @@ defmodule Absinthe.Incremental.ResourceManager do
end)
end

defp update_stats(state, :stream_timeout) do
state
|> update_in([:stream_stats, :timeout_count], &(&1 + 1))
|> update_in([:stream_stats, :failed_count], &(&1 + 1))
end

defp update_stats(state, :stream_crashed) do
update_in(state.stream_stats.failed_count, &(&1 + 1))
end

defp schedule_stream_timeout(operation_id, timeout_ms) do
Process.send_after(self(), {:stream_timeout, operation_id}, timeout_ms)
end
Expand Down
43 changes: 22 additions & 21 deletions lib/absinthe/incremental/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,35 @@ defmodule Absinthe.Incremental.Response do
alias Absinthe.Blueprint

@type initial_response :: %{
data: map(),
pending: list(pending_item()),
hasNext: boolean(),
errors: list(map()) | nil
required(:data) => map(),
required(:pending) => [pending_item()],
required(:hasNext) => boolean(),
optional(:errors) => [map()]
}

@type incremental_response :: %{
incremental: list(incremental_item()),
hasNext: boolean(),
completed: list(completed_item()) | nil
required(:hasNext) => boolean(),
optional(:incremental) => [incremental_item()],
optional(:completed) => [completed_item()]
}

@type pending_item :: %{
id: String.t(),
path: list(String.t() | integer()),
label: String.t() | nil
required(:id) => String.t(),
required(:path) => [String.t() | integer()],
optional(:label) => String.t()
}

@type incremental_item :: %{
data: any(),
path: list(String.t() | integer()),
label: String.t() | nil,
errors: list(map()) | nil
optional(:data) => any(),
optional(:items) => [any()],
required(:path) => [String.t() | integer()],
optional(:label) => String.t(),
optional(:errors) => [map()]
}

@type completed_item :: %{
id: String.t(),
errors: list(map()) | nil
required(:id) => String.t(),
optional(:errors) => [map()]
}

@doc """
Expand Down Expand Up @@ -72,7 +73,7 @@ defmodule Absinthe.Incremental.Response do
- A hasNext flag indicating if more payloads are coming
- Optional completed items to signal completion of specific operations
"""
@spec build_incremental(any(), list(), String.t() | nil, boolean()) :: incremental_response()
@spec build_incremental(any(), [any()], String.t() | nil, boolean()) :: incremental_response()
def build_incremental(data, path, label, has_next) do
incremental_item = %{
data: data,
Expand All @@ -95,7 +96,7 @@ defmodule Absinthe.Incremental.Response do
@doc """
Build an incremental response for streamed list items.
"""
@spec build_stream_incremental(list(), list(), String.t() | nil, boolean()) ::
@spec build_stream_incremental([any()], [any()], String.t() | nil, boolean()) ::
incremental_response()
def build_stream_incremental(items, path, label, has_next) do
incremental_item = %{
Expand All @@ -119,7 +120,7 @@ defmodule Absinthe.Incremental.Response do
@doc """
Build a completion response to signal the end of incremental delivery.
"""
@spec build_completed(list(String.t())) :: incremental_response()
@spec build_completed([String.t()]) :: incremental_response()
def build_completed(completed_ids) do
completed_items =
Enum.map(completed_ids, fn id ->
Expand All @@ -135,7 +136,7 @@ defmodule Absinthe.Incremental.Response do
@doc """
Build an error response for a failed incremental operation.
"""
@spec build_error(list(map()), list(), String.t() | nil, boolean()) :: incremental_response()
@spec build_error([map()], [any()], String.t() | nil, boolean()) :: incremental_response()
def build_error(errors, path, label, has_next) do
incremental_item = %{
errors: errors,
Expand Down Expand Up @@ -193,7 +194,7 @@ defmodule Absinthe.Incremental.Response do
end)
end

defp remove_at_path(data, []), do: nil
defp remove_at_path(_data, []), do: nil

defp remove_at_path(data, [key | rest]) when is_map(data) do
case Map.get(data, key) do
Expand Down
6 changes: 5 additions & 1 deletion lib/absinthe/incremental/transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ defmodule Absinthe.Incremental.Transport do
alias Absinthe.Incremental.{Config, Response}
alias Absinthe.Streaming.Executor

@type conn_or_socket :: Plug.Conn.t() | Phoenix.Socket.t() | any()
# Plug.Conn.t() | Phoenix.Socket.t() — optional dependencies, kept as any()
# so this module can be used without Plug or Phoenix.
@type conn_or_socket :: any()
@type state :: any()
@type response :: map()

Expand Down Expand Up @@ -234,6 +236,7 @@ defmodule Absinthe.Incremental.Transport do

# Get configurable executor (defaults to TaskExecutor)
executor = Absinthe.Streaming.Executor.get_executor(schema, options)

executor_opts = [
timeout: timeout,
max_concurrency: System.schedulers_online() * 2
Expand Down Expand Up @@ -488,6 +491,7 @@ defmodule Absinthe.Incremental.Transport do

# Use configurable executor (defaults to TaskExecutor)
executor = Executor.get_executor(schema, options)

incremental_results =
all_tasks
|> executor.execute(timeout: timeout)
Expand Down
Loading
Loading