From 4c208ebd0727a51424ff7486c32ed925fd91b5e7 Mon Sep 17 00:00:00 2001 From: Niklas van Schrick Date: Sun, 31 May 2026 00:33:19 +0200 Subject: [PATCH] Setup ActionCable for GraphQL subscriptions --- app/channels/application_cable/channel.rb | 40 ++++++++++++ app/channels/graphql_channel.rb | 47 ++++++++++++++ app/graphql/sagittarius_schema.rb | 5 +- .../subscriptions/base_subscription.rb | 19 ++++++ app/graphql/subscriptions/echo.rb | 30 +++++++++ app/graphql/types/subscription_type.rb | 11 ++++ config.cable.ru | 6 ++ config/cable.yml | 6 +- config/initializers/action_cable.rb | 5 ++ config/puma.rb | 2 + config/routes.rb | 1 + docs/graphql/subscription/echo.md | 20 ++++++ lib/sagittarius/graphql/mount_subscription.rb | 27 ++++++++ spec/channels/graphql_channel_spec.rb | 65 +++++++++++++++++++ .../graphql_subscription_support.rb | 39 +++++++++++ tooling/graphql/docs/helper.rb | 1 + tooling/graphql/docs/parser.rb | 28 +++++++- .../docs/templates/subscription.md.erb | 25 +++++++ 18 files changed, 369 insertions(+), 8 deletions(-) create mode 100644 app/channels/graphql_channel.rb create mode 100644 app/graphql/subscriptions/base_subscription.rb create mode 100644 app/graphql/subscriptions/echo.rb create mode 100644 app/graphql/types/subscription_type.rb create mode 100644 config.cable.ru create mode 100644 config/initializers/action_cable.rb create mode 100644 docs/graphql/subscription/echo.md create mode 100644 lib/sagittarius/graphql/mount_subscription.rb create mode 100644 spec/channels/graphql_channel_spec.rb create mode 100644 spec/support/shared_examples/graphql_subscription_support.rb create mode 100644 tooling/graphql/docs/templates/subscription.md.erb diff --git a/app/channels/application_cable/channel.rb b/app/channels/application_cable/channel.rb index 9aec23053..437747d79 100644 --- a/app/channels/application_cable/channel.rb +++ b/app/channels/application_cable/channel.rb @@ -2,5 +2,45 @@ module ApplicationCable class Channel < ActionCable::Channel::Base + def subscribe_to_channel + with_context { super } + end + + def perform_action(data) + with_context { super } + end + + protected + + def find_authentication(authorization) + return Sagittarius::Authentication.new(:none, nil) if authorization.blank? + + token_type, token = authorization.split(' ', 2) + + create_authentication(token_type, token) + end + + def create_authentication(token_type, token) + case token_type + when 'Session' + Sagittarius::Authentication.new(:session, UserSession.find_by(token: token, active: true)) + else + Sagittarius::Authentication.new(:invalid, nil) + end + end + + def with_context(&block) + Code0::ZeroTrack::Context.with_context( + application: 'cable', + ip_address: request_ip, + &block + ) + end + + def request_ip + return unless connection.respond_to?(:env) + + ::Rack::Request.new(connection.env).ip + end end end diff --git a/app/channels/graphql_channel.rb b/app/channels/graphql_channel.rb new file mode 100644 index 000000000..ad086c0c9 --- /dev/null +++ b/app/channels/graphql_channel.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +class GraphqlChannel < ApplicationCable::Channel + periodically :verify_authentication, every: 30.seconds + + def subscribed + @token = params[:token] + @subscription_ids = [] + + verify_authentication + end + + def execute(data) + result = SagittariusSchema.execute( + query: data['query'], + context: { + current_authentication: find_authentication(@token), + visibility_profile: :execution, + channel: self, + }, + variables: data['variables'], + operation_name: data['operationName'] + ) + + @subscription_ids << result.context[:subscription_id] if result.context[:subscription_id] + + transmit({ result: result.to_h, more: result.subscription? }) + end + + def unsubscribed + @subscription_ids.each do |sid| + SagittariusSchema.subscriptions.delete_subscription(sid) + end + end + + private + + def verify_authentication + with_context do + authentication = find_authentication(@token) + return unless authentication.invalid? || authentication.none? + + @subscription_ids.each { |sid| SagittariusSchema.subscriptions.delete_subscription(sid) } + reject + end + end +end diff --git a/app/graphql/sagittarius_schema.rb b/app/graphql/sagittarius_schema.rb index 6f2a816e7..c2a88b42f 100644 --- a/app/graphql/sagittarius_schema.rb +++ b/app/graphql/sagittarius_schema.rb @@ -4,15 +4,18 @@ class SagittariusSchema < GraphQL::Schema mutation(Types::MutationType) query(Types::QueryType) + subscription(Types::SubscriptionType) default_max_page_size 50 max_depth 20 connections.add(ActiveRecord::Relation, Sagittarius::Graphql::StableConnection) + use GraphQL::Subscriptions::ActionCableSubscriptions + # For batch-loading (see https://graphql-ruby.org/dataloader/overview.html) use GraphQL::Dataloader - use GraphQL::Schema::Visibility, profiles: { + use GraphQL::Schema::Visibility, preload: true, profiles: { execution: {}, types: {}, docs: {}, diff --git a/app/graphql/subscriptions/base_subscription.rb b/app/graphql/subscriptions/base_subscription.rb new file mode 100644 index 000000000..06cd8d276 --- /dev/null +++ b/app/graphql/subscriptions/base_subscription.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Subscriptions + class BaseSubscription < GraphQL::Schema::Subscription + argument_class Types::BaseArgument + field_class Types::BaseField + object_class Types::BaseObject + + def current_authentication + context[:current_authentication] + end + + def self.generate_payload_type + result = super + result.graphql_name("#{graphql_name}SubscriptionPayload") + result + end + end +end diff --git a/app/graphql/subscriptions/echo.rb b/app/graphql/subscriptions/echo.rb new file mode 100644 index 000000000..df0934f21 --- /dev/null +++ b/app/graphql/subscriptions/echo.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module Subscriptions + class Echo < BaseSubscription + description <<~DOC + A subscription that does not perform any real updates. + + This is expected to be used for testing of endpoints, to verify + that a user has subscription access. + DOC + + argument :message, + type: GraphQL::Types::String, + required: false, + description: 'Message to return to the user.' + + field :message, + type: GraphQL::Types::String, + null: true, + description: 'Message returned to the user.' + + def subscribe(message: nil) + { message: message } + end + + def update(*) + { message: object[:message] } + end + end +end diff --git a/app/graphql/types/subscription_type.rb b/app/graphql/types/subscription_type.rb new file mode 100644 index 000000000..e08ecfb4d --- /dev/null +++ b/app/graphql/types/subscription_type.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module Types + class SubscriptionType < Types::BaseObject + description 'Root subscription type' + + include Sagittarius::Graphql::MountSubscription + + mount_subscription Subscriptions::Echo + end +end diff --git a/config.cable.ru b/config.cable.ru new file mode 100644 index 000000000..b60d9d455 --- /dev/null +++ b/config.cable.ru @@ -0,0 +1,6 @@ +# frozen_string_literal: true + +require_relative 'config/environment' +Rails.application.eager_load! + +run ActionCable.server diff --git a/config/cable.yml b/config/cable.yml index 00c354074..cc73650ef 100644 --- a/config/cable.yml +++ b/config/cable.yml @@ -1,10 +1,8 @@ development: - adapter: async + adapter: postgresql test: adapter: test production: - adapter: redis - url: <%= ENV.fetch("REDIS_URL") { "redis://localhost:6379/1" } %> - channel_prefix: sagittarius_production + adapter: postgresql diff --git a/config/initializers/action_cable.rb b/config/initializers/action_cable.rb new file mode 100644 index 000000000..3cb2544a5 --- /dev/null +++ b/config/initializers/action_cable.rb @@ -0,0 +1,5 @@ +# frozen_string_literal: true + +Rails.application.configure do + config.action_cable.disable_request_forgery_protection = true +end diff --git a/config/puma.rb b/config/puma.rb index 7f4e6eddf..c9191d082 100644 --- a/config/puma.rb +++ b/config/puma.rb @@ -4,6 +4,8 @@ # are invoked here are part of Puma's configuration DSL. For more information # about methods provided by the DSL, see https://puma.io/puma/Puma/DSL.html. +require_relative 'environment' + # Puma can serve each request in a thread from an internal thread pool. # The `threads` method setting takes two numbers: a minimum and maximum. # Any libraries that use thread pools should be configured to match diff --git a/config/routes.rb b/config/routes.rb index 961921484..87ed5b886 100644 --- a/config/routes.rb +++ b/config/routes.rb @@ -11,4 +11,5 @@ get '/health/liveness' => 'rails/health#show', as: :rails_health_check mount GoodJob::Engine => 'good_job' + mount ActionCable.server => '/cable' end diff --git a/docs/graphql/subscription/echo.md b/docs/graphql/subscription/echo.md new file mode 100644 index 000000000..d06a250ba --- /dev/null +++ b/docs/graphql/subscription/echo.md @@ -0,0 +1,20 @@ +--- +title: echo +--- + +A subscription that does not perform any real updates. + +This is expected to be used for testing of endpoints, to verify +that a user has subscription access. + +## Arguments + +| Name | Type | Description | +|------|------|-------------| +| `message` | [`String`](../scalar/string.md) | Message to return to the user. | + +## Fields + +| Name | Type | Description | +|------|------|-------------| +| `message` | [`String`](../scalar/string.md) | Message returned to the user. | diff --git a/lib/sagittarius/graphql/mount_subscription.rb b/lib/sagittarius/graphql/mount_subscription.rb new file mode 100644 index 000000000..fa850c893 --- /dev/null +++ b/lib/sagittarius/graphql/mount_subscription.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Sagittarius + module Graphql + module MountSubscription + extend ActiveSupport::Concern + + class_methods do + def mount_subscription(subscription_class, **custom_kwargs) + # Using an underscored field name symbol will make `graphql-ruby` + # standardize the field name + field subscription_class.graphql_name.underscore.to_sym, + subscription: subscription_class, + **custom_kwargs + end + + def mount_aliased_subscription(alias_name, subscription_class, **custom_kwargs) + aliased_subscription_class = Class.new(subscription_class) do + graphql_name alias_name + end + + mount_subscription(aliased_subscription_class, **custom_kwargs) + end + end + end + end +end diff --git a/spec/channels/graphql_channel_spec.rb b/spec/channels/graphql_channel_spec.rb new file mode 100644 index 000000000..42d5f8052 --- /dev/null +++ b/spec/channels/graphql_channel_spec.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe GraphqlChannel do + include AuthenticationHelpers + + include_context 'with graphql subscription support' + + let(:user) { create(:user) } + let(:token) { "Session #{authorization_token(user)}" } + + describe '#subscribed' do + context 'with valid token' do + it 'confirms subscription' do + subscribe(token: token) + expect(subscription).to be_confirmed + end + end + + context 'with invalid token' do + it 'rejects subscription' do + subscribe(token: 'Session invalid') + expect(subscription).to be_rejected + end + end + + context 'without token' do + it 'rejects subscription' do + subscribe(token: nil) + expect(subscription).to be_rejected + end + end + end + + describe '#execute' do + before { subscribe(token: token) } + + it 'returns the initial subscription result' do + perform :execute, + query: 'subscription($message: String) { echo(message: $message) { message } }', + variables: { message: 'hello' } + + expect(transmissions.last).to include( + 'result' => { 'data' => { 'echo' => { 'message' => 'hello' } } }, + 'more' => true + ) + end + + it 'receives updates when triggered' do + variables = { message: 'hello' } + perform :execute, + query: 'subscription($message: String) { echo(message: $message) { message } }', + variables: variables + + SagittariusSchema.subscriptions.trigger(:echo, variables, { message: 'updated' }, + context: { visibility_profile: :execution }) + + expect(transmissions.last).to include( + 'result' => { 'data' => { 'echo' => { 'message' => 'updated' } } }, + 'more' => true + ) + end + end +end diff --git a/spec/support/shared_examples/graphql_subscription_support.rb b/spec/support/shared_examples/graphql_subscription_support.rb new file mode 100644 index 000000000..d1865d5af --- /dev/null +++ b/spec/support/shared_examples/graphql_subscription_support.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +RSpec.shared_context 'with graphql subscription support' do + let(:stream_callbacks) { {} } + let(:subscription_streams) { {} } + + before do + callbacks = stream_callbacks + sub_streams = subscription_streams + test_spec = self + + ActionCable::Channel::ChannelStub.define_method(:stream_from) do |broadcasting, coder: nil, &block| + streams << broadcasting + if block + callbacks[broadcasting] = { block: block, coder: coder } + else + sub_streams[broadcasting] = true + end + end + + pubsub = ActionCable.server.pubsub + allow(pubsub).to receive(:broadcast).and_wrap_original do |method, stream, message| + method.call(stream, message) + if (cb = callbacks[stream]) + decoded = cb[:coder] ? cb[:coder].decode(message) : message + cb[:block].call(decoded) + elsif sub_streams[stream] + decoded = ActiveSupport::JSON.decode(message) + test_spec.subscription.send(:transmit, decoded) + end + end + end + + after do + ActionCable::Channel::ChannelStub.define_method(:stream_from) do |broadcasting, *| + streams << broadcasting + end + end +end diff --git a/tooling/graphql/docs/helper.rb b/tooling/graphql/docs/helper.rb index 054a114d2..496113723 100644 --- a/tooling/graphql/docs/helper.rb +++ b/tooling/graphql/docs/helper.rb @@ -18,6 +18,7 @@ def rendering_objects enum: parser.elements[:enum], object: parser.elements[:object], mutation: parser.elements[:mutation], + subscription: parser.elements[:subscription], input_object: parser.elements[:input_object], } end diff --git a/tooling/graphql/docs/parser.rb b/tooling/graphql/docs/parser.rb index 33623cc36..207153dd0 100644 --- a/tooling/graphql/docs/parser.rb +++ b/tooling/graphql/docs/parser.rb @@ -30,6 +30,7 @@ def parse process_interfaces process_mutations + process_subscriptions reject_types end @@ -39,10 +40,14 @@ def reject_types name = element[:name] next true if name.start_with?('__') next true if type == :object && name == 'Mutation' + next true if type == :object && name == 'Subscription' - next true if type == :object && name.end_with?('Payload') && elements[:mutation].find do |mutation| - mutation[:name] == name.chomp('Payload').camelcase(:lower) - end + next true if type == :object && name.end_with?('Payload') && ( + elements[:mutation].find { |mutation| mutation[:name] == name.chomp('Payload').camelcase(:lower) } || + elements[:subscription]&.find do |subscription| + subscription[:type][:name] == name + end + ) type == :input_object && name.end_with?('Input') && elements[:mutation].find do |mutation| mutation[:name] == name.chomp('Input').camelcase(:lower) @@ -83,6 +88,23 @@ def process_mutations elements[:mutation] = mutations end + def process_subscriptions + subscription_type = elements[:object].find { |obj| obj[:name] == 'Subscription' } + return if subscription_type.nil? + + subscriptions = subscription_type[:fields] + + subscriptions.each do |subscription| + payload_object = elements[:object].find { |type| type[:name] == subscription[:type][:name] } + assert!(payload_object.present?, + "Cannot find #{subscription[:type][:name]} as payload for #{subscription[:name]}") + + subscription[:fields] = payload_object[:fields] + end + + elements[:subscription] = subscriptions + end + def parse_type_specific(type, element) if type < ::GraphQL::Schema::Object element[:fields] = build_fields(type.fields) diff --git a/tooling/graphql/docs/templates/subscription.md.erb b/tooling/graphql/docs/templates/subscription.md.erb new file mode 100644 index 000000000..5abea07a2 --- /dev/null +++ b/tooling/graphql/docs/templates/subscription.md.erb @@ -0,0 +1,25 @@ +--- +title: <%= object[:name] %> +--- + +<%= object[:description].chomp %> +<% unless object[:markdown_documentation].nil? -%> + +<%= object[:markdown_documentation].chomp %> +<% end -%> + +## Arguments + +| Name | Type | Description | +|------|------|-------------| +<% sorted_by_name.call(object[:arguments]).each do |argument| -%> +| `<%= argument[:name] %>` | [`<%= argument[:type][:info] %>`](../<%= argument[:type][:path] %>.md) | <%= argument[:description] %> | +<% end -%> + +## Fields + +| Name | Type | Description | +|------|------|-------------| +<% sorted_by_name.call(object[:fields]).each do |field| -%> +| `<%= field[:name] %>` | [`<%= field[:type][:info] %>`](../<%= field[:type][:path] %>.md) | <%= field[:description] %> | +<% end -%>