diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0b404f2..ef27b68 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -31,6 +31,9 @@ jobs: - server: "iodine" rails_version: "dev" scenario: "broadcast" + - server: "falcon" + rails_version: "dev" + scenario: "broadcast" steps: - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 @@ -66,6 +69,8 @@ jobs: include: - server: "iodine" rails_version: "dev" + - server: "falcon" + rails_version: "dev" steps: - uses: actions/checkout@v4 - uses: ruby/setup-ruby@v1 diff --git a/Gemfile b/Gemfile index 9aaf3fe..dfc4c3a 100644 --- a/Gemfile +++ b/Gemfile @@ -20,7 +20,9 @@ gem "puma", "~> 6.4" gem "redis", "~> 5.0" # Async setup -# TODO +gem "falcon" +gem "async-websocket" +gem "async-redis" # Iodine gem "iodine", require: false diff --git a/Makefile b/Makefile index d54eb6f..18f71f8 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,14 @@ anyt-anycable: bin/dist/anycable-go bundle exec anyt -c "bin/dist/anycable-go" --target-url="ws://localhost:8080/cable" anyt-iodine: - @bundle exec anyt --self-check --require=scripts/anyt/rails/*.rb --rails-command="bundle exec iodine -p 9292 -t 5 -w 2 %{config}" --except=features/server_restart + @bundle exec anyt --self-check --require="{lib/servers/iodine.rb,scripts/anyt/rails/*.rb}" \ + --rails-command="bundle exec iodine scripts/anyt/iodine.ru -p 9292 -t 5 -w 2" \ + --except=streams/single + +anyt-falcon: + ACTION_CABLE_ADAPTER=redis \ + bundle exec anyt --self-check --require="{lib/servers/falcon.rb,scripts/anyt/rails/*.rb}" \ + --rails-command="bundle exec ruby scripts/anyt/falcon.rb" \ + --except=features/server_restart,request/disconnection,features/channel_state .PHONY: websocket-bench diff --git a/README.md b/README.md index 115dd44..66e438f 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,13 @@ $ bundle exec bento --anycable ... +# Falcon +$ bundle exec bento --falcon + +⚡️ Running Action Cable via falcon + +... + # Iodine $ bundle exec bento --iodine diff --git a/lib/application.rb b/lib/application.rb index 5b9a2a7..9aa7319 100644 --- a/lib/application.rb +++ b/lib/application.rb @@ -27,6 +27,10 @@ class App < Rails::Application end end +ActionCable.server.config.cable = { + "adapter" => ENV.fetch("ACTION_CABLE_ADAPTER", "redis"), + "url" => ENV["REDIS_URL"] +} ActionCable.server.config.connection_class = -> { ApplicationCable::Connection } ActionCable.server.config.disable_request_forgery_protection = true ActionCable.server.config.logger = Rails.logger diff --git a/lib/servers/anycable.rb b/lib/servers/anycable.rb index dc9d591..7f19b5c 100644 --- a/lib/servers/anycable.rb +++ b/lib/servers/anycable.rb @@ -3,10 +3,7 @@ require "redis" require "anycable-rails" -ActionCable.server.config.cable = { - "adapter" => $benchmark_server == :anycable ? "any_cable" : "redis", - "url" => ENV["REDIS_URL"] -} +ActionCable.server.config.cable = {"adapter" => "any_cable"} class BenchmarkServer def self.run! diff --git a/lib/servers/falcon.rb b/lib/servers/falcon.rb index b06c507..94511ec 100644 --- a/lib/servers/falcon.rb +++ b/lib/servers/falcon.rb @@ -1,127 +1,241 @@ # frozen_string_literal: true -# TODO: This is an old version from here: https://github.com/anycable/anycable/blob/a0c48aeffe7b57f8abcf49ec244e2129f7424c97/benchmarks/rails/bento#L113 -# Requires upgrade for Action Cable 8 -class AsyncApp - def call(req) - Async::WebSocket::Adapters::HTTP.open(req) do |connection| - env = {url: "/cable"} - - connected = AnyCable.rpc_handler.handle( - :connect, - AnyCable::ConnectionRequest.new(env: env) - ).then do |response| - handle_response(connection, response) - - if response.status != :SUCCESS - connection.close - next false - end +require "async" +require "async/http/endpoint" +require "async/websocket/adapters/rack" +require "async/redis" + +require "falcon" + +module ActionCable + module SubscriptionAdapter + # A PoC of Async-powered Redis subscription adapter. + # TODO: handle Redis connection failures. + # + # Why not regular Redis adapter? It spawns a thread to listen for events and + # perform broadcasts from it—that doesn't work with the Async executor ('no current task'). + class AsyncRedis < Base + prepend ChannelPrefix + + private attr_reader :subscriber + + def initialize(*) + super + @endpoint = ::Async::Redis.local_endpoint + @mutex = Mutex.new + end - true + def broadcast(channel, payload) + publisher.publish(channel, payload) end - next unless connected + def subscribe(channel, callback, success_callback = nil) + subscriber.add_subscriber(channel, callback, success_callback) + end - loop do - msg = connection.read - cmd = Protocol::WebSocket::JSONMessage.wrap(msg)&.to_h + def unsubscribe(channel, callback) + subscriber.remove_subscriber(channel, callback) + end - next unless cmd + def shutdown + subscriber.shutdown if @subscriber + end - identifier = cmd[:identifier] - command = cmd[:command] + private - case command - when "subscribe" - AnyCable.rpc_handler.handle( - :command, - AnyCable::CommandMessage.new( - command:, - identifier:, - connection_identifiers: "{}", - env: - ) - ).then do |response| - handle_response(connection, response, identifier) - end - when "message" - AnyCable.rpc_handler.handle( - :command, - AnyCable::CommandMessage.new( - command:, - identifier:, - connection_identifiers: "{}", - data: cmd[:data], - env: - ) - ).then do |response| - handle_response(connection, response, identifier) + def publisher + @publisher || @mutex.synchronize { @publisher ||= ::Async::Redis::Client.new(@endpoint) } + end + + def subscriber + @subscriber || @mutex.synchronize { @subscriber ||= Subscriber.new(@endpoint, executor) } + end + + class Subscriber < SubscriberMap::Async + private attr_reader :client, :ctx, :task + + def initialize(endpoint, executor) + super(executor) + @client = ::Async::Redis::Client.new(endpoint) + @ctx = new_subscribe_context + @task = new_subscribe_task(ctx) + end + + def add_channel(channel, on_success) + ctx.subscribe(channel) + on_success&.call + end + + def remove_channel(channel) + ctx.unsubscribe(channel) + end + + def shutdown + task&.stop + end + + private + + def new_subscribe_context + client.subscribe("_action_cable_internal") + end + + def new_subscribe_task(ctx) + Async do + while event = ctx.listen + if event.first == "message" + broadcast(event[1], event[2]) + end + end end end end - rescue EOFError - end - end + end + end + + module Async + class Executor + class Timer < Data.define(:task) + def shutdown = task.stop + end - private + private attr_reader :semaphore - def handle_response(connection, response, identifier = nil) - response.transmissions&.each do |msg| - connection.write(msg) - end - connection.flush + def initialize(max_size: 1024) + @semaphore = ::Async::Semaphore.new(max_size) + end - # Command response - if identifier - writer = proc do |msg| - msg = {identifier: identifier, message: JSON.parse(msg)}.to_json - connection.write(msg) - connetion.flush + def post(task = nil, &block) + task ||= block + semaphore.async(&task) end - response.streams&.each do |stream| - ActionCable.server.pubsub.subscribe(stream, writer) + def timer(interval, &block) + task = Async do + loop do + sleep(interval) + block.call + end + end + + Timer.new(task) end + + def shutdown = @executor.shutdown end - end -end -class BenchmarkServer - def self.run! - require "async/websocket" - require "async/websocket/adapters/http" - require 'protocol/websocket/json_message' + class Socket + #== Action Cable socket interface == + attr_reader :env, :logger, :protocol + private attr_reader :conn, :coder, :server - require "falcon/command" - require "falcon/command/serve" + delegate :worker_pool, :logger, to: :server - # Patch Action Cable subscriber to be async-aware - require "async/semaphore" - ActionCable::SubscriptionAdapter::SubscriberMap.prepend(Module.new do - def initialize(...) - super - @semaphore = Async::Semaphore.new(1024) + def initialize(env, conn, server, coder: ActiveSupport::JSON) + @env = env + @coder = coder + @server = server + @conn = conn end - def broadcast(channel, message) - list = @sync.synchronize do - return if !@subscribers.key?(channel) - @subscribers[channel].dup + def request + # Copied from ActionCable::Server::Socket#request + @request ||= begin + environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application + ActionDispatch::Request.new(environment || env) end + end + + def transmit(data) + conn.write(coder.encode(data)) + conn.flush + rescue IOError, Errno::EPIPE => e + logger.debug "Failed to write to the socket: #{e.message}" + end + + def close + conn.close + end + def perform_work(receiver, ...) Async do - list.each do |subscriber| - @semaphore.async do - invoke_callback(subscriber, message) + receiver.send(...) + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end + end + end + + class App + private attr_reader :server, :semaphore + + def initialize(server: ::ActionCable.server, max_size: 1024) + @server = server + @semaphore = ::Async::Semaphore.new(max_size) + end + + def call(env) + ::Async::WebSocket::Adapters::Rack.open(env, protocols: ::ActionCable::INTERNAL[:protocols]) do |conn| + coder = ActiveSupport::JSON + logger = server.logger + # A _Socket interface for Action Cable connection + socket = Socket.new(env, conn, server) + # Action Cable connection instance + connection = server.config.connection_class.call.new(server, socket) + + # Handshake + connection.handle_open + + server.setup_heartbeat_timer + server.add_connection(connection) + + # Main loop + # FIXME: closed socket errors are not triggered for some connections, + # so we cannot detect the disconnect and call #handle_close + # (e.g., when running AnyT tests; that's why we had to skip some) + while (msg = conn.read) + semaphore.async do + logger.debug "[Async WebSocket] incoming message: #{msg.to_str}" + + connection.handle_incoming(coder.decode(msg.to_str)) end end - end + rescue EOFError, Errno::ECONNRESET + logger.debug "[Async WebSocket] connection closed" + if connection + server.remove_connection(connection) + semaphore.async do + connection.handle_close + end + end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + end or [200, [], ["Websocket only."]] end - end) + end + end +end + +# That's a workaround for making it possible to run this code in Async and non-Async environment (e.g., AnyT) +unless ENV["ACTION_CABLE_ADAPTER"] == "redis" + ActionCable.server.config.pubsub_adapter = "ActionCable::SubscriptionAdapter::AsyncRedis" +end + +class BenchmarkServer + def self.run! + Sync do + websocket_endpoint = Async::HTTP::Endpoint.parse("http://127.0.0.1:8080/cable") - cmd = Falcon::Command::Serve.new(["-p", "8080", "-b", "tcp://0.0.0.0", "--#{ENV.fetch("FALCON_MODE", "forked")}"]) - cmd.define_singleton_method(:load_app) { AsyncApp.new } - cmd.call + action_cable_server = ActionCable.server + # Replace the default executor with the Async executor + action_cable_server.instance_variable_set(:@executor, ActionCable::Async::Executor.new) + + app = Falcon::Server.middleware(ActionCable::Async::App.new(server: action_cable_server)) + server = Falcon::Server.new(app, websocket_endpoint) + server.run.wait + end end end diff --git a/lib/servers/iodine.rb b/lib/servers/iodine.rb index cdc2bec..9320c1b 100644 --- a/lib/servers/iodine.rb +++ b/lib/servers/iodine.rb @@ -1,12 +1,21 @@ # frozen_string_literal: true require "iodine" +require "redis" module ActionCable module SubscriptionAdapter class Iodine < Base + def initialize(*) + super + @redis = ::Redis.new + end + def broadcast(channel, payload) - ::Iodine.publish(channel, payload) + # FIXME: Doesn't publis to Redis when executed outside of the Iodine server + # (e.g., from AnyT tests) + # ::Iodine.publish(channel, payload) + @redis.publish(channel, payload) end end end @@ -136,8 +145,9 @@ def on_shutdown(conn) logger.debug "[Iodine] connection shutdown" conn.write( coder.encode({ - type: :shutdown, - reason: ::ActionCable::INTERNAL[:disconnect_reasons][:server_restart] + type: :disconnect, + reason: ::ActionCable::INTERNAL[:disconnect_reasons][:server_restart], + reconnect: true }) ) end @@ -153,7 +163,9 @@ def request end end - def transmit(data) = client&.write(coder.encode(data)) + def transmit(data) + client&.write(coder.encode(data)) + end def close = client&.close @@ -164,6 +176,7 @@ def perform_work(receiver, method_name, *args) end end +Iodine::PubSub.default = Iodine::PubSub::Redis.new("redis://localhost:6379") ActionCable.server.config.pubsub_adapter = "ActionCable::SubscriptionAdapter::Iodine" class BenchmarkServer diff --git a/scripts/anyt/falcon.rb b/scripts/anyt/falcon.rb new file mode 100644 index 0000000..9ef865f --- /dev/null +++ b/scripts/anyt/falcon.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +require "anyt/dummy/application" + +require "anyt/tests" +require "anyt/remote_control" + +require_relative "../../lib/servers/falcon" + +# Ensure Async Redis is used at the server side +# (we switch to the regular Redis adapter at the AnyT CLI side, 'cause it's not Async-driven) +ActionCable.server.config.pubsub_adapter = "ActionCable::SubscriptionAdapter::AsyncRedis" + +# Start remote control +Anyt::RemoteControl::Server.start(Anyt.config.remote_control_port) + +# Load channels from tests +Anyt::Tests.load_all_tests + +Rails.application.initialize! + +Sync do + websocket_endpoint = Async::HTTP::Endpoint.parse("http://127.0.0.1:9292/cable") + + action_cable_server = ActionCable.server + # Replace the default executor with the Async executor + action_cable_server.instance_variable_set(:@executor, ActionCable::Async::Executor.new) + + app = Falcon::Server.middleware(ActionCable::Async::App.new(server: action_cable_server)) + server = Falcon::Server.new(app, websocket_endpoint) + server.run.wait +end diff --git a/scripts/anyt/iodine.ru b/scripts/anyt/iodine.ru new file mode 100644 index 0000000..8cd11d5 --- /dev/null +++ b/scripts/anyt/iodine.ru @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +require "anyt/dummy/application" + +require "anyt/tests" +require "anyt/remote_control" + +require_relative "../../lib/servers/iodine" + +# Start remote control +Anyt::RemoteControl::Server.start(Anyt.config.remote_control_port) + +# Load channels from tests +Anyt::Tests.load_all_tests + +Rails.application.initialize! + +app = Rack::Builder.new do + map "/cable" do + use ActionCable::Iodine::Middleware + run(proc { |_| [404, {"Content-Type" => "text/plain"}, ["Not here"]] }) + end +end + +run app