From 18733c2adf8a223bf5b6b41c2052c89da9a84dcf Mon Sep 17 00:00:00 2001 From: Christina Date: Wed, 14 Aug 2024 12:02:36 +0200 Subject: [PATCH 1/2] move files and prepare for abstraction --- src/lavinmq/{client => amqp}/channel.cr | 0 src/lavinmq/{client => amqp}/client.cr | 0 src/lavinmq/{client/channel => amqp}/consumer.cr | 0 src/lavinmq/{client/channel => amqp}/stream_consumer.cr | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename src/lavinmq/{client => amqp}/channel.cr (100%) rename src/lavinmq/{client => amqp}/client.cr (100%) rename src/lavinmq/{client/channel => amqp}/consumer.cr (100%) rename src/lavinmq/{client/channel => amqp}/stream_consumer.cr (100%) diff --git a/src/lavinmq/client/channel.cr b/src/lavinmq/amqp/channel.cr similarity index 100% rename from src/lavinmq/client/channel.cr rename to src/lavinmq/amqp/channel.cr diff --git a/src/lavinmq/client/client.cr b/src/lavinmq/amqp/client.cr similarity index 100% rename from src/lavinmq/client/client.cr rename to src/lavinmq/amqp/client.cr diff --git a/src/lavinmq/client/channel/consumer.cr b/src/lavinmq/amqp/consumer.cr similarity index 100% rename from src/lavinmq/client/channel/consumer.cr rename to src/lavinmq/amqp/consumer.cr diff --git a/src/lavinmq/client/channel/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr similarity index 100% rename from src/lavinmq/client/channel/stream_consumer.cr rename to src/lavinmq/amqp/stream_consumer.cr From 81ac4751f28fcdbdd1af871da374fd0e6c87d8f2 Mon Sep 17 00:00:00 2001 From: Christina Date: Wed, 7 Aug 2024 11:09:30 +0200 Subject: [PATCH 2/2] abstract client, consumer, channel Co-authored-by: Jon --- .ameba.yml | 6 +- src/lavinmq/amqp.cr | 4 +- src/lavinmq/amqp/channel.cr | 32 +- src/lavinmq/amqp/client.cr | 1469 ++++++++--------- src/lavinmq/amqp/connection_factory.cr | 190 +++ src/lavinmq/amqp/consumer.cr | 422 +++-- src/lavinmq/amqp/stream_consumer.cr | 147 +- src/lavinmq/client/amqp_connection.cr | 186 --- src/lavinmq/client/channel.cr | 10 + src/lavinmq/client/channel/consumer.cr | 11 + src/lavinmq/client/client.cr | 8 + src/lavinmq/client/connection_factory.cr | 6 + src/lavinmq/queue/stream_queue.cr | 8 +- .../queue/stream_queue_message_store.cr | 2 +- src/lavinmq/reporter.cr | 13 +- src/lavinmq/server.cr | 5 +- 16 files changed, 1277 insertions(+), 1242 deletions(-) create mode 100644 src/lavinmq/amqp/connection_factory.cr delete mode 100644 src/lavinmq/client/amqp_connection.cr create mode 100644 src/lavinmq/client/channel.cr create mode 100644 src/lavinmq/client/channel/consumer.cr create mode 100644 src/lavinmq/client/client.cr create mode 100644 src/lavinmq/client/connection_factory.cr diff --git a/.ameba.yml b/.ameba.yml index 28e56c55e..92af60cc1 100644 --- a/.ameba.yml +++ b/.ameba.yml @@ -14,9 +14,9 @@ Naming/BlockParameterName: Lint/NotNil: Description: Identifies usage of `not_nil!` calls Excluded: - - src/lavinmq/client/amqp_connection.cr - - src/lavinmq/client/channel.cr - - src/lavinmq/client/client.cr + - src/lavinmq/amqp/connection_factory.cr + - src/lavinmq/amqp/channel.cr + - src/lavinmq/amqp/client.cr - src/lavinmq/federation/link.cr - src/lavinmq/http/controller/main.cr - src/lavinmq/http/controller/exchanges.cr diff --git a/src/lavinmq/amqp.cr b/src/lavinmq/amqp.cr index 515bbae20..1c7d300ac 100644 --- a/src/lavinmq/amqp.cr +++ b/src/lavinmq/amqp.cr @@ -1,5 +1,7 @@ require "amq-protocol" module LavinMQ - alias AMQP = AMQ::Protocol + module AMQP + include AMQ::Protocol + end end diff --git a/src/lavinmq/amqp/channel.cr b/src/lavinmq/amqp/channel.cr index 2d74c0112..380c20fd6 100644 --- a/src/lavinmq/amqp/channel.cr +++ b/src/lavinmq/amqp/channel.cr @@ -1,16 +1,16 @@ -require "./channel/consumer" -require "./channel/stream_consumer" -require "../logger" +require "./client" +require "./consumer" +require "./stream_consumer" +require "../error" require "../queue" require "../exchange" require "../amqp" require "../stats" require "../sortable_json" -require "../error" module LavinMQ - class Client - class Channel + module AMQP + class Channel < LavinMQ::Client::Channel include Stats include SortableJSON @@ -39,7 +39,7 @@ module LavinMQ rate_stats({"ack", "get", "publish", "deliver", "redeliver", "reject", "confirm", "return_unroutable"}) - Log = ::Log.for("channel") + Log = ::Log.for "AMQP.channel" def initialize(@client : Client, @id : UInt16) @metadata = ::Log::Metadata.new(nil, {client: @client.remote_address.to_s, channel: @id.to_i}) @@ -106,7 +106,7 @@ module LavinMQ @client.send_precondition_failed(frame, "Server low on disk space") return end - raise Error::UnexpectedFrame.new(frame) if @next_publish_exchange_name + raise LavinMQ::Error::UnexpectedFrame.new(frame) if @next_publish_exchange_name if ex = @client.vhost.exchanges[frame.exchange]? if !ex.internal? @next_publish_exchange_name = frame.exchange @@ -126,9 +126,9 @@ module LavinMQ end def next_msg_headers(frame) - raise Error::UnexpectedFrame.new(frame) if @next_publish_exchange_name.nil? - raise Error::UnexpectedFrame.new(frame) if @next_msg_props - raise Error::UnexpectedFrame.new(frame) if frame.class_id != 60 + raise LavinMQ::Error::UnexpectedFrame.new(frame) if @next_publish_exchange_name.nil? + raise LavinMQ::Error::UnexpectedFrame.new(frame) if @next_msg_props + raise LavinMQ::Error::UnexpectedFrame.new(frame) if frame.class_id != 60 valid_expiration?(frame) || return if direct_reply_to?(frame.properties.reply_to) if drc = @direct_reply_consumer @@ -154,7 +154,7 @@ module LavinMQ def add_content(frame) if @next_publish_exchange_name.nil? || @next_msg_props.nil? frame.body.skip(frame.body_size) - raise Error::UnexpectedFrame.new(frame) + raise LavinMQ::Error::UnexpectedFrame.new(frame) end if @tx # in transaction mode, copy all bodies to the tmp file serially copied = IO.copy(frame.body, next_msg_body_file, frame.body_size) @@ -248,7 +248,7 @@ module LavinMQ confirm do ok = @client.vhost.publish msg, @next_publish_immediate, @visited, @found_queues basic_return(msg, @next_publish_mandatory, @next_publish_immediate) unless ok - rescue e : Error::PreconditionFailed + rescue e : LavinMQ::Error::PreconditionFailed msg.body_io.skip(msg.bodysize) send AMQP::Frame::Channel::Close.new(@id, 406_u16, "PRECONDITION_FAILED - #{e.message}", 60_u16, 40_u16) end @@ -261,7 +261,7 @@ module LavinMQ if user_id && user_id != current_user.name && !current_user.can_impersonate? text = "Message's user_id property '#{user_id}' doesn't match actual user '#{current_user.name}'" @log.error { text } - raise Error::PreconditionFailed.new(text) + raise LavinMQ::Error::PreconditionFailed.new(text) end end @@ -359,9 +359,9 @@ module LavinMQ return end c = if q.is_a? StreamQueue - StreamConsumer.new(self, q, frame) + AMQP::StreamConsumer.new(self, q, frame) else - Consumer.new(self, q, frame) + AMQP::Consumer.new(self, q, frame) end @consumers.push(c) q.add_consumer(c) diff --git a/src/lavinmq/amqp/client.cr b/src/lavinmq/amqp/client.cr index e6eb1db18..08c948256 100644 --- a/src/lavinmq/amqp/client.cr +++ b/src/lavinmq/amqp/client.cr @@ -1,297 +1,300 @@ require "openssl" require "socket" -require "../logger" -require "../vhost" -require "../message" require "./channel" -require "../user" -require "../stats" -require "../sortable_json" -require "../rough_time" +require "../client" require "../error" -require "./amqp_connection" -require "../config" -require "../http/handler/websocket" module LavinMQ - class Client - include Stats - include SortableJSON - - getter vhost, channels, log, name - getter user - getter max_frame_size : UInt32 - getter channel_max : UInt16 - getter heartbeat_timeout : UInt16 - getter auth_mechanism : String - getter client_properties : AMQP::Table - getter remote_address : Socket::IPAddress - - @connected_at = RoughTime.unix_ms - @channels = Hash(UInt16, Client::Channel).new - @exclusive_queues = Array(Queue).new - @heartbeat_interval_ms : Int64? - @local_address : Socket::IPAddress - @running = true - @last_recv_frame = RoughTime.monotonic - @last_sent_frame = RoughTime.monotonic - rate_stats({"send_oct", "recv_oct"}) - DEFAULT_EX = "amq.default" - Log = ::Log.for("client") - - def initialize(@socket : IO, - @connection_info : ConnectionInfo, - @vhost : VHost, - @user : User, - tune_ok, - start_ok) - @remote_address = @connection_info.src - @local_address = @connection_info.dst - - @max_frame_size = tune_ok.frame_max - @channel_max = tune_ok.channel_max - @heartbeat_timeout = tune_ok.heartbeat - @heartbeat_interval_ms = tune_ok.heartbeat.zero? ? nil : ((tune_ok.heartbeat / 2) * 1000).to_i64 - @auth_mechanism = start_ok.mechanism - @name = "#{@remote_address} -> #{@local_address}" - @client_properties = start_ok.client_properties - connection_name = @client_properties["connection_name"]?.try(&.as?(String)) - @metadata = - if connection_name - ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s, name: connection_name}) - else - ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s}) - end - @log = Logger.new(Log, @metadata) - @vhost.add_connection(self) - @log.info { "Connection established for user=#{@user.name}" } - spawn read_loop, name: "Client#read_loop #{@remote_address}" - end - - # Returns client provided connection name if set, else server generated name - def client_name - @client_properties["connection_name"]?.try(&.as(String)) || @name - end - - def channel_name_prefix - @remote_address.to_s - end - - def details_tuple - { - channels: @channels.size, - connected_at: @connected_at, - type: "network", - channel_max: @channel_max, - frame_max: @max_frame_size, - timeout: @heartbeat_timeout, - client_properties: @client_properties, - vhost: @vhost.name, - user: @user.name, - protocol: "AMQP 0-9-1", - auth_mechanism: @auth_mechanism, - host: @local_address.address, - port: @local_address.port, - peer_host: @remote_address.address, - peer_port: @remote_address.port, - name: @name, - pid: @name, - ssl: @connection_info.ssl?, - tls_version: @connection_info.ssl_version, - cipher: @connection_info.ssl_cipher, - state: state, - }.merge(stats_details) - end - - private def read_loop - i = 0 - socket = @socket - loop do - AMQP::Frame.from_io(socket) do |frame| - {% unless flag?(:release) %} - @log.trace { "Received #{frame.inspect}" } - {% end %} - if (i += 1) == 8192 - i = 0 - Fiber.yield - end - frame_size_ok?(frame) || return - case frame - when AMQP::Frame::Connection::Close - @log.info { "Client disconnected: #{frame.reply_text}" } unless frame.reply_text.empty? - send AMQP::Frame::Connection::CloseOk.new - @running = false - next - when AMQP::Frame::Connection::CloseOk - @log.debug { "Confirmed disconnect" } - @running = false - return - end - if @running - process_frame(frame) + module AMQP + class Client < LavinMQ::Client + include Stats + include SortableJSON + + getter vhost, channels, log, name + getter user + getter max_frame_size : UInt32 + getter channel_max : UInt16 + getter heartbeat_timeout : UInt16 + getter auth_mechanism : String + getter client_properties : AMQP::Table + getter remote_address : Socket::IPAddress + + @connected_at = RoughTime.unix_ms + @channels = Hash(UInt16, Client::Channel).new + @exclusive_queues = Array(Queue).new + @heartbeat_interval_ms : Int64? + @local_address : Socket::IPAddress + @running = true + @last_recv_frame = RoughTime.monotonic + @last_sent_frame = RoughTime.monotonic + rate_stats({"send_oct", "recv_oct"}) + DEFAULT_EX = "amq.default" + Log = ::Log.for "AMQP.client" + + def initialize(@socket : IO, + @connection_info : ConnectionInfo, + @vhost : VHost, + @user : User, + tune_ok, + start_ok) + @remote_address = @connection_info.src + @local_address = @connection_info.dst + + @max_frame_size = tune_ok.frame_max + @channel_max = tune_ok.channel_max + @heartbeat_timeout = tune_ok.heartbeat + @heartbeat_interval_ms = tune_ok.heartbeat.zero? ? nil : ((tune_ok.heartbeat / 2) * 1000).to_i64 + @auth_mechanism = start_ok.mechanism + @name = "#{@remote_address} -> #{@local_address}" + @client_properties = start_ok.client_properties + connection_name = @client_properties["connection_name"]?.try(&.as?(String)) + @metadata = + if connection_name + ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s, name: connection_name}) else + ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s}) + end + @log = Logger.new(Log, @metadata) + @vhost.add_connection(self) + @log.info { "Connection established for user=#{@user.name}" } + spawn read_loop, name: "Client#read_loop #{@remote_address}" + end + + # Returns client provided connection name if set, else server generated name + def client_name + @client_properties["connection_name"]?.try(&.as(String)) || @name + end + + def channel_name_prefix + @remote_address.to_s + end + + def details_tuple + { + channels: @channels.size, + connected_at: @connected_at, + type: "network", + channel_max: @channel_max, + frame_max: @max_frame_size, + timeout: @heartbeat_timeout, + client_properties: @client_properties, + vhost: @vhost.name, + user: @user.name, + protocol: "AMQP 0-9-1", + auth_mechanism: @auth_mechanism, + host: @local_address.address, + port: @local_address.port, + peer_host: @remote_address.address, + peer_port: @remote_address.port, + name: @name, + pid: @name, + ssl: @connection_info.ssl?, + tls_version: @connection_info.ssl_version, + cipher: @connection_info.ssl_cipher, + state: state, + }.merge(stats_details) + end + + private def read_loop + i = 0 + socket = @socket + loop do + AMQP::Frame.from_io(socket) do |frame| + {% unless flag?(:release) %} + @log.trace { "Received #{frame.inspect}" } + {% end %} + if (i += 1) == 8192 + i = 0 + Fiber.yield + end + frame_size_ok?(frame) || return case frame - when AMQP::Frame::Body - @log.debug { "Skipping body, waiting for CloseOk" } - frame.body.skip(frame.body_size) + when AMQP::Frame::Connection::Close + @log.info { "Client disconnected: #{frame.reply_text}" } unless frame.reply_text.empty? + send AMQP::Frame::Connection::CloseOk.new + @running = false + next + when AMQP::Frame::Connection::CloseOk + @log.debug { "Confirmed disconnect" } + @running = false + return + end + if @running + process_frame(frame) else - @log.debug { "Discarding #{frame.class.name}, waiting for CloseOk" } + case frame + when AMQP::Frame::Body + @log.debug { "Skipping body, waiting for CloseOk" } + frame.body.skip(frame.body_size) + else + @log.debug { "Discarding #{frame.class.name}, waiting for CloseOk" } + end end + rescue e : LavinMQ::Error::PreconditionFailed + send_precondition_failed(frame, e.message) end - rescue e : Error::PreconditionFailed - send_precondition_failed(frame, e.message) + rescue IO::TimeoutError + send_heartbeat || break + rescue ex : AMQP::Error::NotImplemented + @log.error { ex.inspect } + send_not_implemented(ex) + rescue ex : AMQP::Error::FrameDecode + @log.error { ex.inspect_with_backtrace } + send_frame_error(ex.message) + rescue ex : IO::Error | OpenSSL::SSL::Error + @log.debug { "Lost connection, while reading (#{ex.inspect})" } unless closed? + break + rescue ex : Exception + @log.error { "Unexpected error, while reading: #{ex.inspect_with_backtrace}" } + send_internal_error(ex.message) end - rescue IO::TimeoutError - send_heartbeat || break - rescue ex : AMQP::Error::NotImplemented - @log.error { ex.inspect } - send_not_implemented(ex) - rescue ex : AMQP::Error::FrameDecode - @log.error { ex.inspect_with_backtrace } - send_frame_error(ex.message) - rescue ex : IO::Error | OpenSSL::SSL::Error - @log.debug { "Lost connection, while reading (#{ex.inspect})" } unless closed? - break - rescue ex : Exception - @log.error { "Unexpected error, while reading: #{ex.inspect_with_backtrace}" } - send_internal_error(ex.message) - end - ensure - cleanup - close_socket - @log.info { "Connection disconnected for user=#{@user.name}" } - end - - private def frame_size_ok?(frame) : Bool - if frame.bytesize > @max_frame_size - send_frame_error("frame size #{frame.bytesize} exceeded max #{@max_frame_size} bytes") - return false + ensure + cleanup + close_socket + @log.info { "Connection disconnected for user=#{@user.name}" } end - true - end - private def send_heartbeat - now = RoughTime.monotonic - if @last_recv_frame + (@heartbeat_timeout + 5).seconds < now - @log.info { "Heartbeat timeout (#{@heartbeat_timeout}), last seen frame #{(now - @last_recv_frame).total_seconds} s ago, sent frame #{(now - @last_sent_frame).total_seconds} s ago" } - false - else - send AMQP::Frame::Heartbeat.new + private def frame_size_ok?(frame) : Bool + if frame.bytesize > @max_frame_size + send_frame_error("frame size #{frame.bytesize} exceeded max #{@max_frame_size} bytes") + return false + end + true end - end - def send(frame : AMQP::Frame, channel_is_open : Bool? = nil) : Bool - return false if closed? - if channel_is_open.nil? - channel_is_open = frame.channel.zero? || @channels[frame.channel]?.try &.running? - end - unless channel_is_open - @log.debug { "Channel #{frame.channel} is closed so is not sending #{frame.inspect}" } - return false - end - {% unless flag?(:release) %} - @log.trace { "Send #{frame.inspect}" } - {% end %} - @write_lock.synchronize do - s = @socket - s.write_bytes frame, IO::ByteFormat::NetworkEndian - s.flush + private def send_heartbeat + now = RoughTime.monotonic + if @last_recv_frame + (@heartbeat_timeout + 5).seconds < now + @log.info { "Heartbeat timeout (#{@heartbeat_timeout}), last seen frame #{(now - @last_recv_frame).total_seconds} s ago, sent frame #{(now - @last_sent_frame).total_seconds} s ago" } + false + else + send AMQP::Frame::Heartbeat.new + end end - @last_sent_frame = RoughTime.monotonic - @send_oct_count += 8_u64 + frame.bytesize - if frame.is_a?(AMQP::Frame::Connection::CloseOk) - return false - end - true - rescue ex : IO::Error | OpenSSL::SSL::Error - @log.debug { "Lost connection, while sending (#{ex.inspect})" } unless closed? - close_socket - false - rescue ex : IO::TimeoutError - @log.info { "Timeout while sending (#{ex.inspect})" } - close_socket - false - rescue ex - @log.error { "Unexpected error, while sending: #{ex.inspect_with_backtrace}" } - send_internal_error(ex.message) - false - end - - def connection_details - { - peer_host: @remote_address.address, - peer_port: @remote_address.port, - name: @name, - } - end - - @write_lock = Mutex.new(:checked) - def deliver(frame, msg) - return false if closed? - @write_lock.synchronize do - socket = @socket - websocket = socket.is_a? WebSocketIO + def send(frame : AMQP::Frame, channel_is_open : Bool? = nil) : Bool + return false if closed? + if channel_is_open.nil? + channel_is_open = frame.channel.zero? || @channels[frame.channel]?.try &.running? + end + unless channel_is_open + @log.debug { "Channel #{frame.channel} is closed so is not sending #{frame.inspect}" } + return false + end {% unless flag?(:release) %} @log.trace { "Send #{frame.inspect}" } {% end %} - socket.write_bytes frame, ::IO::ByteFormat::NetworkEndian - socket.flush if websocket + @write_lock.synchronize do + s = @socket + s.write_bytes frame, IO::ByteFormat::NetworkEndian + s.flush + end + @last_sent_frame = RoughTime.monotonic @send_oct_count += 8_u64 + frame.bytesize - header = AMQP::Frame::Header.new(frame.channel, 60_u16, 0_u16, msg.bodysize, msg.properties) - {% unless flag?(:release) %} - @log.trace { "Send #{header.inspect}" } - {% end %} - socket.write_bytes header, ::IO::ByteFormat::NetworkEndian - socket.flush if websocket - @send_oct_count += 8_u64 + header.bytesize - pos = 0 - while pos < msg.bodysize - length = Math.min(msg.bodysize - pos, @max_frame_size - 8).to_u32 + if frame.is_a?(AMQP::Frame::Connection::CloseOk) + return false + end + true + rescue ex : IO::Error | OpenSSL::SSL::Error + @log.debug { "Lost connection, while sending (#{ex.inspect})" } unless closed? + close_socket + false + rescue ex : IO::TimeoutError + @log.info { "Timeout while sending (#{ex.inspect})" } + close_socket + false + rescue ex + @log.error { "Unexpected error, while sending: #{ex.inspect_with_backtrace}" } + send_internal_error(ex.message) + false + end + + def connection_details + { + peer_host: @remote_address.address, + peer_port: @remote_address.port, + name: @name, + } + end + + @write_lock = Mutex.new(:checked) + + def deliver(frame, msg) + return false if closed? + @write_lock.synchronize do + socket = @socket + websocket = socket.is_a? WebSocketIO + {% unless flag?(:release) %} + @log.trace { "Send #{frame.inspect}" } + {% end %} + socket.write_bytes frame, ::IO::ByteFormat::NetworkEndian + socket.flush if websocket + @send_oct_count += 8_u64 + frame.bytesize + header = AMQP::Frame::Header.new(frame.channel, 60_u16, 0_u16, msg.bodysize, msg.properties) {% unless flag?(:release) %} - @log.trace { "Send BodyFrame (pos #{pos}, length #{length})" } + @log.trace { "Send #{header.inspect}" } {% end %} - body = case msg - in BytesMessage - AMQP::Frame::BytesBody.new(frame.channel, length, msg.body[pos, length]) - in Message - AMQP::Frame::Body.new(frame.channel, length, msg.body_io) - end - socket.write_bytes body, ::IO::ByteFormat::NetworkEndian + socket.write_bytes header, ::IO::ByteFormat::NetworkEndian socket.flush if websocket - @send_oct_count += 8_u64 + body.bytesize - pos += length + @send_oct_count += 8_u64 + header.bytesize + pos = 0 + while pos < msg.bodysize + length = Math.min(msg.bodysize - pos, @max_frame_size - 8).to_u32 + {% unless flag?(:release) %} + @log.trace { "Send BodyFrame (pos #{pos}, length #{length})" } + {% end %} + body = case msg + in BytesMessage + AMQP::Frame::BytesBody.new(frame.channel, length, msg.body[pos, length]) + in Message + AMQP::Frame::Body.new(frame.channel, length, msg.body_io) + end + socket.write_bytes body, ::IO::ByteFormat::NetworkEndian + socket.flush if websocket + @send_oct_count += 8_u64 + body.bytesize + pos += length + end + socket.flush unless websocket # Websockets need to send one frame per WS frame + @last_sent_frame = RoughTime.monotonic end - socket.flush unless websocket # Websockets need to send one frame per WS frame - @last_sent_frame = RoughTime.monotonic + true + rescue ex : IO::Error | OpenSSL::SSL::Error + @log.debug { "Lost connection, while sending (#{ex.inspect})" } + close_socket + Fiber.yield + false + rescue ex : AMQ::Protocol::Error::FrameEncode + @log.warn { "Error encoding frame (#{ex.inspect})" } + close_socket + false + rescue ex : IO::TimeoutError + @log.info { "Timeout while sending (#{ex.inspect})" } + close_socket + false + rescue ex + @log.error { "Delivery exception: #{ex.inspect_with_backtrace}" } + raise ex end - true - rescue ex : IO::Error | OpenSSL::SSL::Error - @log.debug { "Lost connection, while sending (#{ex.inspect})" } - close_socket - Fiber.yield - false - rescue ex : AMQ::Protocol::Error::FrameEncode - @log.warn { "Error encoding frame (#{ex.inspect})" } - close_socket - false - rescue ex : IO::TimeoutError - @log.info { "Timeout while sending (#{ex.inspect})" } - close_socket - false - rescue ex - @log.error { "Delivery exception: #{ex.inspect_with_backtrace}" } - raise ex - end - def state - !@running ? "closed" : (@vhost.flow? ? "running" : "flow") - end + def state + !@running ? "closed" : (@vhost.flow? ? "running" : "flow") + end - private def with_channel(frame, &) - if ch = @channels[frame.channel]? - if ch.running? - yield ch + private def with_channel(frame, &) + if ch = @channels[frame.channel]? + if ch.running? + yield ch + else + case frame + when AMQP::Frame::Basic::Publish, AMQP::Frame::Header + @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } + when AMQP::Frame::Body + @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } + frame.body.skip(frame.body_size) + else + @log.trace { "Discarding #{frame.inspect}, waiting for Close(Ok)" } + end + end else case frame when AMQP::Frame::Basic::Publish, AMQP::Frame::Header @@ -300,547 +303,537 @@ module LavinMQ @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } frame.body.skip(frame.body_size) else - @log.trace { "Discarding #{frame.inspect}, waiting for Close(Ok)" } + @log.error { "Channel #{frame.channel} not open while processing #{frame.class.name}" } + close_connection(frame, 504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open") end end - else + end + + private def open_channel(frame) + if @channels.has_key? frame.channel + close_connection(frame, 504_u16, "CHANNEL_ERROR - second 'channel.open' seen") + else + @channels[frame.channel] = AMQP::Channel.new(self, frame.channel) + @vhost.event_tick(EventType::ChannelCreated) + send AMQP::Frame::Channel::OpenOk.new(frame.channel) + end + end + + # ameba:disable Metrics/CyclomaticComplexity + private def process_frame(frame) : Nil + @last_recv_frame = RoughTime.monotonic + @recv_oct_count += 8_u64 + frame.bytesize case frame - when AMQP::Frame::Basic::Publish, AMQP::Frame::Header - @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } + when AMQP::Frame::Channel::Open + open_channel(frame) + when AMQP::Frame::Channel::Close + @channels.delete(frame.channel).try &.close + send AMQP::Frame::Channel::CloseOk.new(frame.channel), true + when AMQP::Frame::Channel::CloseOk + @channels.delete(frame.channel).try &.close + when AMQP::Frame::Channel::Flow + with_channel frame, &.flow(frame.active) + when AMQP::Frame::Channel::FlowOk + # noop + when AMQP::Frame::Confirm::Select + with_channel frame, &.confirm_select(frame) + when AMQP::Frame::Exchange::Declare + declare_exchange(frame) + when AMQP::Frame::Exchange::Delete + delete_exchange(frame) + when AMQP::Frame::Exchange::Bind + bind_exchange(frame) + when AMQP::Frame::Exchange::Unbind + unbind_exchange(frame) + when AMQP::Frame::Queue::Declare + declare_queue(frame) + when AMQP::Frame::Queue::Bind + bind_queue(frame) + when AMQP::Frame::Queue::Unbind + unbind_queue(frame) + when AMQP::Frame::Queue::Delete + delete_queue(frame) + when AMQP::Frame::Queue::Purge + purge_queue(frame) + when AMQP::Frame::Basic::Publish + start_publish(frame) + when AMQP::Frame::Header + with_channel frame, &.next_msg_headers(frame) when AMQP::Frame::Body - @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } - frame.body.skip(frame.body_size) + with_channel frame, &.add_content(frame) + when AMQP::Frame::Basic::Consume + consume(frame) + when AMQP::Frame::Basic::Get + basic_get(frame) + when AMQP::Frame::Basic::Ack + with_channel frame, &.basic_ack(frame) + when AMQP::Frame::Basic::Reject + with_channel frame, &.basic_reject(frame) + when AMQP::Frame::Basic::Nack + with_channel frame, &.basic_nack(frame) + when AMQP::Frame::Basic::Cancel + with_channel frame, &.cancel_consumer(frame) + when AMQP::Frame::Basic::Qos + with_channel frame, &.basic_qos(frame) + when AMQP::Frame::Basic::Recover + with_channel frame, &.basic_recover(frame) + when AMQP::Frame::Tx::Select + with_channel frame, &.tx_select(frame) + when AMQP::Frame::Tx::Commit + with_channel frame, &.tx_commit(frame) + when AMQP::Frame::Tx::Rollback + with_channel frame, &.tx_rollback(frame) + when AMQP::Frame::Heartbeat + nil else - @log.error { "Channel #{frame.channel} not open while processing #{frame.class.name}" } - close_connection(frame, 504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open") + send_not_implemented(frame) end + if heartbeat_interval_ms = @heartbeat_interval_ms + if @last_sent_frame + heartbeat_interval_ms.milliseconds < RoughTime.monotonic + send AMQP::Frame::Heartbeat.new + end + end + rescue ex : LavinMQ::Error::UnexpectedFrame + @log.error { ex.inspect } + close_channel(ex.frame, 505_u16, "UNEXPECTED_FRAME - #{ex.frame.class.name}") end - end - private def open_channel(frame) - if @channels.has_key? frame.channel - close_connection(frame, 504_u16, "CHANNEL_ERROR - second 'channel.open' seen") - else - @channels[frame.channel] = Client::Channel.new(self, frame.channel) - @vhost.event_tick(EventType::ChannelCreated) - send AMQP::Frame::Channel::OpenOk.new(frame.channel) + private def cleanup + @running = false + @channels.each_value &.close + @channels.clear + @exclusive_queues.each(&.close) + @exclusive_queues.clear + @vhost.rm_connection(self) end - end - # ameba:disable Metrics/CyclomaticComplexity - private def process_frame(frame) : Nil - @last_recv_frame = RoughTime.monotonic - @recv_oct_count += 8_u64 + frame.bytesize - case frame - when AMQP::Frame::Channel::Open - open_channel(frame) - when AMQP::Frame::Channel::Close - @channels.delete(frame.channel).try &.close - send AMQP::Frame::Channel::CloseOk.new(frame.channel), true - when AMQP::Frame::Channel::CloseOk - @channels.delete(frame.channel).try &.close - when AMQP::Frame::Channel::Flow - with_channel frame, &.flow(frame.active) - when AMQP::Frame::Channel::FlowOk - # noop - when AMQP::Frame::Confirm::Select - with_channel frame, &.confirm_select(frame) - when AMQP::Frame::Exchange::Declare - declare_exchange(frame) - when AMQP::Frame::Exchange::Delete - delete_exchange(frame) - when AMQP::Frame::Exchange::Bind - bind_exchange(frame) - when AMQP::Frame::Exchange::Unbind - unbind_exchange(frame) - when AMQP::Frame::Queue::Declare - declare_queue(frame) - when AMQP::Frame::Queue::Bind - bind_queue(frame) - when AMQP::Frame::Queue::Unbind - unbind_queue(frame) - when AMQP::Frame::Queue::Delete - delete_queue(frame) - when AMQP::Frame::Queue::Purge - purge_queue(frame) - when AMQP::Frame::Basic::Publish - start_publish(frame) - when AMQP::Frame::Header - with_channel frame, &.next_msg_headers(frame) - when AMQP::Frame::Body - with_channel frame, &.add_content(frame) - when AMQP::Frame::Basic::Consume - consume(frame) - when AMQP::Frame::Basic::Get - basic_get(frame) - when AMQP::Frame::Basic::Ack - with_channel frame, &.basic_ack(frame) - when AMQP::Frame::Basic::Reject - with_channel frame, &.basic_reject(frame) - when AMQP::Frame::Basic::Nack - with_channel frame, &.basic_nack(frame) - when AMQP::Frame::Basic::Cancel - with_channel frame, &.cancel_consumer(frame) - when AMQP::Frame::Basic::Qos - with_channel frame, &.basic_qos(frame) - when AMQP::Frame::Basic::Recover - with_channel frame, &.basic_recover(frame) - when AMQP::Frame::Tx::Select - with_channel frame, &.tx_select(frame) - when AMQP::Frame::Tx::Commit - with_channel frame, &.tx_commit(frame) - when AMQP::Frame::Tx::Rollback - with_channel frame, &.tx_rollback(frame) - when AMQP::Frame::Heartbeat - nil - else - send_not_implemented(frame) - end - if heartbeat_interval_ms = @heartbeat_interval_ms - if @last_sent_frame + heartbeat_interval_ms.milliseconds < RoughTime.monotonic - send AMQP::Frame::Heartbeat.new - end + private def close_socket + @running = false + @socket.close + @log.debug { "Socket closed" } + rescue ex + @log.debug { "#{ex.inspect} when closing socket" } end - rescue ex : Error::UnexpectedFrame - @log.error { ex.inspect } - close_channel(ex.frame, 505_u16, "UNEXPECTED_FRAME - #{ex.frame.class.name}") - end - - private def cleanup - @running = false - @channels.each_value &.close - @channels.clear - @exclusive_queues.each(&.close) - @exclusive_queues.clear - @vhost.rm_connection(self) - end - private def close_socket - @running = false - @socket.close - @log.debug { "Socket closed" } - rescue ex - @log.debug { "#{ex.inspect} when closing socket" } - end + def close(reason = nil) + reason ||= "Connection closed" + @log.info { "Closing, #{reason}" } + send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16) + @running = false + end - def close(reason = nil) - reason ||= "Connection closed" - @log.info { "Closing, #{reason}" } - send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16) - @running = false - end + def force_close + close_socket + end - def force_close - close_socket - end + def closed? + !@running + end - def closed? - !@running - end + def close_channel(frame : AMQ::Protocol::Frame, code, text) + return close_connection(frame, code, text) if frame.channel.zero? + case frame + when AMQ::Protocol::Frame::Method + send AMQP::Frame::Channel::Close.new(frame.channel, code, text, frame.class_id, frame.method_id) + else + send AMQP::Frame::Channel::Close.new(frame.channel, code, text, 0, 0) + end + @channels.delete(frame.channel).try &.close + end - def close_channel(frame : AMQ::Protocol::Frame, code, text) - return close_connection(frame, code, text) if frame.channel.zero? - case frame - when AMQ::Protocol::Frame::Method - send AMQP::Frame::Channel::Close.new(frame.channel, code, text, frame.class_id, frame.method_id) - else - send AMQP::Frame::Channel::Close.new(frame.channel, code, text, 0, 0) + def close_connection(frame : AMQ::Protocol::Frame?, code, text) + @log.info { "Closing, #{text}" } + case frame + when AMQ::Protocol::Frame::Method + send AMQP::Frame::Connection::Close.new(code, text, frame.class_id, frame.method_id) + else + send AMQP::Frame::Connection::Close.new(code, text, 0_u16, 0_u16) + end + @log.info { "Connection=#{@name} disconnected" } + ensure + @running = false end - @channels.delete(frame.channel).try &.close - end - def close_connection(frame : AMQ::Protocol::Frame?, code, text) - @log.info { "Closing, #{text}" } - case frame - when AMQ::Protocol::Frame::Method - send AMQP::Frame::Connection::Close.new(code, text, frame.class_id, frame.method_id) - else - send AMQP::Frame::Connection::Close.new(code, text, 0_u16, 0_u16) - end - @log.info { "Connection=#{@name} disconnected" } - ensure - @running = false - end + def send_access_refused(frame, text) + @log.warn { "Access refused channel=#{frame.channel} reason=\"#{text}\"" } + close_channel(frame, 403_u16, "ACCESS_REFUSED - #{text}") + end - def send_access_refused(frame, text) - @log.warn { "Access refused channel=#{frame.channel} reason=\"#{text}\"" } - close_channel(frame, 403_u16, "ACCESS_REFUSED - #{text}") - end + def send_not_found(frame, text = "") + @log.warn { "Not found channel=#{frame.channel} reason=\"#{text}\"" } + close_channel(frame, 404_u16, "NOT_FOUND - #{text}") + end - def send_not_found(frame, text = "") - @log.warn { "Not found channel=#{frame.channel} reason=\"#{text}\"" } - close_channel(frame, 404_u16, "NOT_FOUND - #{text}") - end + def send_resource_locked(frame, text) + @log.warn { "Resource locked channel=#{frame.channel} reason=\"#{text}\"" } + close_channel(frame, 405_u16, "RESOURCE_LOCKED - #{text}") + end - def send_resource_locked(frame, text) - @log.warn { "Resource locked channel=#{frame.channel} reason=\"#{text}\"" } - close_channel(frame, 405_u16, "RESOURCE_LOCKED - #{text}") - end + def send_precondition_failed(frame, text) + @log.warn { "Precondition failed channel=#{frame.channel} reason=\"#{text}\"" } + close_channel(frame, 406_u16, "PRECONDITION_FAILED - #{text}") + end - def send_precondition_failed(frame, text) - @log.warn { "Precondition failed channel=#{frame.channel} reason=\"#{text}\"" } - close_channel(frame, 406_u16, "PRECONDITION_FAILED - #{text}") - end + def send_not_implemented(frame, text = nil) + @log.error { "#{frame.inspect}, not implemented reason=\"#{text}\"" } + close_channel(frame, 540_u16, "NOT_IMPLEMENTED - #{text}") + end - def send_not_implemented(frame, text = nil) - @log.error { "#{frame.inspect}, not implemented reason=\"#{text}\"" } - close_channel(frame, 540_u16, "NOT_IMPLEMENTED - #{text}") - end + def send_not_implemented(ex : AMQ::Protocol::Error::NotImplemented) + text = "NOT_IMPLEMENTED" + if ex.channel.zero? + send AMQP::Frame::Connection::Close.new(540, text, ex.class_id, ex.method_id) + @running = false + else + send AMQP::Frame::Channel::Close.new(ex.channel, 540, text, ex.class_id, ex.method_id) + @channels.delete(ex.channel).try &.close + end + end - def send_not_implemented(ex : AMQ::Protocol::Error::NotImplemented) - text = "NOT_IMPLEMENTED" - if ex.channel.zero? - send AMQP::Frame::Connection::Close.new(540, text, ex.class_id, ex.method_id) - @running = false - else - send AMQP::Frame::Channel::Close.new(ex.channel, 540, text, ex.class_id, ex.method_id) - @channels.delete(ex.channel).try &.close + def send_internal_error(message) + close_connection(nil, 541_u16, "INTERNAL_ERROR - Unexpected error, please report") end - end - def send_internal_error(message) - close_connection(nil, 541_u16, "INTERNAL_ERROR - Unexpected error, please report") - end + def send_frame_error(message = nil) + close_connection(nil, 501_u16, "FRAME_ERROR - #{message}") + end - def send_frame_error(message = nil) - close_connection(nil, 501_u16, "FRAME_ERROR - #{message}") - end + private def declare_exchange(frame) + if !valid_entity_name(frame.exchange_name) + send_precondition_failed(frame, "Exchange name isn't valid") + elsif frame.exchange_name.empty? + send_access_refused(frame, "Not allowed to declare the default exchange") + elsif e = @vhost.exchanges.fetch(frame.exchange_name, nil) + redeclare_exchange(e, frame) + elsif frame.passive + send_not_found(frame, "Exchange '#{frame.exchange_name}' doesn't exists") + elsif frame.exchange_name.starts_with? "amq." + send_access_refused(frame, "Not allowed to use the amq. prefix") + else + ae = frame.arguments["x-alternate-exchange"]?.try &.as?(String) + ae_ok = ae.nil? || (@user.can_write?(@vhost.name, ae) && @user.can_read?(@vhost.name, frame.exchange_name)) + unless @user.can_config?(@vhost.name, frame.exchange_name) && ae_ok + send_access_refused(frame, "User doesn't have permissions to declare exchange '#{frame.exchange_name}'") + return + end + begin + @vhost.apply(frame) + rescue e : LavinMQ::Error::ExchangeTypeError + send_precondition_failed(frame, e.message) + end + send AMQP::Frame::Exchange::DeclareOk.new(frame.channel) unless frame.no_wait + end + end - private def declare_exchange(frame) - if !valid_entity_name(frame.exchange_name) - send_precondition_failed(frame, "Exchange name isn't valid") - elsif frame.exchange_name.empty? - send_access_refused(frame, "Not allowed to declare the default exchange") - elsif e = @vhost.exchanges.fetch(frame.exchange_name, nil) - redeclare_exchange(e, frame) - elsif frame.passive - send_not_found(frame, "Exchange '#{frame.exchange_name}' doesn't exists") - elsif frame.exchange_name.starts_with? "amq." - send_access_refused(frame, "Not allowed to use the amq. prefix") - else - ae = frame.arguments["x-alternate-exchange"]?.try &.as?(String) - ae_ok = ae.nil? || (@user.can_write?(@vhost.name, ae) && @user.can_read?(@vhost.name, frame.exchange_name)) - unless @user.can_config?(@vhost.name, frame.exchange_name) && ae_ok - send_access_refused(frame, "User doesn't have permissions to declare exchange '#{frame.exchange_name}'") - return + private def redeclare_exchange(e, frame) + if frame.passive || e.match?(frame) + unless frame.no_wait + send AMQP::Frame::Exchange::DeclareOk.new(frame.channel) + end + else + send_precondition_failed(frame, "Existing exchange '#{frame.exchange_name}' declared with other arguments") end - begin + end + + private def delete_exchange(frame) + if !valid_entity_name(frame.exchange_name) + send_precondition_failed(frame, "Exchange name isn't valid") + elsif frame.exchange_name.empty? + send_access_refused(frame, "Not allowed to delete the default exchange") + elsif frame.exchange_name.starts_with? "amq." + send_access_refused(frame, "Not allowed to use the amq. prefix") + elsif !@vhost.exchanges.has_key? frame.exchange_name + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Exchange::DeleteOk.new(frame.channel) unless frame.no_wait + elsif !@user.can_config?(@vhost.name, frame.exchange_name) + send_access_refused(frame, "User doesn't have permissions to delete exchange '#{frame.exchange_name}'") + elsif frame.if_unused && @vhost.exchanges[frame.exchange_name].in_use? + send_precondition_failed(frame, "Exchange '#{frame.exchange_name}' in use") + else @vhost.apply(frame) - rescue e : Error::ExchangeTypeError - send_precondition_failed(frame, e.message) + send AMQP::Frame::Exchange::DeleteOk.new(frame.channel) unless frame.no_wait end - send AMQP::Frame::Exchange::DeclareOk.new(frame.channel) unless frame.no_wait end - end - private def redeclare_exchange(e, frame) - if frame.passive || e.match?(frame) - unless frame.no_wait - send AMQP::Frame::Exchange::DeclareOk.new(frame.channel) + # ameba:disable Metrics/CyclomaticComplexity + private def delete_queue(frame) + if frame.queue_name.empty? && @last_queue_name + frame.queue_name = @last_queue_name.not_nil! + end + if !valid_entity_name(frame.queue_name) + send_precondition_failed(frame, "Queue name isn't valid") + return + end + q = @vhost.queues.fetch(frame.queue_name, nil) + if q.nil? + send AMQP::Frame::Queue::DeleteOk.new(frame.channel, 0_u32) unless frame.no_wait + elsif queue_exclusive_to_other_client?(q) + send_resource_locked(frame, "Queue '#{q.name}' is exclusive") + elsif frame.if_unused && !q.consumer_count.zero? + send_precondition_failed(frame, "Queue '#{q.name}' in use") + elsif frame.if_empty && !q.message_count.zero? + send_precondition_failed(frame, "Queue '#{q.name}' is not empty") + elsif !@user.can_config?(@vhost.name, frame.queue_name) + send_access_refused(frame, "User doesn't have permissions to delete queue '#{q.name}'") + else + size = q.message_count + @vhost.apply(frame) + @exclusive_queues.delete(q) if q.exclusive? + send AMQP::Frame::Queue::DeleteOk.new(frame.channel, size) unless frame.no_wait end - else - send_precondition_failed(frame, "Existing exchange '#{frame.exchange_name}' declared with other arguments") end - end - private def delete_exchange(frame) - if !valid_entity_name(frame.exchange_name) - send_precondition_failed(frame, "Exchange name isn't valid") - elsif frame.exchange_name.empty? - send_access_refused(frame, "Not allowed to delete the default exchange") - elsif frame.exchange_name.starts_with? "amq." - send_access_refused(frame, "Not allowed to use the amq. prefix") - elsif !@vhost.exchanges.has_key? frame.exchange_name - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Exchange::DeleteOk.new(frame.channel) unless frame.no_wait - elsif !@user.can_config?(@vhost.name, frame.exchange_name) - send_access_refused(frame, "User doesn't have permissions to delete exchange '#{frame.exchange_name}'") - elsif frame.if_unused && @vhost.exchanges[frame.exchange_name].in_use? - send_precondition_failed(frame, "Exchange '#{frame.exchange_name}' in use") - else - @vhost.apply(frame) - send AMQP::Frame::Exchange::DeleteOk.new(frame.channel) unless frame.no_wait + private def valid_entity_name(name) : Bool + return true if name.empty? + name.matches?(/\A[ -~]*\z/) end - end - # ameba:disable Metrics/CyclomaticComplexity - private def delete_queue(frame) - if frame.queue_name.empty? && @last_queue_name - frame.queue_name = @last_queue_name.not_nil! - end - if !valid_entity_name(frame.queue_name) - send_precondition_failed(frame, "Queue name isn't valid") - return - end - q = @vhost.queues.fetch(frame.queue_name, nil) - if q.nil? - send AMQP::Frame::Queue::DeleteOk.new(frame.channel, 0_u32) unless frame.no_wait - elsif queue_exclusive_to_other_client?(q) - send_resource_locked(frame, "Queue '#{q.name}' is exclusive") - elsif frame.if_unused && !q.consumer_count.zero? - send_precondition_failed(frame, "Queue '#{q.name}' in use") - elsif frame.if_empty && !q.message_count.zero? - send_precondition_failed(frame, "Queue '#{q.name}' is not empty") - elsif !@user.can_config?(@vhost.name, frame.queue_name) - send_access_refused(frame, "User doesn't have permissions to delete queue '#{q.name}'") - else - size = q.message_count - @vhost.apply(frame) - @exclusive_queues.delete(q) if q.exclusive? - send AMQP::Frame::Queue::DeleteOk.new(frame.channel, size) unless frame.no_wait + def queue_exclusive_to_other_client?(q) + q.exclusive? && !@exclusive_queues.includes?(q) end - end - - private def valid_entity_name(name) : Bool - return true if name.empty? - name.matches?(/\A[ -~]*\z/) - end - def queue_exclusive_to_other_client?(q) - q.exclusive? && !@exclusive_queues.includes?(q) - end - - private def declare_queue(frame) - if !frame.queue_name.empty? && !valid_entity_name(frame.queue_name) - send_precondition_failed(frame, "Queue name isn't valid") - elsif q = @vhost.queues.fetch(frame.queue_name, nil) - redeclare_queue(frame, q) - elsif {"amq.rabbitmq.reply-to", "amq.direct.reply-to"}.includes? frame.queue_name - unless frame.no_wait - send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 0_u32) - end - elsif frame.queue_name.starts_with?("amq.direct.reply-to.") - consumer_tag = frame.queue_name[20..] - if @vhost.direct_reply_consumers.has_key? consumer_tag - send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 1_u32) - else + private def declare_queue(frame) + if !frame.queue_name.empty? && !valid_entity_name(frame.queue_name) + send_precondition_failed(frame, "Queue name isn't valid") + elsif q = @vhost.queues.fetch(frame.queue_name, nil) + redeclare_queue(frame, q) + elsif {"amq.rabbitmq.reply-to", "amq.direct.reply-to"}.includes? frame.queue_name + unless frame.no_wait + send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 0_u32) + end + elsif frame.queue_name.starts_with?("amq.direct.reply-to.") + consumer_tag = frame.queue_name[20..] + if @vhost.direct_reply_consumers.has_key? consumer_tag + send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 1_u32) + else + send_not_found(frame, "Queue '#{frame.queue_name}' doesn't exists") + end + elsif frame.passive send_not_found(frame, "Queue '#{frame.queue_name}' doesn't exists") + elsif frame.queue_name.starts_with? "amq." + send_access_refused(frame, "Not allowed to use the amq. prefix") + elsif @vhost.max_queues.try { |max| @vhost.queues.size >= max } + send_access_refused(frame, "queue limit in vhost '#{@vhost.name}' (#{@vhost.max_queues}) is reached") + else + declare_new_queue(frame) end - elsif frame.passive - send_not_found(frame, "Queue '#{frame.queue_name}' doesn't exists") - elsif frame.queue_name.starts_with? "amq." - send_access_refused(frame, "Not allowed to use the amq. prefix") - elsif @vhost.max_queues.try { |max| @vhost.queues.size >= max } - send_access_refused(frame, "queue limit in vhost '#{@vhost.name}' (#{@vhost.max_queues}) is reached") - else - declare_new_queue(frame) end - end - private def redeclare_queue(frame, q) - if queue_exclusive_to_other_client?(q) || invalid_exclusive_redclare?(frame, q) - send_resource_locked(frame, "Exclusive queue") - elsif frame.passive || q.match?(frame) - q.redeclare - unless frame.no_wait - send AMQP::Frame::Queue::DeclareOk.new(frame.channel, q.name, - q.message_count, q.consumer_count) + private def redeclare_queue(frame, q) + if queue_exclusive_to_other_client?(q) || invalid_exclusive_redclare?(frame, q) + send_resource_locked(frame, "Exclusive queue") + elsif frame.passive || q.match?(frame) + q.redeclare + unless frame.no_wait + send AMQP::Frame::Queue::DeclareOk.new(frame.channel, q.name, + q.message_count, q.consumer_count) + end + @last_queue_name = frame.queue_name + elsif frame.exclusive && !q.exclusive? + send_resource_locked(frame, "Not an exclusive queue") + else + send_precondition_failed(frame, "Existing queue '#{q.name}' declared with other arguments") end - @last_queue_name = frame.queue_name - elsif frame.exclusive && !q.exclusive? - send_resource_locked(frame, "Not an exclusive queue") - else - send_precondition_failed(frame, "Existing queue '#{q.name}' declared with other arguments") end - end - private def invalid_exclusive_redclare?(frame, q) - q.exclusive? && !frame.passive && !frame.exclusive - end - - @last_queue_name : String? - - private def declare_new_queue(frame) - unless @vhost.flow? - send_precondition_failed(frame, "Server low on disk space, can not create queue") - end - if frame.queue_name.empty? - frame.queue_name = Queue.generate_name - end - dlx = frame.arguments["x-dead-letter-exchange"]?.try &.as?(String) - dlx_ok = dlx.nil? || (@user.can_write?(@vhost.name, dlx) && @user.can_read?(@vhost.name, name)) - unless @user.can_config?(@vhost.name, frame.queue_name) && dlx_ok - send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue_name}'") - return - end - @vhost.apply(frame) - @last_queue_name = frame.queue_name - if frame.exclusive - @exclusive_queues << @vhost.queues[frame.queue_name] - end - unless frame.no_wait - send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 0_u32) + private def invalid_exclusive_redclare?(frame, q) + q.exclusive? && !frame.passive && !frame.exclusive end - end - private def bind_queue(frame) - if frame.queue_name.empty? && @last_queue_name - frame.queue_name = @last_queue_name.not_nil! - # according to spec if both queue name and routing key is empty, - # then substitute them with the name of the last declared queue - if frame.routing_key.empty? - frame.routing_key = @last_queue_name.not_nil! - end - end - return unless valid_q_bind_unbind?(frame) - - q = @vhost.queues.fetch(frame.queue_name, nil) - if q.nil? - send_not_found frame, "Queue '#{frame.queue_name}' not found" - elsif !@vhost.exchanges.has_key? frame.exchange_name - send_not_found frame, "Exchange '#{frame.exchange_name}' not found" - elsif !@user.can_read?(@vhost.name, frame.exchange_name) - send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") - elsif !@user.can_write?(@vhost.name, frame.queue_name) - send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") - elsif queue_exclusive_to_other_client?(q) - send_resource_locked(frame, "Exclusive queue") - else - @vhost.apply(frame) - send AMQP::Frame::Queue::BindOk.new(frame.channel) unless frame.no_wait - end - end + @last_queue_name : String? - private def unbind_queue(frame) - if frame.queue_name.empty? && @last_queue_name - frame.queue_name = @last_queue_name.not_nil! - end - return unless valid_q_bind_unbind?(frame) - - q = @vhost.queues.fetch(frame.queue_name, nil) - if q.nil? - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Queue::UnbindOk.new(frame.channel) - elsif !@vhost.exchanges.has_key? frame.exchange_name - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Queue::UnbindOk.new(frame.channel) - elsif !@user.can_read?(@vhost.name, frame.exchange_name) - send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") - elsif !@user.can_write?(@vhost.name, frame.queue_name) - send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") - elsif queue_exclusive_to_other_client?(q) - send_resource_locked(frame, "Exclusive queue") - else + private def declare_new_queue(frame) + unless @vhost.flow? + send_precondition_failed(frame, "Server low on disk space, can not create queue") + end + if frame.queue_name.empty? + frame.queue_name = Queue.generate_name + end + dlx = frame.arguments["x-dead-letter-exchange"]?.try &.as?(String) + dlx_ok = dlx.nil? || (@user.can_write?(@vhost.name, dlx) && @user.can_read?(@vhost.name, name)) + unless @user.can_config?(@vhost.name, frame.queue_name) && dlx_ok + send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue_name}'") + return + end @vhost.apply(frame) - send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + @last_queue_name = frame.queue_name + if frame.exclusive + @exclusive_queues << @vhost.queues[frame.queue_name] + end + unless frame.no_wait + send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 0_u32) + end end - end - - private def valid_q_bind_unbind?(frame) : Bool - if !valid_entity_name(frame.queue_name) - send_precondition_failed(frame, "Queue name isn't valid") - return false - elsif !valid_entity_name(frame.exchange_name) - send_precondition_failed(frame, "Exchange name isn't valid") - return false - elsif frame.exchange_name.empty? || frame.exchange_name == DEFAULT_EX - target = frame.is_a?(AMQP::Frame::Queue::Bind) ? "bind to" : "unbind from" - send_access_refused(frame, "Not allowed to #{target} the default exchange") - return false - end - true - end - private def bind_exchange(frame) - source = @vhost.exchanges.fetch(frame.source, nil) - destination = @vhost.exchanges.fetch(frame.destination, nil) - if destination.nil? - send_not_found frame, "Exchange '#{frame.destination}' doesn't exists" - elsif source.nil? - send_not_found frame, "Exchange '#{frame.source}' doesn't exists" - elsif !@user.can_read?(@vhost.name, frame.source) - send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") - elsif !@user.can_write?(@vhost.name, frame.destination) - send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") - elsif frame.source.empty? || frame.destination.empty? - send_access_refused(frame, "Not allowed to bind to the default exchange") - else - @vhost.apply(frame) - send AMQP::Frame::Exchange::BindOk.new(frame.channel) unless frame.no_wait + private def bind_queue(frame) + if frame.queue_name.empty? && @last_queue_name + frame.queue_name = @last_queue_name.not_nil! + # according to spec if both queue name and routing key is empty, + # then substitute them with the name of the last declared queue + if frame.routing_key.empty? + frame.routing_key = @last_queue_name.not_nil! + end + end + return unless valid_q_bind_unbind?(frame) + + q = @vhost.queues.fetch(frame.queue_name, nil) + if q.nil? + send_not_found frame, "Queue '#{frame.queue_name}' not found" + elsif !@vhost.exchanges.has_key? frame.exchange_name + send_not_found frame, "Exchange '#{frame.exchange_name}' not found" + elsif !@user.can_read?(@vhost.name, frame.exchange_name) + send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") + elsif !@user.can_write?(@vhost.name, frame.queue_name) + send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") + elsif queue_exclusive_to_other_client?(q) + send_resource_locked(frame, "Exclusive queue") + else + @vhost.apply(frame) + send AMQP::Frame::Queue::BindOk.new(frame.channel) unless frame.no_wait + end end - end - private def unbind_exchange(frame) - source = @vhost.exchanges.fetch(frame.source, nil) - destination = @vhost.exchanges.fetch(frame.destination, nil) - if destination.nil? - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) - elsif source.nil? - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) - elsif !@user.can_read?(@vhost.name, frame.source) - send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") - elsif !@user.can_write?(@vhost.name, frame.destination) - send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") - elsif frame.source.empty? || frame.destination.empty? || frame.source == DEFAULT_EX || frame.destination == DEFAULT_EX - send_access_refused(frame, "Not allowed to unbind from the default exchange") - else - @vhost.apply(frame) - send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) unless frame.no_wait + private def unbind_queue(frame) + if frame.queue_name.empty? && @last_queue_name + frame.queue_name = @last_queue_name.not_nil! + end + return unless valid_q_bind_unbind?(frame) + + q = @vhost.queues.fetch(frame.queue_name, nil) + if q.nil? + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + elsif !@vhost.exchanges.has_key? frame.exchange_name + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + elsif !@user.can_read?(@vhost.name, frame.exchange_name) + send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") + elsif !@user.can_write?(@vhost.name, frame.queue_name) + send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") + elsif queue_exclusive_to_other_client?(q) + send_resource_locked(frame, "Exclusive queue") + else + @vhost.apply(frame) + send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + end end - end - private def purge_queue(frame) - if frame.queue_name.empty? && @last_queue_name - frame.queue_name = @last_queue_name.not_nil! - end - unless @user.can_read?(@vhost.name, frame.queue_name) - send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") - return - end - if !valid_entity_name(frame.queue_name) - send_precondition_failed(frame, "Queue name isn't valid") - elsif q = @vhost.queues.fetch(frame.queue_name, nil) - if queue_exclusive_to_other_client?(q) - send_resource_locked(frame, "Queue '#{q.name}' is exclusive") + private def valid_q_bind_unbind?(frame) : Bool + if !valid_entity_name(frame.queue_name) + send_precondition_failed(frame, "Queue name isn't valid") + return false + elsif !valid_entity_name(frame.exchange_name) + send_precondition_failed(frame, "Exchange name isn't valid") + return false + elsif frame.exchange_name.empty? || frame.exchange_name == DEFAULT_EX + target = frame.is_a?(AMQP::Frame::Queue::Bind) ? "bind to" : "unbind from" + send_access_refused(frame, "Not allowed to #{target} the default exchange") + return false + end + true + end + + private def bind_exchange(frame) + source = @vhost.exchanges.fetch(frame.source, nil) + destination = @vhost.exchanges.fetch(frame.destination, nil) + if destination.nil? + send_not_found frame, "Exchange '#{frame.destination}' doesn't exists" + elsif source.nil? + send_not_found frame, "Exchange '#{frame.source}' doesn't exists" + elsif !@user.can_read?(@vhost.name, frame.source) + send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") + elsif !@user.can_write?(@vhost.name, frame.destination) + send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") + elsif frame.source.empty? || frame.destination.empty? + send_access_refused(frame, "Not allowed to bind to the default exchange") else - messages_purged = q.purge - send AMQP::Frame::Queue::PurgeOk.new(frame.channel, messages_purged) unless frame.no_wait + @vhost.apply(frame) + send AMQP::Frame::Exchange::BindOk.new(frame.channel) unless frame.no_wait end - else - send_not_found(frame, "Queue '#{frame.queue_name}' not found") end - end - private def start_publish(frame) - unless @user.can_write?(@vhost.name, frame.exchange) - send_access_refused(frame, "User not allowed to publish to exchange '#{frame.exchange}'") - return + private def unbind_exchange(frame) + source = @vhost.exchanges.fetch(frame.source, nil) + destination = @vhost.exchanges.fetch(frame.destination, nil) + if destination.nil? + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) + elsif source.nil? + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) + elsif !@user.can_read?(@vhost.name, frame.source) + send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") + elsif !@user.can_write?(@vhost.name, frame.destination) + send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") + elsif frame.source.empty? || frame.destination.empty? || frame.source == DEFAULT_EX || frame.destination == DEFAULT_EX + send_access_refused(frame, "Not allowed to unbind from the default exchange") + else + @vhost.apply(frame) + send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) unless frame.no_wait + end end - with_channel frame, &.start_publish(frame) - end - private def consume(frame) - if frame.queue.empty? && @last_queue_name - frame.queue = @last_queue_name.not_nil! - end - if !valid_entity_name(frame.queue) - send_precondition_failed(frame, "Queue name isn't valid") - return - end - unless @user.can_read?(@vhost.name, frame.queue) - send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue}'") - return + private def purge_queue(frame) + if frame.queue_name.empty? && @last_queue_name + frame.queue_name = @last_queue_name.not_nil! + end + unless @user.can_read?(@vhost.name, frame.queue_name) + send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") + return + end + if !valid_entity_name(frame.queue_name) + send_precondition_failed(frame, "Queue name isn't valid") + elsif q = @vhost.queues.fetch(frame.queue_name, nil) + if queue_exclusive_to_other_client?(q) + send_resource_locked(frame, "Queue '#{q.name}' is exclusive") + else + messages_purged = q.purge + send AMQP::Frame::Queue::PurgeOk.new(frame.channel, messages_purged) unless frame.no_wait + end + else + send_not_found(frame, "Queue '#{frame.queue_name}' not found") + end end - with_channel frame, &.consume(frame) - end - private def basic_get(frame) - if frame.queue.empty? && @last_queue_name - frame.queue = @last_queue_name.not_nil! + private def start_publish(frame) + unless @user.can_write?(@vhost.name, frame.exchange) + send_access_refused(frame, "User not allowed to publish to exchange '#{frame.exchange}'") + return + end + with_channel frame, &.start_publish(frame) end - if !valid_entity_name(frame.queue) - send_precondition_failed(frame, "Queue name isn't valid") - return + + private def consume(frame) + if frame.queue.empty? && @last_queue_name + frame.queue = @last_queue_name.not_nil! + end + if !valid_entity_name(frame.queue) + send_precondition_failed(frame, "Queue name isn't valid") + return + end + unless @user.can_read?(@vhost.name, frame.queue) + send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue}'") + return + end + with_channel frame, &.consume(frame) end - unless @user.can_read?(@vhost.name, frame.queue) - send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue}'") - return + + private def basic_get(frame) + if frame.queue.empty? && @last_queue_name + frame.queue = @last_queue_name.not_nil! + end + if !valid_entity_name(frame.queue) + send_precondition_failed(frame, "Queue name isn't valid") + return + end + unless @user.can_read?(@vhost.name, frame.queue) + send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue}'") + return + end + # yield so that msg expiration, consumer delivery etc gets priority + Fiber.yield + with_channel frame, &.basic_get(frame) end - # yield so that msg expiration, consumer delivery etc gets priority - Fiber.yield - with_channel frame, &.basic_get(frame) end end end diff --git a/src/lavinmq/amqp/connection_factory.cr b/src/lavinmq/amqp/connection_factory.cr new file mode 100644 index 000000000..a81baa35f --- /dev/null +++ b/src/lavinmq/amqp/connection_factory.cr @@ -0,0 +1,190 @@ +require "log" +require "../version" +require "./client" +require "../client/connection_factory" + +module LavinMQ + module AMQP + class ConnectionFactory < LavinMQ::ConnectionFactory + Log = ::Log.for "AMQP.ConnectionFactory" + + def start(socket, connection_info, vhosts, users) : Client? + remote_address = connection_info.src + socket.read_timeout = 15.seconds + if confirm_header(socket) + if start_ok = start(socket) + if user = authenticate(socket, remote_address, users, start_ok) + if tune_ok = tune(socket) + if vhost = open(socket, vhosts, user) + socket.read_timeout = heartbeat_timeout(tune_ok) + return LavinMQ::AMQP::Client.new(socket, connection_info, vhost, user, tune_ok, start_ok) + end + end + end + end + end + rescue ex : IO::TimeoutError | IO::Error | OpenSSL::SSL::Error | AMQP::Error::FrameDecode + Log.warn { "#{ex} when #{remote_address} tried to establish connection" } + nil + rescue ex + Log.error(exception: ex) { "Error while #{remote_address} tried to establish connection" } + nil + end + + private def heartbeat_timeout(tune_ok) + if tune_ok.heartbeat > 0 + (tune_ok.heartbeat / 2).seconds + end + end + + def confirm_header(socket) : Bool + proto = uninitialized UInt8[8] + count = socket.read(proto.to_slice) + if count.zero? # EOF, socket closed by peer + false + elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9 + socket.write AMQP::PROTOCOL_START_0_9_1.to_slice + socket.flush + Log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" } + false + else + true + end + end + + SERVER_PROPERTIES = AMQP::Table.new({ + "product": "LavinMQ", + "platform": "Crystal #{Crystal::VERSION}", + "version": LavinMQ::VERSION, + "capabilities": { + "publisher_confirms": true, + "exchange_exchange_bindings": true, + "basic.nack": true, + "consumer_cancel_notify": true, + "connection.blocked": true, + "consumer_priorities": true, + "authentication_failure_close": true, + "per_consumer_qos": true, + "direct_reply_to": true, + }, + }) + + def start(socket) + start = AMQP::Frame::Connection::Start.new(server_properties: SERVER_PROPERTIES) + socket.write_bytes start, ::IO::ByteFormat::NetworkEndian + socket.flush + start_ok = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::StartOk) } + if start_ok.bytesize > 4096 + Log.warn { "StartOk frame was #{start_ok.bytesize} bytes, max allowed is 4096 bytes" } + return + end + start_ok + end + + def credentials(start_ok) + case start_ok.mechanism + when "PLAIN" + resp = start_ok.response + if i = resp.index('\u0000', 1) + {resp[1...i], resp[(i + 1)..-1]} + else + raise "Invalid authentication response" + end + when "AMQPLAIN" + io = ::IO::Memory.new(start_ok.response) + tbl = AMQP::Table.from_io(io, ::IO::ByteFormat::NetworkEndian, io.bytesize.to_u32) + {tbl["LOGIN"].as(String), tbl["PASSWORD"].as(String)} + else raise "Unsupported authentication mechanism: #{start_ok.mechanism}" + end + end + + def authenticate(socket, remote_address, users, start_ok) + username, password = credentials(start_ok) + user = users[username]? + return user if user && user.password && user.password.not_nil!.verify(password) && + guest_only_loopback?(remote_address, user) + + if user.nil? + Log.warn { "User \"#{username}\" not found" } + else + Log.warn { "Authentication failure for user \"#{username}\"" } + end + props = start_ok.client_properties + if capabilities = props["capabilities"]?.try &.as?(AMQP::Table) + if capabilities["authentication_failure_close"]?.try &.as?(Bool) + socket.write_bytes AMQP::Frame::Connection::Close.new(403_u16, "ACCESS_REFUSED", + start_ok.class_id, + start_ok.method_id), IO::ByteFormat::NetworkEndian + socket.flush + end + end + nil + end + + def tune(socket) + frame_max = socket.is_a?(WebSocketIO) ? 4096_u32 : Config.instance.frame_max + socket.write_bytes AMQP::Frame::Connection::Tune.new( + channel_max: Config.instance.channel_max, + frame_max: frame_max, + heartbeat: Config.instance.heartbeat), IO::ByteFormat::NetworkEndian + socket.flush + tune_ok = AMQP::Frame.from_io(socket) do |frame| + case frame + when AMQP::Frame::Connection::TuneOk + if frame.frame_max < 4096 + Log.warn { "Suggested Frame max (#{frame.frame_max}) too low, closing connection" } + return + end + frame + else + Log.warn { "Expected TuneOk Frame got #{frame.inspect}" } + return + end + end + if tune_ok.frame_max < 4096 + Log.warn { "Suggested Frame max (#{tune_ok.frame_max}) too low, closing connection" } + return + end + tune_ok + end + + def open(socket, vhosts, user) + open = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::Open) } + vhost_name = open.vhost.empty? ? "/" : open.vhost + if vhost = vhosts[vhost_name]? + if user.permissions[vhost_name]? + if vhost.max_connections.try { |max| vhost.connections.size >= max } + Log.warn { "Max connections (#{vhost.max_connections}) reached for vhost #{vhost_name}" } + reply_text = "NOT_ALLOWED - access to vhost '#{vhost_name}' refused: connection limit (#{vhost.max_connections}) is reached" + socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, reply_text, + open.class_id, open.method_id), IO::ByteFormat::NetworkEndian + socket.flush + return + end + socket.write_bytes AMQP::Frame::Connection::OpenOk.new, IO::ByteFormat::NetworkEndian + socket.flush + return vhost + else + Log.warn { "Access denied for user \"#{user.name}\" to vhost \"#{vhost_name}\"" } + reply_text = "NOT_ALLOWED - '#{user.name}' doesn't have access to '#{vhost.name}'" + socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, reply_text, + open.class_id, open.method_id), IO::ByteFormat::NetworkEndian + socket.flush + end + else + Log.warn { "VHost \"#{vhost_name}\" not found" } + socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, "NOT_ALLOWED - vhost not found", + open.class_id, open.method_id), IO::ByteFormat::NetworkEndian + socket.flush + end + nil + end + + private def guest_only_loopback?(remote_address, user) : Bool + return true unless user.name == "guest" + return true unless Config.instance.guest_only_loopback? + remote_address.loopback? + end + end + end +end diff --git a/src/lavinmq/amqp/consumer.cr b/src/lavinmq/amqp/consumer.cr index 0840a2885..d581afa1c 100644 --- a/src/lavinmq/amqp/consumer.cr +++ b/src/lavinmq/amqp/consumer.cr @@ -1,266 +1,262 @@ require "log" -require "../../sortable_json" -require "../../error" -require "../../logger" +require "../client/channel/consumer" module LavinMQ - class Client - class Channel - class Consumer - include SortableJSON - Log = ::Log.for("consumer") - getter tag : String - getter priority : Int32 - getter? exclusive : Bool - getter? no_ack : Bool - getter channel, queue - getter prefetch_count = 0u16 - getter unacked = 0_u32 - getter? closed = false - @flow : Bool - @metadata : ::Log::Metadata + module AMQP + class Consumer < LavinMQ::Client::Channel::Consumer + include SortableJSON + Log = ::Log.for "AMQP.consumer" + getter tag : String + getter priority : Int32 + getter? exclusive : Bool + getter? no_ack : Bool + getter channel, queue + getter prefetch_count = 0u16 + getter unacked = 0_u32 + getter? closed = false + @flow : Bool + @metadata : ::Log::Metadata - def initialize(@channel : Client::Channel, @queue : Queue, frame : AMQP::Frame::Basic::Consume) - @tag = frame.consumer_tag - @no_ack = frame.no_ack - @exclusive = frame.exclusive - @priority = consumer_priority(frame) # Must be before ConsumeOk, can close channel - @prefetch_count = @channel.prefetch_count - @flow = @channel.flow? - @metadata = @channel.@metadata.extend({consumer: @tag}) - @log = Logger.new(Log, @metadata) - spawn deliver_loop, name: "Consumer deliver loop", same_thread: true - end + def initialize(@channel : AMQP::Channel, @queue : Queue, frame : AMQP::Frame::Basic::Consume) + @tag = frame.consumer_tag + @no_ack = frame.no_ack + @exclusive = frame.exclusive + @priority = consumer_priority(frame) # Must be before ConsumeOk, can close channel + @prefetch_count = @channel.prefetch_count + @flow = @channel.flow? + @metadata = @channel.@metadata.extend({consumer: @tag}) + @log = Logger.new(Log, @metadata) + spawn deliver_loop, name: "Consumer deliver loop", same_thread: true + end - def close - @closed = true - @queue.rm_consumer(self) - @notify_closed.close - @has_capacity.close - @flow_change.close - end + def close + @closed = true + @queue.rm_consumer(self) + @notify_closed.close + @has_capacity.close + @flow_change.close + end - @notify_closed = ::Channel(Nil).new - @flow_change = ::Channel(Bool).new + @notify_closed = ::Channel(Nil).new + @flow_change = ::Channel(Bool).new - def flow(active : Bool) - @flow = active - @flow_change.try_send? active - end + def flow(active : Bool) + @flow = active + @flow_change.try_send? active + end - def prefetch_count=(prefetch_count : UInt16) - @prefetch_count = prefetch_count - notify_has_capacity(@prefetch_count > @unacked) - end + def prefetch_count=(prefetch_count : UInt16) + @prefetch_count = prefetch_count + notify_has_capacity(@prefetch_count > @unacked) + end - private def deliver_loop - wait_for_single_active_consumer - queue = @queue - i = 0 + private def deliver_loop + wait_for_single_active_consumer + queue = @queue + i = 0 + loop do + wait_for_capacity loop do - wait_for_capacity - loop do - raise ClosedError.new if @closed - next if wait_for_global_capacity - next if wait_for_priority_consumers - next if wait_for_queue_ready - next if wait_for_paused_queue - next if wait_for_flow - break - end - {% unless flag?(:release) %} - @log.debug { "Getting a new message" } - {% end %} - queue.consume_get(self) do |env| - deliver(env.message, env.segment_position, env.redelivered) - end - Fiber.yield if (i &+= 1) % 32768 == 0 - end - rescue ex : ClosedError | Queue::ClosedError | Client::Channel::ClosedError | ::Channel::ClosedError - @log.debug { "deliver loop exiting: #{ex.inspect}" } - end - - private def wait_for_global_capacity - ch = @channel - return if ch.has_capacity? - @log.debug { "Waiting for global prefetch capacity" } - select - when ch.has_capacity.receive - when @notify_closed.receive + raise ClosedError.new if @closed + next if wait_for_global_capacity + next if wait_for_priority_consumers + next if wait_for_queue_ready + next if wait_for_paused_queue + next if wait_for_flow + break end - true - end - - private def wait_for_single_active_consumer - case @queue.single_active_consumer - when self - @log.debug { "This consumer is the single active consumer" } - when nil - @log.debug { "The queue isn't a single active consumer queue" } - else - @log.debug { "Waiting for this consumer to become the single active consumer" } - loop do - select - when sca = @queue.single_active_consumer_change.receive - if sca == self - break - else - @log.debug { "New single active consumer, but not me" } - end - when @notify_closed.receive - break - end - end - true + {% unless flag?(:release) %} + @log.debug { "Getting a new message" } + {% end %} + queue.consume_get(self) do |env| + deliver(env.message, env.segment_position, env.redelivered) end + Fiber.yield if (i &+= 1) % 32768 == 0 end + rescue ex : ClosedError | Queue::ClosedError | AMQP::Channel::ClosedError | ::Channel::ClosedError + @log.debug { "deliver loop exiting: #{ex.inspect}" } + end - private def wait_for_priority_consumers - # single active consumer queues can't have priority consumers - if @queue.has_priority_consumers? && @queue.single_active_consumer.nil? - if @queue.consumers.any? { |c| c.priority > @priority && c.accepts? } - @log.debug { "Waiting for higher priority consumers to not have capacity" } - begin - ::Channel.receive_first(@queue.consumers.map(&.has_capacity)) - rescue ::Channel::ClosedError - end - return true - end - end + private def wait_for_global_capacity + ch = @channel + return if ch.has_capacity? + @log.debug { "Waiting for global prefetch capacity" } + select + when ch.has_capacity.receive + when @notify_closed.receive end + true + end - private def wait_for_queue_ready - if @queue.empty? - @log.debug { "Waiting for queue not to be empty" } + private def wait_for_single_active_consumer + case @queue.single_active_consumer + when self + @log.debug { "This consumer is the single active consumer" } + when nil + @log.debug { "The queue isn't a single active consumer queue" } + else + @log.debug { "Waiting for this consumer to become the single active consumer" } + loop do select - when is_empty = @queue.empty_change.receive - @log.debug { "Queue is #{is_empty ? "" : "not"} empty" } + when sca = @queue.single_active_consumer_change.receive + if sca == self + break + else + @log.debug { "New single active consumer, but not me" } + end when @notify_closed.receive + break end - return true end + true end + end - private def wait_for_paused_queue - if @queue.paused? - @log.debug { "Waiting for queue not to be paused" } - select - when is_paused = @queue.paused_change.receive - @log.debug { "Queue is #{is_paused ? "" : "not"} paused" } - when @notify_closed.receive + private def wait_for_priority_consumers + # single active consumer queues can't have priority consumers + if @queue.has_priority_consumers? && @queue.single_active_consumer.nil? + if @queue.consumers.any? { |c| c.priority > @priority && c.accepts? } + @log.debug { "Waiting for higher priority consumers to not have capacity" } + begin + ::Channel.receive_first(@queue.consumers.map(&.has_capacity)) + rescue ::Channel::ClosedError end return true end end + end - private def wait_for_flow - unless @flow - @log.debug { "Waiting for flow" } - is_flow = @flow_change.receive - @log.debug { "Channel flow=#{is_flow}" } - return true + private def wait_for_queue_ready + if @queue.empty? + @log.debug { "Waiting for queue not to be empty" } + select + when is_empty = @queue.empty_change.receive + @log.debug { "Queue is #{is_empty ? "" : "not"} empty" } + when @notify_closed.receive end + return true end + end - # blocks until the consumer can accept more messages - private def wait_for_capacity : Nil - if @prefetch_count > 0 - until @unacked < @prefetch_count - @log.debug { "Waiting for prefetch capacity" } - @has_capacity.receive - end + private def wait_for_paused_queue + if @queue.paused? + @log.debug { "Waiting for queue not to be paused" } + select + when is_paused = @queue.paused_change.receive + @log.debug { "Queue is #{is_paused ? "" : "not"} paused" } + when @notify_closed.receive end + return true end + end - def accepts? : Bool - return false unless @flow - return false if @prefetch_count > 0 && @unacked >= @prefetch_count - return false if @channel.global_prefetch_count > 0 && @channel.consumers.sum(&.unacked) >= @channel.global_prefetch_count - true + private def wait_for_flow + unless @flow + @log.debug { "Waiting for flow" } + is_flow = @flow_change.receive + @log.debug { "Channel flow=#{is_flow}" } + return true end + end - getter has_capacity = ::Channel(Bool).new - - private def notify_has_capacity(value) - while @has_capacity.try_send? value + # blocks until the consumer can accept more messages + private def wait_for_capacity : Nil + if @prefetch_count > 0 + until @unacked < @prefetch_count + @log.debug { "Waiting for prefetch capacity" } + @has_capacity.receive end end + end - def deliver(msg, sp, redelivered = false, recover = false) - unless @no_ack || recover - @unacked += 1 - notify_has_capacity(false) if @unacked == @prefetch_count - end - delivery_tag = @channel.next_delivery_tag(@queue, sp, @no_ack, self) - deliver = AMQP::Frame::Basic::Deliver.new(@channel.id, @tag, - delivery_tag, - redelivered, - msg.exchange_name, msg.routing_key) - @channel.deliver(deliver, msg, redelivered) - end + def accepts? : Bool + return false unless @flow + return false if @prefetch_count > 0 && @unacked >= @prefetch_count + return false if @channel.global_prefetch_count > 0 && @channel.consumers.sum(&.unacked) >= @channel.global_prefetch_count + true + end - def ack(sp) - was_full = @unacked == @prefetch_count - @unacked -= 1 - notify_has_capacity(true) if was_full - end + getter has_capacity = ::Channel(Bool).new - def reject(sp, requeue = false) - was_full = @unacked == @prefetch_count - @unacked -= 1 - notify_has_capacity(true) if was_full + private def notify_has_capacity(value) + while @has_capacity.try_send? value end + end - def cancel - @channel.send AMQP::Frame::Basic::Cancel.new(@channel.id, @tag, no_wait: true) - @channel.consumers.delete self - close + def deliver(msg, sp, redelivered = false, recover = false) + unless @no_ack || recover + @unacked += 1 + notify_has_capacity(false) if @unacked == @prefetch_count end + delivery_tag = @channel.next_delivery_tag(@queue, sp, @no_ack, self) + deliver = AMQP::Frame::Basic::Deliver.new(@channel.id, @tag, + delivery_tag, + redelivered, + msg.exchange_name, msg.routing_key) + @channel.deliver(deliver, msg, redelivered) + end - private def consumer_priority(frame) : Int32 - case prio = frame.arguments["x-priority"] - when Int then prio.to_i32 - else raise Error::PreconditionFailed.new("x-priority must be an integer") - end - rescue KeyError - 0 - rescue OverflowError - raise Error::PreconditionFailed.new("x-priority out of bounds, must fit a 32-bit integer") - end + def ack(sp) + was_full = @unacked == @prefetch_count + @unacked -= 1 + notify_has_capacity(true) if was_full + end - def unacked_messages - @channel.unacked - end + def reject(sp, requeue = false) + was_full = @unacked == @prefetch_count + @unacked -= 1 + notify_has_capacity(true) if was_full + end - def channel_name - @channel.name - end + def cancel + @channel.send AMQP::Frame::Basic::Cancel.new(@channel.id, @tag, no_wait: true) + @channel.consumers.delete self + close + end - def details_tuple - channel_details = @channel.details_tuple - { - queue: { - name: @queue.name, - vhost: @queue.vhost.name, - }, - consumer_tag: @tag, - exclusive: @exclusive, - ack_required: !@no_ack, - prefetch_count: @prefetch_count, - priority: @priority, - channel_details: { - peer_host: channel_details[:connection_details][:peer_host]?, - peer_port: channel_details[:connection_details][:peer_port]?, - connection_name: channel_details[:connection_details][:name], - user: channel_details[:user], - number: channel_details[:number], - name: channel_details[:name], - }, - } + private def consumer_priority(frame) : Int32 + case prio = frame.arguments["x-priority"] + when Int then prio.to_i32 + else raise LavinMQ::Error::PreconditionFailed.new("x-priority must be an integer") end + rescue KeyError + 0 + rescue OverflowError + raise LavinMQ::Error::PreconditionFailed.new("x-priority out of bounds, must fit a 32-bit integer") + end + + def unacked_messages + @channel.unacked + end + + def channel_name + @channel.name + end - class ClosedError < Error; end + def details_tuple + channel_details = @channel.details_tuple + { + queue: { + name: @queue.name, + vhost: @queue.vhost.name, + }, + consumer_tag: @tag, + exclusive: @exclusive, + ack_required: !@no_ack, + prefetch_count: @prefetch_count, + priority: @priority, + channel_details: { + peer_host: channel_details[:connection_details][:peer_host]?, + peer_port: channel_details[:connection_details][:peer_port]?, + connection_name: channel_details[:connection_details][:name], + user: channel_details[:user], + number: channel_details[:number], + name: channel_details[:name], + }, + } end + + class ClosedError < Error; end end end end diff --git a/src/lavinmq/amqp/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr index 33a7a04ad..152ce98f2 100644 --- a/src/lavinmq/amqp/stream_consumer.cr +++ b/src/lavinmq/amqp/stream_consumer.cr @@ -1,93 +1,92 @@ require "./consumer" -require "../../segment_position" +require "../segment_position" module LavinMQ - class Client - class Channel - class StreamConsumer < LavinMQ::Client::Channel::Consumer - property offset : Int64 - property segment : UInt32 - property pos : UInt32 - getter requeued = Deque(SegmentPosition).new + module AMQP + class StreamConsumer < Consumer + include SortableJSON + property offset : Int64 + property segment : UInt32 + property pos : UInt32 + getter requeued = Deque(SegmentPosition).new - def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) - validate_preconditions(frame) - offset = frame.arguments["x-stream-offset"]? - @offset, @segment, @pos = stream_queue.find_offset(offset) - super - end + def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) + validate_preconditions(frame) + offset = frame.arguments["x-stream-offset"]? + @offset, @segment, @pos = stream_queue.find_offset(offset) + super + end - private def validate_preconditions(frame) - if frame.exclusive - raise Error::PreconditionFailed.new("Stream consumers must not be exclusive") - end - if frame.no_ack - raise Error::PreconditionFailed.new("Stream consumers must acknowledge messages") - end - if @channel.prefetch_count.zero? - raise Error::PreconditionFailed.new("Stream consumers must have a prefetch limit") - end - unless @channel.global_prefetch_count.zero? - raise Error::PreconditionFailed.new("Stream consumers does not support global prefetch limit") - end - if frame.arguments.has_key? "x-priority" - raise Error::PreconditionFailed.new("x-priority not supported on stream queues") - end - case frame.arguments["x-stream-offset"]? - when Nil, Int, Time, "first", "next", "last" - else raise Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'") - end + private def validate_preconditions(frame) + if frame.exclusive + raise LavinMQ::Error::PreconditionFailed.new("Stream consumers must not be exclusive") + end + if frame.no_ack + raise LavinMQ::Error::PreconditionFailed.new("Stream consumers must acknowledge messages") + end + if @channel.prefetch_count.zero? + raise LavinMQ::Error::PreconditionFailed.new("Stream consumers must have a prefetch limit") end + unless @channel.global_prefetch_count.zero? + raise LavinMQ::Error::PreconditionFailed.new("Stream consumers does not support global prefetch limit") + end + if frame.arguments.has_key? "x-priority" + raise LavinMQ::Error::PreconditionFailed.new("x-priority not supported on stream queues") + end + case frame.arguments["x-stream-offset"]? + when Nil, Int, Time, "first", "next", "last" + else raise LavinMQ::Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'") + end + end - private def deliver_loop - i = 0 + private def deliver_loop + i = 0 + loop do + wait_for_capacity loop do - wait_for_capacity - loop do - raise ClosedError.new if @closed - next if wait_for_queue_ready - next if wait_for_paused_queue - next if wait_for_flow - break - end - {% unless flag?(:release) %} - @log.debug { "Getting a new message" } - {% end %} - stream_queue.consume_get(self) do |env| - deliver(env.message, env.segment_position, env.redelivered) - end - Fiber.yield if (i &+= 1) % 32768 == 0 + raise ClosedError.new if @closed + next if wait_for_queue_ready + next if wait_for_paused_queue + next if wait_for_flow + break + end + {% unless flag?(:release) %} + @log.debug { "Getting a new message" } + {% end %} + stream_queue.consume_get(self) do |env| + deliver(env.message, env.segment_position, env.redelivered) end - rescue ex : ClosedError | Queue::ClosedError | Client::Channel::ClosedError | ::Channel::ClosedError - @log.debug { "deliver loop exiting: #{ex.inspect}" } + Fiber.yield if (i &+= 1) % 32768 == 0 end + rescue ex : ClosedError | Queue::ClosedError | AMQP::Channel::ClosedError | ::Channel::ClosedError + @log.debug { "deliver loop exiting: #{ex.inspect}" } + end - private def wait_for_queue_ready - if @offset > stream_queue.last_offset && @requeued.empty? - @log.debug { "Waiting for queue not to be empty" } - select - when stream_queue.new_messages.receive - @log.debug { "Queue is not empty" } - when @has_requeued.receive - @log.debug { "Got a requeued message" } - when @notify_closed.receive - end - return true + private def wait_for_queue_ready + if @offset > stream_queue.last_offset && @requeued.empty? + @log.debug { "Waiting for queue not to be empty" } + select + when stream_queue.new_messages.receive + @log.debug { "Queue is not empty" } + when @has_requeued.receive + @log.debug { "Got a requeued message" } + when @notify_closed.receive end + return true end + end - @has_requeued = ::Channel(Nil).new + @has_requeued = ::Channel(Nil).new - private def stream_queue : StreamQueue - @queue.as(StreamQueue) - end + private def stream_queue : StreamQueue + @queue.as(StreamQueue) + end - def reject(sp, requeue : Bool) - super - if requeue - @requeued.push(sp) - @has_requeued.try_send? nil if @requeued.size == 1 - end + def reject(sp, requeue : Bool) + super + if requeue + @requeued.push(sp) + @has_requeued.try_send? nil if @requeued.size == 1 end end end diff --git a/src/lavinmq/client/amqp_connection.cr b/src/lavinmq/client/amqp_connection.cr deleted file mode 100644 index 22de79674..000000000 --- a/src/lavinmq/client/amqp_connection.cr +++ /dev/null @@ -1,186 +0,0 @@ -require "log" -require "../version" - -module LavinMQ - class AMQPConnection - Log = ::Log.for "AMQPConnection" - - def self.start(socket, connection_info, vhosts, users) : Client? - remote_address = connection_info.src - socket.read_timeout = 15.seconds - if confirm_header(socket) - if start_ok = start(socket) - if user = authenticate(socket, remote_address, users, start_ok) - if tune_ok = tune(socket) - if vhost = open(socket, vhosts, user) - socket.read_timeout = heartbeat_timeout(tune_ok) - return Client.new(socket, connection_info, vhost, user, tune_ok, start_ok) - end - end - end - end - end - rescue ex : IO::TimeoutError | IO::Error | OpenSSL::SSL::Error | AMQP::Error::FrameDecode - Log.warn { "#{ex} when #{remote_address} tried to establish connection" } - nil - rescue ex - Log.error(exception: ex) { "Error while #{remote_address} tried to establish connection" } - nil - end - - private def self.heartbeat_timeout(tune_ok) - if tune_ok.heartbeat > 0 - (tune_ok.heartbeat / 2).seconds - end - end - - def self.confirm_header(socket) : Bool - proto = uninitialized UInt8[8] - count = socket.read(proto.to_slice) - if count.zero? # EOF, socket closed by peer - false - elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9 - socket.write AMQP::PROTOCOL_START_0_9_1.to_slice - socket.flush - Log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" } - false - else - true - end - end - - SERVER_PROPERTIES = AMQP::Table.new({ - "product": "LavinMQ", - "platform": "Crystal #{Crystal::VERSION}", - "version": LavinMQ::VERSION, - "capabilities": { - "publisher_confirms": true, - "exchange_exchange_bindings": true, - "basic.nack": true, - "consumer_cancel_notify": true, - "connection.blocked": true, - "consumer_priorities": true, - "authentication_failure_close": true, - "per_consumer_qos": true, - "direct_reply_to": true, - }, - }) - - def self.start(socket) - start = AMQP::Frame::Connection::Start.new(server_properties: SERVER_PROPERTIES) - socket.write_bytes start, ::IO::ByteFormat::NetworkEndian - socket.flush - start_ok = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::StartOk) } - if start_ok.bytesize > 4096 - Log.warn { "StartOk frame was #{start_ok.bytesize} bytes, max allowed is 4096 bytes" } - return - end - start_ok - end - - def self.credentials(start_ok) - case start_ok.mechanism - when "PLAIN" - resp = start_ok.response - if i = resp.index('\u0000', 1) - {resp[1...i], resp[(i + 1)..-1]} - else - raise "Invalid authentication response" - end - when "AMQPLAIN" - io = ::IO::Memory.new(start_ok.response) - tbl = AMQP::Table.from_io(io, ::IO::ByteFormat::NetworkEndian, io.bytesize.to_u32) - {tbl["LOGIN"].as(String), tbl["PASSWORD"].as(String)} - else raise "Unsupported authentication mechanism: #{start_ok.mechanism}" - end - end - - def self.authenticate(socket, remote_address, users, start_ok) - username, password = credentials(start_ok) - user = users[username]? - return user if user && user.password && user.password.not_nil!.verify(password) && - guest_only_loopback?(remote_address, user) - - if user.nil? - Log.warn { "User \"#{username}\" not found" } - else - Log.warn { "Authentication failure for user \"#{username}\"" } - end - props = start_ok.client_properties - if capabilities = props["capabilities"]?.try &.as?(AMQP::Table) - if capabilities["authentication_failure_close"]?.try &.as?(Bool) - socket.write_bytes AMQP::Frame::Connection::Close.new(403_u16, "ACCESS_REFUSED", - start_ok.class_id, - start_ok.method_id), IO::ByteFormat::NetworkEndian - socket.flush - end - end - nil - end - - def self.tune(socket) - frame_max = socket.is_a?(WebSocketIO) ? 4096_u32 : Config.instance.frame_max - socket.write_bytes AMQP::Frame::Connection::Tune.new( - channel_max: Config.instance.channel_max, - frame_max: frame_max, - heartbeat: Config.instance.heartbeat), IO::ByteFormat::NetworkEndian - socket.flush - tune_ok = AMQP::Frame.from_io(socket) do |frame| - case frame - when AMQP::Frame::Connection::TuneOk - if frame.frame_max < 4096 - Log.warn { "Suggested Frame max (#{frame.frame_max}) too low, closing connection" } - return - end - frame - else - Log.warn { "Expected TuneOk Frame got #{frame.inspect}" } - return - end - end - if tune_ok.frame_max < 4096 - Log.warn { "Suggested Frame max (#{tune_ok.frame_max}) too low, closing connection" } - return - end - tune_ok - end - - def self.open(socket, vhosts, user) - open = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::Open) } - vhost_name = open.vhost.empty? ? "/" : open.vhost - if vhost = vhosts[vhost_name]? - if user.permissions[vhost_name]? - if vhost.max_connections.try { |max| vhost.connections.size >= max } - Log.warn { "Max connections (#{vhost.max_connections}) reached for vhost #{vhost_name}" } - reply_text = "NOT_ALLOWED - access to vhost '#{vhost_name}' refused: connection limit (#{vhost.max_connections}) is reached" - socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, reply_text, - open.class_id, open.method_id), IO::ByteFormat::NetworkEndian - socket.flush - return - end - socket.write_bytes AMQP::Frame::Connection::OpenOk.new, IO::ByteFormat::NetworkEndian - socket.flush - return vhost - else - Log.warn { "Access denied for user \"#{user.name}\" to vhost \"#{vhost_name}\"" } - reply_text = "NOT_ALLOWED - '#{user.name}' doesn't have access to '#{vhost.name}'" - socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, reply_text, - open.class_id, open.method_id), IO::ByteFormat::NetworkEndian - socket.flush - end - else - Log.warn { "VHost \"#{vhost_name}\" not found" } - socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, "NOT_ALLOWED - vhost not found", - open.class_id, open.method_id), IO::ByteFormat::NetworkEndian - socket.flush - end - nil - end - - private def self.guest_only_loopback?(remote_address, user) : Bool - return true unless user.name == "guest" - return true unless Config.instance.guest_only_loopback? - remote_address.loopback? - end - end -end diff --git a/src/lavinmq/client/channel.cr b/src/lavinmq/client/channel.cr new file mode 100644 index 000000000..71efd5b06 --- /dev/null +++ b/src/lavinmq/client/channel.cr @@ -0,0 +1,10 @@ +require "../sortable_json" +require "./channel/consumer" + +module LavinMQ + abstract class Client + abstract class Channel + include SortableJSON + end + end +end diff --git a/src/lavinmq/client/channel/consumer.cr b/src/lavinmq/client/channel/consumer.cr new file mode 100644 index 000000000..0192a7058 --- /dev/null +++ b/src/lavinmq/client/channel/consumer.cr @@ -0,0 +1,11 @@ +require "../../logger" + +module LavinMQ + abstract class Client + abstract class Channel + abstract class Consumer + include SortableJSON + end + end + end +end diff --git a/src/lavinmq/client/client.cr b/src/lavinmq/client/client.cr new file mode 100644 index 000000000..052a116df --- /dev/null +++ b/src/lavinmq/client/client.cr @@ -0,0 +1,8 @@ +require "../sortable_json" +require "./channel" + +module LavinMQ + abstract class Client + include SortableJSON + end +end diff --git a/src/lavinmq/client/connection_factory.cr b/src/lavinmq/client/connection_factory.cr new file mode 100644 index 000000000..05c9199c5 --- /dev/null +++ b/src/lavinmq/client/connection_factory.cr @@ -0,0 +1,6 @@ +require "./client" + +module LavinMQ + abstract class ConnectionFactory + end +end diff --git a/src/lavinmq/queue/stream_queue.cr b/src/lavinmq/queue/stream_queue.cr index 215e57fce..7fb74e20f 100644 --- a/src/lavinmq/queue/stream_queue.cr +++ b/src/lavinmq/queue/stream_queue.cr @@ -1,5 +1,5 @@ require "./durable_queue" -require "../client/channel/stream_consumer" +require "../amqp/stream_consumer" require "./stream_queue_message_store" module LavinMQ @@ -67,7 +67,7 @@ module LavinMQ false end - def consume_get(consumer : Client::Channel::StreamConsumer, & : Envelope -> Nil) : Bool + def consume_get(consumer : AMQP::StreamConsumer, & : Envelope -> Nil) : Bool get(consumer) do |env| yield env env.redelivered ? (@redeliver_count += 1) : (@deliver_count += 1) @@ -77,7 +77,7 @@ module LavinMQ # yield the next message in the ready queue # returns true if a message was deliviered, false otherwise # if we encouncer an unrecoverable ReadError, close queue - private def get(consumer : Client::Channel::StreamConsumer, & : Envelope -> Nil) : Bool + private def get(consumer : AMQP::StreamConsumer, & : Envelope -> Nil) : Bool raise ClosedError.new if @closed env = @msg_store_lock.synchronize { @msg_store.shift?(consumer) } || return false yield env # deliver the message @@ -157,7 +157,7 @@ module LavinMQ used_segments = Set(UInt32).new @consumers_lock.synchronize do @consumers.each do |consumer| - used_segments << consumer.as(Client::Channel::StreamConsumer).segment + used_segments << consumer.as(AMQP::StreamConsumer).segment end end @msg_store_lock.synchronize do diff --git a/src/lavinmq/queue/stream_queue_message_store.cr b/src/lavinmq/queue/stream_queue_message_store.cr index 2754cc643..8ce4f1a12 100644 --- a/src/lavinmq/queue/stream_queue_message_store.cr +++ b/src/lavinmq/queue/stream_queue_message_store.cr @@ -88,7 +88,7 @@ module LavinMQ {msg_offset, segment, pos} end - def shift?(consumer : Client::Channel::StreamConsumer) : Envelope? + def shift?(consumer : AMQP::StreamConsumer) : Envelope? raise ClosedError.new if @closed if env = shift_requeued(consumer.requeued) diff --git a/src/lavinmq/reporter.cr b/src/lavinmq/reporter.cr index 076026303..432ea69fa 100644 --- a/src/lavinmq/reporter.cr +++ b/src/lavinmq/reporter.cr @@ -28,13 +28,16 @@ module LavinMQ puts_size_capacity vh.@connections vh.connections.each do |c| puts " #{c.name}" - puts_size_capacity c.@channels, 4 + puts_size_capacity c.channels, 4 c.channels.each_value do |ch| puts " #{ch.id} global_prefetch=#{ch.global_prefetch_count} prefetch=#{ch.prefetch_count}" - puts_size_capacity ch.@unacked, 6 - puts_size_capacity ch.@consumers, 6 - puts_size_capacity ch.@visited, 6 - puts_size_capacity ch.@found_queues, 6 + puts_size_capacity ch.consumers, 8 + case ch + when AMQP::Channel + puts_size_capacity ch.@unacked, 8 + puts_size_capacity ch.@visited, 8 + puts_size_capacity ch.@found_queues, 8 + end end end end diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 63ee39a8e..ebffc78d3 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -13,6 +13,8 @@ require "./config" require "./connection_info" require "./proxy_protocol" require "./client/client" +require "./client/connection_factory" +require "./amqp/connection_factory" require "./stats" module LavinMQ @@ -34,6 +36,7 @@ module LavinMQ @users = UserStore.new(@data_dir, @replicator) @vhosts = VHostStore.new(@data_dir, @users, @replicator) @parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator) + @amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new apply_parameter spawn stats_loop, name: "Server#stats_loop" end @@ -239,7 +242,7 @@ module LavinMQ end def handle_connection(socket, connection_info) - client = AMQPConnection.start(socket, connection_info, @vhosts, @users) + client = @amqp_connection_factory.start(socket, connection_info, @vhosts, @users) ensure socket.close if client.nil? end