diff --git a/.anycable-rails-revision b/.anycable-rails-revision new file mode 100644 index 0000000..d1b440c --- /dev/null +++ b/.anycable-rails-revision @@ -0,0 +1 @@ +https://github.com/anycable/anycable-rails.git#feat/actioncable-v8 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..8025759 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,42 @@ +name: CI + +on: + push: + branches: + - master + pull_request: + workflow_dispatch: + +jobs: + wsdirector: + if: ${{ !contains(github.event.head_commit.message, '[ci skip]') }} + runs-on: ubuntu-latest + env: + BUNDLE_JOBS: 4 + BUNDLE_RETRY: 3 + CI: true + services: + redis: + image: redis:7.0-alpine + ports: ["6379:6379"] + options: --health-cmd="redis-cli ping" --health-interval 1s --health-timeout 3s --health-retries 30 + strategy: + fail-fast: false + matrix: + server: ["puma", "anycable"] + steps: + - uses: actions/checkout@v4 + - uses: ruby/setup-ruby@v1 + with: + ruby-version: 3.3 + bundler-cache: true + - name: Run server + run: | + bundle exec bento --${{ matrix.server }} & + bin/wait_tcp 8080 + - name: Run echo scenario + run: | + SCENARIO=echo make wsdirector + - name: Run broadcast scenario + run: | + SCENARIO=broadcast make wsdirector diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..36dd26a --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +anycable-go-* +k6 + +.rails-path +.anycable-path + +bin/dist +!bin/dist/.keep + +Gemfile.lock diff --git a/.rails-revision b/.rails-revision new file mode 100644 index 0000000..7dff62f --- /dev/null +++ b/.rails-revision @@ -0,0 +1 @@ +https://github.com/palkan/rails.git#refactor/action-cable-server-adapterization diff --git a/Gemfile b/Gemfile new file mode 100644 index 0000000..dbf10e0 --- /dev/null +++ b/Gemfile @@ -0,0 +1,50 @@ +source "https://rubygems.org" + +rails_path = File.file?(File.join(__dir__, ".rails-path")) ? File.read(File.join(__dir__, ".rails-path")).strip : File.join(__dir__, "../rails") + +# Use local Rails copy if available +if File.directory?(rails_path) + gem "rails", group: :preload, path: rails_path +# Use Rails from a git repo +elsif File.file?(File.join(__dir__, ".rails-revision")) + git, branch = *File.read(File.join(__dir__, ".rails-revision")).strip.split("#", 2) + gem "rails", group: :preload, git:, branch: +else + gem "rails", "~> 8.0" +end + +# Baseline setup: Puma + Redis pub/sub +gem "puma", "~> 6.4" +gem "redis", "~> 5.0", group: :preload + +# Async setup +# TODO + +# AnyCable setup +gem "grpc_kit" if ENV["ANYCABLE_GRPC_IMPL"] == "grpc_kit" +gem "grpc" unless ENV["ANYCABLE_GRPC_IMPL"] == "grpc_kit" + +anycable_dir_path = File.file?(File.join(__dir__, ".anycable-path")) ? File.read(File.join(__dir__, ".anycable-path")).strip : File.join(__dir__, "..") + +if File.file?(File.join(anycable_dir_path, "anycable/anycable-core.gemspec")) + gem "anycable-core", group: :preload, path: File.join(anycable_dir_path, "anycable") +elsif File.file?(File.join(__dir__, ".anycable-revision")) + git, branch = *File.read(File.join(__dir__, ".anycable-revision")).strip.split("#", 2) + gem "anycable-core", require: false, group: :preload, git:, branch: +else + gem "anycable-core" +end + +if File.file?(File.join(anycable_dir_path, "anycable-rails/anycable-rails.gemspec")) + gem "anycable-rails", group: :preload, path: File.join(anycable_dir_path, "anycable-rails") +elsif File.file?(File.join(__dir__, ".anycable-rails-revision")) + git, branch = *File.read(File.join(__dir__, ".anycable-rails-revision")).strip.split("#", 2) + gem "anycable-rails", require: false, group: :preload, git:, branch: +else + gem "anycable-rails" +end + +# Tools +gem "wsdirector-cli", require: false + +gem "debug" unless ENV["CI"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..3505143 --- /dev/null +++ b/Makefile @@ -0,0 +1,24 @@ +SCENARIO ?= echo +SCALE ?= 10 + +websocket-bench: + websocket-bench broadcast ws://localhost:8080/cable \ + --concurrent 8 \ + --sample-size 100 \ + --step-size 200 \ + --payload-padding 200 \ + --total-steps 10 \ + --origin http://0.0.0.0 \ + --server-type=actioncable + +build-k6: + @test -x ./k6 || \ + xk6 build v0.42.0 --with github.com/anycable/xk6-cable@latest + +k6: build-k6 + ./k6 run scripts/k6/benchmark.js + +wsdirector: + @wsdirector -f scripts/wsdirector/$(SCENARIO).yml -u ws://localhost:8080/cable -s $(SCALE) + +.PHONY: websocket-bench diff --git a/README.md b/README.md new file mode 100644 index 0000000..afc9e3b --- /dev/null +++ b/README.md @@ -0,0 +1,68 @@ +# Action Cable 8 playground + +This is a support project for the Action Server adapterization happening here: [rails/rails#50979][the-pr]. + +This project contains examples, tests and benchmarks aiming to ensure that the refactoring goes well and brings the promised benefits. + +## Usage + +Install the dependencies: + +```sh +bundle install +``` + +NOTE: You can use local dependencies (Rails, AnyCable) either by following path conventions or by providing `.xxx-path` files (see the Gemfile). + +Now you should be able to run a minimal Action Cable application via one of the supported web servers. By default, Puma is used: + +```sh +$ bundle exec bento + +⚡️ Running Action Cable via puma + +[18378] Puma starting in cluster mode... + +... +``` + +Other servers: + +```sh +# AnyCable + +$ bundle exec bento --anycable + +⚡️ Running Action Cable via anycable + +2024-09-13 10:07:54.272 INF Starting AnyCable 1.5.3-56288f0 (pid: 37818, open file limit: 122880, gomaxprocs: 8) nodeid=NFoelH + +... +``` + +## Tests + +You can run basic smoke tests via [wsdirector][] as follows (NOTE: the server must be running): + +```sh +# The default scenario is echo +$ make wsdirector + +10 clients, 0 failures + +# Run broadcast scenario +$ SCENARIO=broadcast make wsdirector + +Group publisher: 10 clients, 0 failures +Group listener: 20 clients, 0 failures + +# You can specify the scale factor, too (default: 10) +$ SCALE=20 SCENARIO=broadcast make wsdirector + +Group publisher: 20 clients, 0 failures +Group listener: 40 clients, 0 failures + +``` + +[the-pr]: https://github.com/rails/rails/pull/50979 +[wsdirector]: https://github.com/palkan/wsdirector diff --git a/bento b/bento new file mode 100755 index 0000000..ec5f95f --- /dev/null +++ b/bento @@ -0,0 +1,31 @@ +#!/usr/bin/env ruby + +require "bundler/setup" + +$stdout.puts "YJIT is enabled 🎉" if RUBY_DESCRIPTION =~ /\+YJIT/ + +require "optparse" + +$benchmark_server = :puma + +OptionParser.new do |opts| + opts.banner = "Usage: bento [options]" + + opts.on('--puma', 'Run Rails via Puma server') do |v| + $benchmark_server = :puma + end + + opts.on('--anycable', 'Run AnyCable RPC server') do |v| + $benchmark_server = :anycable + end + + opts.on('--falcon', 'Run Falcon/Async server') do |v| + $benchmark_server = :falcon + end +end.parse! + +require_relative "lib/application" + +$stdout.puts "⚡️ Running Action Cable via #{$benchmark_server}" + +BenchmarkServer.run! diff --git a/bin/anycable-go b/bin/anycable-go new file mode 100755 index 0000000..91efe9d --- /dev/null +++ b/bin/anycable-go @@ -0,0 +1,22 @@ +#!/bin/bash + +cd $(dirname $0)/.. + +# It's recommended to use the exact version of AnyCable-Go here +version="1.5.3" + +if [ ! -f ./bin/dist/anycable-go ]; then + echo "AnyCable server is not installed, downloading..." + ./bin/rails g anycable:download --version=$version --bin-path=$(pwd)/bin/dist +fi + +curVersion=$(./bin/dist/anycable-go -v) + +if [[ "$version" != "latest" ]]; then + if [[ "$curVersion" != "$version"* ]]; then + echo "AnyCable server version is not $version, downloading a new one..." + ./bin/rails g anycable:download --version=$version --bin-path=$(pwd)/bin/dist + fi +fi + +./bin/dist/anycable-go $@ diff --git a/bin/rails b/bin/rails new file mode 100755 index 0000000..6f92e35 --- /dev/null +++ b/bin/rails @@ -0,0 +1,11 @@ +#!/usr/bin/env ruby + +require "bundler/setup" + +APP_PATH = File.expand_path("../lib/application", __dir__) + +require_relative APP_PATH + +Dir.chdir(File.dirname(APP_PATH)) + +require "rails/commands" diff --git a/bin/wait_tcp b/bin/wait_tcp new file mode 100755 index 0000000..297c891 --- /dev/null +++ b/bin/wait_tcp @@ -0,0 +1,24 @@ +#!/usr/bin/env ruby + +require "socket" + +host = "127.0.0.1" +port = ARGV[0] +timeout = ARGV[1]&.to_f || 5.0 # seconds + +$stdout.puts "Waiting for TCP server to start at #{port}" + +while timeout > 0 + begin + Socket.tcp(host, port, connect_timeout: 1).close + $stdout.puts "TCP server is listening at #{port}" + return + rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, SocketError + end + + sleep 0.5 + timeout -= 0.5 +end + +$stderr.puts "No server is listening at #{port}" +exit(1) diff --git a/lib/application.rb b/lib/application.rb new file mode 100644 index 0000000..5b9a2a7 --- /dev/null +++ b/lib/application.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +require "rails" +require "global_id" + +require "action_controller/railtie" +require "action_view/railtie" +require "action_cable/engine" + +require "debug" unless ENV["CI"] + +# config/application.rb +class App < Rails::Application + config.root = __dir__ + config.eager_load = false + config.consider_all_requests_local = true + config.action_dispatch.show_exceptions = false + config.secret_key_base = "i_am_a_secret" + + config.hosts = [] + + config.logger = ActiveSupport::Logger.new((ENV["LOG"] == "1") ? $stdout : IO::NULL) + config.log_level = (ENV["LOG"] == "1") ? :debug : :fatal + + routes.append do + # Add routes here if needed + end +end + +ActionCable.server.config.connection_class = -> { ApplicationCable::Connection } +ActionCable.server.config.disable_request_forgery_protection = true +ActionCable.server.config.logger = Rails.logger + +# Load server configuration +require_relative "servers/#{$benchmark_server}" if defined?($benchmark_server) + +Rails.application.initialize! + +module ApplicationCable + class Connection < ActionCable::Connection::Base + identified_by :uid + end + + class Channel < ActionCable::Channel::Base + end +end + +class BenchmarkChannel < ApplicationCable::Channel + def subscribed + stream_from "all#{stream_id}" + end + + def echo(data) + transmit data + end + + def broadcast(data) + ActionCable.server.broadcast "all#{stream_id}", data + data["action"] = "broadcastResult" + transmit data + end + + private + + def stream_id + params[:id] || "" + end +end diff --git a/lib/servers/anycable.rb b/lib/servers/anycable.rb new file mode 100644 index 0000000..dc9d591 --- /dev/null +++ b/lib/servers/anycable.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true + +require "redis" +require "anycable-rails" + +ActionCable.server.config.cable = { + "adapter" => $benchmark_server == :anycable ? "any_cable" : "redis", + "url" => ENV["REDIS_URL"] +} + +class BenchmarkServer + def self.run! + require "anycable/cli" + cli = AnyCable::CLI.new + # We're already within the app context + cli.define_singleton_method(:boot_app!) { } + + anycable_server_path = Rails.root.join("../bin/anycable-go") + cli.run(["--server-command", "#{anycable_server_path} --host 0.0.0.0"]) + end +end diff --git a/lib/servers/falcon.rb b/lib/servers/falcon.rb new file mode 100644 index 0000000..b06c507 --- /dev/null +++ b/lib/servers/falcon.rb @@ -0,0 +1,127 @@ +# frozen_string_literal: true + +# TODO: This is an old version from here: https://github.com/anycable/anycable/blob/a0c48aeffe7b57f8abcf49ec244e2129f7424c97/benchmarks/rails/bento#L113 +# Requires upgrade for Action Cable 8 +class AsyncApp + def call(req) + Async::WebSocket::Adapters::HTTP.open(req) do |connection| + env = {url: "/cable"} + + connected = AnyCable.rpc_handler.handle( + :connect, + AnyCable::ConnectionRequest.new(env: env) + ).then do |response| + handle_response(connection, response) + + if response.status != :SUCCESS + connection.close + next false + end + + true + end + + next unless connected + + loop do + msg = connection.read + cmd = Protocol::WebSocket::JSONMessage.wrap(msg)&.to_h + + next unless cmd + + identifier = cmd[:identifier] + command = cmd[:command] + + case command + when "subscribe" + AnyCable.rpc_handler.handle( + :command, + AnyCable::CommandMessage.new( + command:, + identifier:, + connection_identifiers: "{}", + env: + ) + ).then do |response| + handle_response(connection, response, identifier) + end + when "message" + AnyCable.rpc_handler.handle( + :command, + AnyCable::CommandMessage.new( + command:, + identifier:, + connection_identifiers: "{}", + data: cmd[:data], + env: + ) + ).then do |response| + handle_response(connection, response, identifier) + end + end + end + rescue EOFError + end + end + + private + + def handle_response(connection, response, identifier = nil) + response.transmissions&.each do |msg| + connection.write(msg) + end + connection.flush + + # Command response + if identifier + writer = proc do |msg| + msg = {identifier: identifier, message: JSON.parse(msg)}.to_json + connection.write(msg) + connetion.flush + end + + response.streams&.each do |stream| + ActionCable.server.pubsub.subscribe(stream, writer) + end + end + end +end + +class BenchmarkServer + def self.run! + require "async/websocket" + require "async/websocket/adapters/http" + require 'protocol/websocket/json_message' + + require "falcon/command" + require "falcon/command/serve" + + # Patch Action Cable subscriber to be async-aware + require "async/semaphore" + ActionCable::SubscriptionAdapter::SubscriberMap.prepend(Module.new do + def initialize(...) + super + @semaphore = Async::Semaphore.new(1024) + end + + def broadcast(channel, message) + list = @sync.synchronize do + return if !@subscribers.key?(channel) + @subscribers[channel].dup + end + + Async do + list.each do |subscriber| + @semaphore.async do + invoke_callback(subscriber, message) + end + end + end + end + end) + + cmd = Falcon::Command::Serve.new(["-p", "8080", "-b", "tcp://0.0.0.0", "--#{ENV.fetch("FALCON_MODE", "forked")}"]) + cmd.define_singleton_method(:load_app) { AsyncApp.new } + cmd.call + end +end diff --git a/lib/servers/puma.rb b/lib/servers/puma.rb new file mode 100644 index 0000000..daefc52 --- /dev/null +++ b/lib/servers/puma.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +ActionCable.server.config.cable = { + "adapter" => ENV.fetch("ACTION_CABLE_ADAPTER", "redis"), + "url" => ENV["REDIS_URL"] +} + +class BenchmarkServer + def self.run! + require "puma/cli" + cli = Puma::CLI.new(["-w", "#{ENV.fetch("WEB_CONCURRENCY", 4)}", "-t", "5", "-p", "8080", "-b", "tcp://0.0.0.0"]) + cli.instance_variable_get(:@conf).options[:app] = Rails.application + cli.run + end +end diff --git a/scripts/wsdirector/broadcast.yml b/scripts/wsdirector/broadcast.yml new file mode 100644 index 0000000..55af650 --- /dev/null +++ b/scripts/wsdirector/broadcast.yml @@ -0,0 +1,27 @@ +- client: + multiplier: ":scale" + name: "publisher" + protocol: "action_cable" + actions: + - subscribe: + channel: "BenchmarkChannel" + - wait_all + - perform: + channel: "BenchmarkChannel" + action: "broadcast" + data: + text: "hello" + +- client: + name: "listener" + multiplier: ":scale * 2" + protocol: "action_cable" + actions: + - subscribe: + channel: "BenchmarkChannel" + - wait_all + - receive: + multiplier: ":scale" + channel: "BenchmarkChannel" + data: + text: "hello" diff --git a/scripts/wsdirector/echo.yml b/scripts/wsdirector/echo.yml new file mode 100644 index 0000000..a586087 --- /dev/null +++ b/scripts/wsdirector/echo.yml @@ -0,0 +1,15 @@ +- client: + multiplier: ":scale" + protocol: "action_cable" + actions: + - subscribe: + channel: "BenchmarkChannel" + - perform: + channel: "BenchmarkChannel" + action: "echo" + data: + text: "ping" + - receive: + channel: "BenchmarkChannel" + data: + text: "ping"