diff --git a/README.md b/README.md index 5f64a7e..8a9ea35 100644 --- a/README.md +++ b/README.md @@ -337,6 +337,34 @@ Or you can use `collect` end ``` +## Async delivering messages + +If you use `Rails` application or you use `ActiveJob`, you can deliver messages using `ActiveJob` + +### Configuration + +You have the next configuration + +```ruby + GraphQL::AnyCable.configure do |config| + # ... other configurations + config.delivery_method = "inline" # the default value "inline", also can be "active_job" + config.queue = "default" # the name of ActiveJob queue + config.job_class = "GraphQL::Jobs::TriggerJob" # the name executor job + end +``` + +`delivery_method` can be either `inline` or `active_job`. +`inline` means that delivering messaging will work sync. +`active_job` - It will add delivering messages operations to `ActiveJob` with queue `default` and using job `GraphQL::Jobs::TriggerJob` + +You can change the queue or job_class by changing it in the configuration + +Or you can run code + +```ruby + GraphQL::AnyCable.delivery_method = "active_job", { queue: "broadcasting", job_class: "GraphQL::Jobs::TriggerJob" } +``` ## Testing applications which use `graphql-anycable` diff --git a/graphql-anycable.gemspec b/graphql-anycable.gemspec index a5496e0..593b42f 100644 --- a/graphql-anycable.gemspec +++ b/graphql-anycable.gemspec @@ -37,4 +37,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency "railties" spec.add_development_dependency "rake", ">= 12.3.3" spec.add_development_dependency "rspec", "~> 3.0" + spec.add_development_dependency "activejob", "~> 6.0" end diff --git a/lib/graphql-anycable.rb b/lib/graphql-anycable.rb index 97d18f1..18bbb5b 100644 --- a/lib/graphql-anycable.rb +++ b/lib/graphql-anycable.rb @@ -7,6 +7,7 @@ require_relative "graphql/anycable/config" require_relative "graphql/anycable/railtie" if defined?(Rails) require_relative "graphql/anycable/stats" +require_relative "graphql/anycable/delivery_adapter" require_relative "graphql/subscriptions/anycable_subscriptions" module GraphQL @@ -25,6 +26,19 @@ def self.stats(**options) Stats.new(**options).collect end + def self.delivery_method=(args) + method_name, options = Array(args) + options ||= {} + + config.delivery_method = method_name + config.queue = options[:queue] if options[:queue] + config.job_class = options[:job_class] if options[:job_class] + end + + def self.delivery_adapter(object) + DeliveryAdapter.lookup(executor_object: object) + end + module_function def redis diff --git a/lib/graphql/adapters/active_job_adapter.rb b/lib/graphql/adapters/active_job_adapter.rb new file mode 100644 index 0000000..e7b4576 --- /dev/null +++ b/lib/graphql/adapters/active_job_adapter.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module GraphQL + module Adapters + class ActiveJobAdapter < BaseAdapter + def trigger(...) + executor_class_job.set(queue: config.queue).perform_later( + executor_object, + executor_method, + ... + ) + end + + private + + def executor_class_job + config.job_class.constantize + end + + def config + GraphQL::AnyCable.config + end + end + end +end diff --git a/lib/graphql/adapters/base_adapter.rb b/lib/graphql/adapters/base_adapter.rb new file mode 100644 index 0000000..77f25e8 --- /dev/null +++ b/lib/graphql/adapters/base_adapter.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module GraphQL + module Adapters + class BaseAdapter + attr_reader :executor_object, :executor_method + + def initialize(executor_object:) + @executor_object = executor_object + @executor_method = executor_object.class::EXECUTOR_METHOD_NAME + end + + def trigger + raise NoMethodError, "#{__method__} method should be implemented in concrete class" + end + end + end +end diff --git a/lib/graphql/adapters/inline_adapter.rb b/lib/graphql/adapters/inline_adapter.rb new file mode 100644 index 0000000..19e3664 --- /dev/null +++ b/lib/graphql/adapters/inline_adapter.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module GraphQL + module Adapters + class InlineAdapter < BaseAdapter + def trigger(...) + executor_object.public_send(executor_method, ...) + end + end + end +end diff --git a/lib/graphql/anycable/config.rb b/lib/graphql/anycable/config.rb index c422ca6..d0af072 100644 --- a/lib/graphql/anycable/config.rb +++ b/lib/graphql/anycable/config.rb @@ -12,6 +12,38 @@ class Config < Anyway::Config attr_config use_redis_object_on_cleanup: true attr_config use_client_provided_uniq_id: true attr_config redis_prefix: "graphql" # Here, we set clear redis_prefix without any hyphen. The hyphen is added at the end of this value on our side. + + attr_config delivery_method: "inline", queue: "default", job_class: "GraphQL::Jobs::TriggerJob" + + def job_class=(value) + ensure_value_is_not_blank!("job_class", value) + + super + end + + def queue=(value) + ensure_value_is_not_blank!("queue", value) + + super + end + + def delivery_method=(value) + ensure_value_is_not_blank!("delivery_method", value) + + super + end + + private + + def empty_value?(value) + value.nil? || value == "" + end + + def ensure_value_is_not_blank!(name, value) + return unless empty_value?(value) + + raise_validation_error("#{name} can not be blank") + end end end end diff --git a/lib/graphql/anycable/delivery_adapter.rb b/lib/graphql/anycable/delivery_adapter.rb new file mode 100644 index 0000000..6753698 --- /dev/null +++ b/lib/graphql/anycable/delivery_adapter.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +require "graphql/adapters/base_adapter" +require "graphql/adapters/inline_adapter" +require "graphql/adapters/active_job_adapter" + +module GraphQL + module AnyCable + class DeliveryAdapter + class << self + def lookup(options) + adapter_class_name = config.delivery_method.to_s.split("_").map(&:capitalize).join + + Adapters.const_get("#{adapter_class_name}Adapter").new(**(options || {})) + rescue NameError => e + raise e.class, "Delivery adapter :#{config.delivery_method} haven't been found", e.backtrace + end + + def config + GraphQL::AnyCable.config + end + end + end + end +end diff --git a/lib/graphql/anycable/railtie.rb b/lib/graphql/anycable/railtie.rb index 2b27333..ed309bf 100644 --- a/lib/graphql/anycable/railtie.rb +++ b/lib/graphql/anycable/railtie.rb @@ -5,6 +5,15 @@ module GraphQL module AnyCable class Railtie < ::Rails::Railtie + initializer "graphql_anycable.load_trigger_job" do + ActiveSupport.on_load(:active_job) do + require "graphql/jobs/trigger_job" + require "graphql/serializers/anycable_subscription_serializer" + + ActiveJob::Serializers.add_serializers(GraphQL::Serializers::AnyCableSubscriptionSerializer) + end + end + rake_tasks do path = File.expand_path(__dir__) Dir.glob("#{path}/tasks/**/*.rake").each { |f| load f } diff --git a/lib/graphql/jobs/trigger_job.rb b/lib/graphql/jobs/trigger_job.rb new file mode 100644 index 0000000..f827a19 --- /dev/null +++ b/lib/graphql/jobs/trigger_job.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module GraphQL + module Jobs + class TriggerJob < ActiveJob::Base + def perform(executor_object, execute_method, event_name, args = {}, object = nil, options = {}) + executor_object.public_send(execute_method, event_name, args, object, **options) + end + end + end +end diff --git a/lib/graphql/serializers/anycable_subscription_serializer.rb b/lib/graphql/serializers/anycable_subscription_serializer.rb new file mode 100644 index 0000000..14a38f1 --- /dev/null +++ b/lib/graphql/serializers/anycable_subscription_serializer.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module GraphQL + module Serializers + class AnyCableSubscriptionSerializer < ActiveJob::Serializers::ObjectSerializer + def serialize?(argument) + argument.kind_of?(GraphQL::Subscriptions::AnyCableSubscriptions) + end + + def serialize(subscription) + super(subscription.collected_arguments) + end + + def deserialize(payload) + GraphQL::Subscriptions::AnyCableSubscriptions.new(**payload) + end + end + end +end diff --git a/lib/graphql/subscriptions/anycable_subscriptions.rb b/lib/graphql/subscriptions/anycable_subscriptions.rb index 6dec5d8..3e3d36c 100644 --- a/lib/graphql/subscriptions/anycable_subscriptions.rb +++ b/lib/graphql/subscriptions/anycable_subscriptions.rb @@ -56,14 +56,20 @@ class AnyCableSubscriptions < GraphQL::Subscriptions def_delegators :"GraphQL::AnyCable", :redis, :config + attr_reader :collected_arguments + alias_method :trigger_sync, :trigger + SUBSCRIPTION_PREFIX = "subscription:" # HASH: Stores subscription data: query, context, … FINGERPRINTS_PREFIX = "fingerprints:" # ZSET: To get fingerprints by topic SUBSCRIPTIONS_PREFIX = "subscriptions:" # SET: To get subscriptions by fingerprint CHANNEL_PREFIX = "channel:" # SET: Auxiliary structure for whole channel's subscriptions cleanup + EXECUTOR_METHOD_NAME = "trigger_sync" # method, who executes the sync method "trigger" # @param serializer [<#dump(obj), #load(string)] Used for serializing messages before handing them to `.broadcast(msg)` def initialize(serializer: Serialize, **rest) @serializer = serializer + @collected_arguments = { serializer: serializer, **rest } + super end @@ -206,6 +212,10 @@ def delete_channel_subscriptions(channel_or_id) redis.del(redis_key(CHANNEL_PREFIX) + channel_id) end + def trigger(...) + AnyCable.delivery_adapter(self).trigger(...) + end + private def anycable diff --git a/spec/graphql/anycable_spec.rb b/spec/graphql/anycable_spec.rb index 7149d1b..10f639e 100644 --- a/spec/graphql/anycable_spec.rb +++ b/spec/graphql/anycable_spec.rb @@ -1,5 +1,8 @@ # frozen_string_literal: true +require "active_job" +require "graphql/jobs/trigger_job" + RSpec.describe GraphQL::AnyCable do subject do AnycableSchema.execute( @@ -263,9 +266,94 @@ describe ".stats" do it "calls Graphql::AnyCable::Stats" do - allow_any_instance_of(GraphQL::AnyCable::Stats).to receive(:collect) + expect_any_instance_of(GraphQL::AnyCable::Stats).to receive(:collect) described_class.stats end end + + describe ".delivery_adapter" do + context "when config.delivery_method is inline" do + around do |ex| + old_value = GraphQL::AnyCable.config.delivery_method + GraphQL::AnyCable.config.delivery_method = "inline" + + ex.run + + GraphQL::AnyCable.config.delivery_method = old_value + end + + it "calls InlineAdapter" do + expect(GraphQL::Adapters::InlineAdapter).to receive(:new).with(executor_object: "any_object") + + described_class.delivery_adapter("any_object") + end + end + + context "when config.delivery_method is active_job" do + around do |ex| + old_value = GraphQL::AnyCable.config.delivery_method + GraphQL::AnyCable.config.delivery_method = "active_job" + + ex.run + + GraphQL::AnyCable.config.delivery_method = old_value + end + + it "calls ActiveJobAdapter" do + expect(GraphQL::Adapters::ActiveJobAdapter).to receive(:new).with(executor_object: "any_object") + + described_class.delivery_adapter("any_object") + end + end + end + + describe ".delivery_method" do + let(:config) { GraphQL::AnyCable.config } + + after do + config.delivery_method = "inline" + config.queue = "default" + config.job_class = "GraphQL::Jobs::TriggerJob" + end + + it "changes config" do + expect(config.delivery_method).to eq("inline") + expect(config.queue).to eq("default") + expect(config.job_class).to eq("GraphQL::Jobs::TriggerJob") + + described_class.delivery_method = :active_job, { queue: "test", job_class: "CustomJob" } + + expect(config.delivery_method).to eq(:active_job) + expect(config.queue).to eq("test") + expect(config.job_class).to eq("CustomJob") + end + + context "when entered empty delivery_method" do + it "raises an error" do + expect { described_class.delivery_method = nil }.to raise_error( + Anyway::Config::ValidationError, + /delivery_method can not be blank/, + ) + end + end + + context "when entered invalid queue" do + it "raises an error" do + expect { described_class.delivery_method = "inline", { queue: "" } }.to raise_error( + Anyway::Config::ValidationError, + /queue can not be blank/, + ) + end + end + + context "when entered invalid job_class" do + it "raises an error" do + expect { described_class.delivery_method = "inline", { job_class: "" } }.to raise_error( + Anyway::Config::ValidationError, + /job_class can not be blank/, + ) + end + end + end end diff --git a/spec/graphql/broadcast_spec.rb b/spec/graphql/broadcast_spec.rb index fe5d485..00ff68d 100644 --- a/spec/graphql/broadcast_spec.rb +++ b/spec/graphql/broadcast_spec.rb @@ -1,5 +1,9 @@ # frozen_string_literal: true +require "active_job" +require "graphql/jobs/trigger_job" +require "graphql/serializers/anycable_subscription_serializer" + RSpec.describe "Broadcasting" do def subscribe(query) BroadcastSchema.execute( @@ -17,7 +21,7 @@ def subscribe(query) end let(:object) do - double("Post", id: 1, title: "Broadcasting…", actions: %w[Edit Delete]) + double("article", id: 1, title: "Broadcasting…", actions: %w[Edit Delete]).extend(GlobalID::Identification) end let(:query) do @@ -32,54 +36,157 @@ def subscribe(query) allow(channel).to receive(:stream_from) allow(channel).to receive(:params).and_return("channelId" => "ohmycables") allow(anycable).to receive(:broadcast) + allow(GlobalID).to receive(:app).and_return("example") + allow(RSpec::Mocks::Double).to receive(:find).and_return(object) end - context "when all clients asks for broadcastable fields only" do - let(:query) do - <<~GRAPHQL.strip - subscription SomeSubscription { postCreated{ id title } } - GRAPHQL + context "when config.deliver_method is active_job" do + before(:all) do + ActiveJob::Serializers.add_serializers(GraphQL::Serializers::AnyCableSubscriptionSerializer) end - it "uses broadcasting to resolve query only once" do - 2.times { subscribe(query) } - BroadcastSchema.subscriptions.trigger(:post_created, {}, object) - expect(object).to have_received(:title).once - expect(anycable).to have_received(:broadcast).once + around(:all) do |ex| + old_queue = ActiveJob::Base.queue_adapter + old_value = GraphQL::AnyCable.config.delivery_method + + GraphQL::AnyCable.config.delivery_method = "active_job" + ActiveJob::Base.queue_adapter = :inline + + ex.run + + GraphQL::AnyCable.config.delivery_method = old_value + ActiveJob::Base.queue_adapter = old_queue end - end - context "when all clients asks for non-broadcastable fields" do - let(:query) do - <<~GRAPHQL.strip - subscription SomeSubscription { postCreated{ id title actions } } - GRAPHQL + context "when all clients asks for broadcastable fields only" do + let(:query) do + <<~GRAPHQL.strip + subscription SomeSubscription { postCreated{ id title } } + GRAPHQL + end + + it "uses broadcasting to resolve query only once" do + 2.times { subscribe(query) } + expect_any_instance_of(GraphQL::Jobs::TriggerJob).to receive(:perform).and_call_original + + BroadcastSchema.subscriptions.trigger(:post_created, {}, object) + + expect(object).to have_received(:title).once + expect(anycable).to have_received(:broadcast).once + end end - it "resolves query for every client" do - 2.times { subscribe(query) } - BroadcastSchema.subscriptions.trigger(:post_created, {}, object) - expect(object).to have_received(:title).twice - expect(anycable).to have_received(:broadcast).twice + context "when all clients asks for non-broadcastable fields" do + let(:query) do + <<~GRAPHQL.strip + subscription SomeSubscription { postCreated{ id title actions } } + GRAPHQL + end + + it "resolves query for every client" do + 2.times { subscribe(query) } + + expect_any_instance_of(GraphQL::Jobs::TriggerJob).to receive(:perform).and_call_original + + BroadcastSchema.subscriptions.trigger(:post_created, {}, object) + expect(object).to have_received(:title).twice + expect(anycable).to have_received(:broadcast).twice + end + end + + context "when one of subscriptions got expired" do + let(:query) do + <<~GRAPHQL.strip + subscription SomeSubscription { postCreated{ id title } } + GRAPHQL + end + + let(:redis) { AnycableSchema.subscriptions.redis } + + it "doesn't fail" do + 3.times { subscribe(query) } + redis.keys("graphql-subscription:*").last.tap(&redis.method(:del)) + expect(redis.keys("graphql-subscription:*").size).to eq(2) + + expect_any_instance_of(GraphQL::Jobs::TriggerJob).to receive(:perform).and_call_original + + expect { BroadcastSchema.subscriptions.trigger(:post_created, {}, object) }.not_to raise_error + expect(object).to have_received(:title).once + expect(anycable).to have_received(:broadcast).once + end end end - context "when one of subscriptions got expired" do - let(:query) do - <<~GRAPHQL.strip - subscription SomeSubscription { postCreated{ id title } } - GRAPHQL + context "when config.deliver_method is inline" do + around(:all) do |ex| + old_queue = ActiveJob::Base.queue_adapter + old_value = GraphQL::AnyCable.config.delivery_method + + GraphQL::AnyCable.config.delivery_method = "inline" + ActiveJob::Base.queue_adapter = :test + + ex.run + + GraphQL::AnyCable.config.delivery_method = old_value + ActiveJob::Base.queue_adapter = old_queue + end + + context "when all clients asks for broadcastable fields only" do + let(:query) do + <<~GRAPHQL.strip + subscription SomeSubscription { postCreated{ id title } } + GRAPHQL + end + + it "uses broadcasting to resolve query only once" do + 2.times { subscribe(query) } + + expect_any_instance_of(GraphQL::Jobs::TriggerJob).to_not receive(:perform) + + BroadcastSchema.subscriptions.trigger(:post_created, {}, object) + expect(object).to have_received(:title).once + expect(anycable).to have_received(:broadcast).once + end end - let(:redis) { AnycableSchema.subscriptions.redis } + context "when all clients asks for non-broadcastable fields" do + let(:query) do + <<~GRAPHQL.strip + subscription SomeSubscription { postCreated{ id title actions } } + GRAPHQL + end + + it "resolves query for every client" do + 2.times { subscribe(query) } + + expect_any_instance_of(GraphQL::Jobs::TriggerJob).to_not receive(:perform) + + BroadcastSchema.subscriptions.trigger(:post_created, {}, object) + expect(object).to have_received(:title).twice + expect(anycable).to have_received(:broadcast).twice + end + end + + context "when one of subscriptions got expired" do + let(:query) do + <<~GRAPHQL.strip + subscription SomeSubscription { postCreated{ id title } } + GRAPHQL + end + + let(:redis) { AnycableSchema.subscriptions.redis } + + it "doesn't fail" do + 3.times { subscribe(query) } + redis.keys("graphql-subscription:*").last.tap(&redis.method(:del)) + expect(redis.keys("graphql-subscription:*").size).to eq(2) + + expect_any_instance_of(GraphQL::Jobs::TriggerJob).to_not receive(:perform) - it "doesn't fail" do - 3.times { subscribe(query) } - redis.keys("graphql-subscription:*").last.tap(&redis.method(:del)) - expect(redis.keys("graphql-subscription:*").size).to eq(2) - expect { BroadcastSchema.subscriptions.trigger(:post_created, {}, object) }.not_to raise_error - expect(object).to have_received(:title).once - expect(anycable).to have_received(:broadcast).once + expect { BroadcastSchema.subscriptions.trigger(:post_created, {}, object) }.not_to raise_error + expect(object).to have_received(:title).once + expect(anycable).to have_received(:broadcast).once + end end end end diff --git a/spec/graphql/delivery_adapter_spec.rb b/spec/graphql/delivery_adapter_spec.rb new file mode 100644 index 0000000..35a2398 --- /dev/null +++ b/spec/graphql/delivery_adapter_spec.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +RSpec.describe GraphQL::AnyCable::DeliveryAdapter do + describe ".lookup" do + context "when config.delivery_method is valid" do + around do |ex| + old_value = GraphQL::AnyCable.config.delivery_method + GraphQL::AnyCable.config.delivery_method = :inline + + ex.run + + GraphQL::AnyCable.config.delivery_method = old_value + end + + it "returns InlineAdapter" do + expect(GraphQL::Adapters::InlineAdapter).to receive(:new).with(executor_object: "object") + + described_class.lookup(executor_object: "object") + end + end + + context "when config.delivery_method is invalid" do + around do |ex| + old_value = GraphQL::AnyCable.config.delivery_method + GraphQL::AnyCable.config.delivery_method = :unknown_adapter + + ex.run + + GraphQL::AnyCable.config.delivery_method = old_value + end + + it "raises an error" do + expect { described_class.lookup(executor_object: "object") }.to raise_error( + NameError, + /Delivery adapter :unknown_adapter haven't been found/, + ) + end + end + end +end diff --git a/spec/jobs/trigger_job_spec.rb b/spec/jobs/trigger_job_spec.rb new file mode 100644 index 0000000..ae7f293 --- /dev/null +++ b/spec/jobs/trigger_job_spec.rb @@ -0,0 +1,92 @@ +# frozen_string_literal: true + +require "active_job" +require "graphql/jobs/trigger_job" +require "graphql/serializers/anycable_subscription_serializer" + +RSpec.describe GraphQL::Jobs::TriggerJob do + subject(:job) { described_class.perform_later(*job_payload) } + subject(:trigger_changes) { AnycableSchema.subscriptions.trigger(*trigger_sync_arguments) } + + before(:all) do + ActiveJob::Serializers.add_serializers(GraphQL::Serializers::AnyCableSubscriptionSerializer) + end + + before do + AnycableSchema.execute( + query: query, + context: { channel: channel, subscription_id: "some-truly-random-number" }, + variables: {}, + operation_name: "SomeSubscription", + ) + end + + let(:trigger_sync_arguments) do + [ + :product_updated, + {}, + {id: 1, title: "foo"} + ] + end + + let(:job_payload) do + [ + { schema: "AnycableSchema", "serializer": "GraphQL::Subscriptions::Serialize" }, + "trigger_sync", + *trigger_sync_arguments + ] + end + + let(:query) do + <<~GRAPHQL + subscription SomeSubscription { productUpdated { id } } + GRAPHQL + end + + let(:channel) do + socket = double("Socket", istate: AnyCable::Socket::State.new({})) + connection = double("Connection", anycable_socket: socket) + double("Channel", id: "legacy_id", params: { "channelId" => "legacy_id" }, stream_from: nil, connection: connection) + end + + context "when config.delivery_method is active_job" do + around do |ex| + old_queue = ActiveJob::Base.queue_adapter + old_value = GraphQL::AnyCable.config.delivery_method + + GraphQL::AnyCable.config.delivery_method = "active_job" + ActiveJob::Base.queue_adapter = :inline + + ex.run + + GraphQL::AnyCable.config.delivery_method = old_value + ActiveJob::Base.queue_adapter = old_queue + end + + it "executes AnyCableSubscriptions" do + expect_any_instance_of(GraphQL::Jobs::TriggerJob).to receive(:perform) + expect(GraphQL::Jobs::TriggerJob).to receive(:set).with(queue: "default").and_call_original + + trigger_changes + end + + context "when config.queue is 'test'" do + around do |ex| + old_queue = GraphQL::AnyCable.config.queue + GraphQL::AnyCable.config.queue = "test" + + ex.run + + GraphQL::AnyCable.config.queue = old_queue + end + + it "executes AnyCableSubscriptions" do + expect_any_instance_of(GraphQL::Jobs::TriggerJob).to receive(:perform) + expect(GraphQL::Jobs::TriggerJob).to receive(:set).with(queue: "test").and_call_original + + trigger_changes + end + end + end +end +