Skip to content

Commit

Permalink
fix: upgrade websocket-client-simple and improve client interface
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Feb 22, 2024
1 parent 7f70ef2 commit b9e256c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
2 changes: 1 addition & 1 deletion anyt-core.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@ Gem::Specification.new do |spec|
spec.add_dependency "rails", ">= 6.0"
spec.add_dependency "anyway_config", ">= 2.2.0"
spec.add_dependency "websocket", "~> 1.2.4"
spec.add_dependency "websocket-client-simple", "~> 0.3.0"
spec.add_dependency "websocket-client-simple", "~> 0.8"
spec.add_dependency "concurrent-ruby", "~> 1.0"
end
46 changes: 39 additions & 7 deletions lib/anyt/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,38 @@ class Client
require "websocket-client-simple"
require "concurrent"

class TimeoutError < StandardError; end
class Error < StandardError; end

class TimeoutError < Error; end

class DisconnectedError < Error
attr_reader :event

def initialize(event)
@event = event
if event
super("WebSocket disconnected (code=#{event.code}, reason=#{event.data})")
else
super("WebSocket disconnected abnormally")
end
end
end

WAIT_WHEN_EXPECTING_EVENT = 5
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5

private attr_reader :logger

attr_reader :url

def initialize(
ignore: [], url: Anyt.config.target_url, qs: "",
cookies: "", headers: {},
protocol: "actioncable-v1-json",
timeout_multiplier: Anyt.config.timeout_multiplier
timeout_multiplier: Anyt.config.timeout_multiplier,
logger: AnyCable.logger
)
@logger = logger
ignore_message_types = @ignore_message_types = ignore
messages = @messages = Queue.new
closed = @closed = Concurrent::Event.new
Expand All @@ -32,8 +53,15 @@ def initialize(

open = Concurrent::Promise.new

@url = url

if !qs.empty?
@url += url.include?("?") ? "&" : "?"
@url += qs
end

@ws = WebSocket::Client::Simple.connect(
url + "?#{qs}",
url,
headers: headers
) do |ws|
ws.on(:error) do |event|
Expand All @@ -54,13 +82,15 @@ def initialize(
ws.on(:message) do |event|
next if event.type == :ping
if event.type == :close
messages << DisconnectedError.new(event)
has_messages.release
closed.set
else
message = JSON.parse(event.data)

next if ignore_message_types.include?(message["type"])

AnyCable.logger.debug "Message received: #{message}"
logger.debug "Message received: #{message}"

messages << message
has_messages.release
Expand All @@ -81,11 +111,13 @@ def initialize(
def receive(timeout: WAIT_WHEN_EXPECTING_EVENT)
timeout *= @timeout_multiplier

raise TimeoutError, "Timed out to receive message" unless
@has_messages.try_acquire(1, timeout)
unless @has_messages.try_acquire(1, timeout)
raise DisconnectedError if closed?
raise TimeoutError, "Timed out to receive message"
end

msg = @messages.pop(true)
raise msg if msg.is_a?(Exception)
raise msg if msg.is_a?(Exception) || msg.is_a?(Error)

msg
end
Expand Down

0 comments on commit b9e256c

Please sign in to comment.