diff --git a/.ameba.yml b/.ameba.yml index 28e56c55e..92af60cc1 100644 --- a/.ameba.yml +++ b/.ameba.yml @@ -14,9 +14,9 @@ Naming/BlockParameterName: Lint/NotNil: Description: Identifies usage of `not_nil!` calls Excluded: - - src/lavinmq/client/amqp_connection.cr - - src/lavinmq/client/channel.cr - - src/lavinmq/client/client.cr + - src/lavinmq/amqp/connection_factory.cr + - src/lavinmq/amqp/channel.cr + - src/lavinmq/amqp/client.cr - src/lavinmq/federation/link.cr - src/lavinmq/http/controller/main.cr - src/lavinmq/http/controller/exchanges.cr diff --git a/src/lavinmq/amqp.cr b/src/lavinmq/amqp.cr index 515bbae20..1c7d300ac 100644 --- a/src/lavinmq/amqp.cr +++ b/src/lavinmq/amqp.cr @@ -1,5 +1,7 @@ require "amq-protocol" module LavinMQ - alias AMQP = AMQ::Protocol + module AMQP + include AMQ::Protocol + end end diff --git a/src/lavinmq/amqp/channel.cr b/src/lavinmq/amqp/channel.cr new file mode 100644 index 000000000..380c20fd6 --- /dev/null +++ b/src/lavinmq/amqp/channel.cr @@ -0,0 +1,801 @@ +require "./client" +require "./consumer" +require "./stream_consumer" +require "../error" +require "../queue" +require "../exchange" +require "../amqp" +require "../stats" +require "../sortable_json" + +module LavinMQ + module AMQP + class Channel < LavinMQ::Client::Channel + include Stats + include SortableJSON + + getter id, name + property? running = true + getter? flow = true + getter consumers = Array(Consumer).new + getter prefetch_count = 0_u16 + getter global_prefetch_count = 0_u16 + getter has_capacity = ::Channel(Nil).new + getter unacked = Deque(Unack).new + @confirm = false + @confirm_total = 0_u64 + @next_publish_mandatory = false + @next_publish_immediate = false + @next_publish_exchange_name : String? + @next_publish_routing_key : String? + @next_msg_size = 0_u64 + @next_msg_props : AMQP::Properties? + @delivery_tag = 0_u64 + @unack_lock = Mutex.new(:checked) + @next_msg_body_file : File? + @direct_reply_consumer : String? + @tx = false + @next_msg_body_tmp = IO::Memory.new + + rate_stats({"ack", "get", "publish", "deliver", "redeliver", "reject", "confirm", "return_unroutable"}) + + Log = ::Log.for "AMQP.channel" + + def initialize(@client : Client, @id : UInt16) + @metadata = ::Log::Metadata.new(nil, {client: @client.remote_address.to_s, channel: @id.to_i}) + @name = "#{@client.channel_name_prefix}[#{@id}]" + @log = Logger.new(Log, @metadata) + end + + record Unack, + tag : UInt64, + queue : Queue, + sp : SegmentPosition, + consumer : Consumer?, + delivered_at : Time::Span + + def details_tuple + { + number: @id, + name: @name, + vhost: @client.vhost.name, + user: @client.user.try(&.name), + consumer_count: @consumers.size, + prefetch_count: @prefetch_count, + global_prefetch_count: @global_prefetch_count, + confirm: @confirm, + transactional: false, + messages_unacknowledged: @unacked.size, + connection_details: @client.connection_details, + state: state, + message_stats: stats_details, + } + end + + def flow(active : Bool) + @flow = active + @consumers.each &.flow(active) + send AMQP::Frame::Channel::FlowOk.new(@id, active) + end + + def state + !@running ? "closed" : @flow ? 
"running" : "flow" + end + + def send(frame) + unless @running + @log.debug { "Channel is closed so is not sending #{frame.inspect}" } + return false + end + @client.send frame, true + end + + def confirm_select(frame) + if @tx + @client.send_precondition_failed(frame, "Channel already in transactional mode") + return + end + @confirm = true + unless frame.no_wait + send AMQP::Frame::Confirm::SelectOk.new(frame.channel) + end + end + + def start_publish(frame) + unless server_flow? + @client.send_precondition_failed(frame, "Server low on disk space") + return + end + raise LavinMQ::Error::UnexpectedFrame.new(frame) if @next_publish_exchange_name + if ex = @client.vhost.exchanges[frame.exchange]? + if !ex.internal? + @next_publish_exchange_name = frame.exchange + @next_publish_routing_key = frame.routing_key + @next_publish_mandatory = frame.mandatory + @next_publish_immediate = frame.immediate + else + @client.send_access_refused(frame, "Exchange '#{frame.exchange}' in vhost '#{@client.vhost.name}' is internal") + end + else + @client.send_not_found(frame, "No exchange '#{frame.exchange}' in vhost '#{@client.vhost.name}'") + end + end + + private def direct_reply_to?(str) : Bool + {"amq.rabbitmq.reply-to", "amq.direct.reply-to"}.includes? str + end + + def next_msg_headers(frame) + raise LavinMQ::Error::UnexpectedFrame.new(frame) if @next_publish_exchange_name.nil? + raise LavinMQ::Error::UnexpectedFrame.new(frame) if @next_msg_props + raise LavinMQ::Error::UnexpectedFrame.new(frame) if frame.class_id != 60 + valid_expiration?(frame) || return + if direct_reply_to?(frame.properties.reply_to) + if drc = @direct_reply_consumer + frame.properties.reply_to = "amq.direct.reply-to.#{drc}" + else + @client.send_precondition_failed(frame, "Direct reply consumer does not exist") + return + end + end + if frame.body_size > Config.instance.max_message_size + error = "message size #{frame.body_size} larger than max size #{Config.instance.max_message_size}" + @client.send_precondition_failed(frame, error) + @log.warn { "Message size exceeded, #{frame.body_size}/#{Config.instance.max_message_size}" } + return + end + @next_msg_size = frame.body_size + @next_msg_props = frame.properties + finish_publish(@next_msg_body_tmp) if frame.body_size.zero? + end + + @next_msg_body_file_pos = 0 + + def add_content(frame) + if @next_publish_exchange_name.nil? || @next_msg_props.nil? 
+ frame.body.skip(frame.body_size) + raise LavinMQ::Error::UnexpectedFrame.new(frame) + end + if @tx # in transaction mode, copy all bodies to the tmp file serially + copied = IO.copy(frame.body, next_msg_body_file, frame.body_size) + if copied != frame.body_size + raise IO::Error.new("Could only copy #{copied} of #{frame.body_size} bytes") + end + if (@next_msg_body_file_pos += copied) == @next_msg_size + # as the body_io won't be read until tx_commit there's no need to rewind + # bodies can be appended sequentially to the tmp file + finish_publish(next_msg_body_file) + @next_msg_body_file_pos = 0 + end + elsif frame.body_size == @next_msg_size + copied = IO.copy(frame.body, @next_msg_body_tmp, frame.body_size) + if copied != frame.body_size + raise IO::Error.new("Could only copy #{copied} of #{frame.body_size} bytes") + end + @next_msg_body_tmp.rewind + begin + finish_publish(@next_msg_body_tmp) + ensure + @next_msg_body_tmp.clear + end + else + copied = IO.copy(frame.body, next_msg_body_file, frame.body_size) + if copied != frame.body_size + raise IO::Error.new("Could only copy #{copied} of #{frame.body_size} bytes") + end + if next_msg_body_file.pos == @next_msg_size + next_msg_body_file.rewind + begin + finish_publish(next_msg_body_file) + ensure + next_msg_body_file.rewind + end + end + end + end + + private def valid_expiration?(frame) : Bool + if exp = frame.properties.expiration + if i = exp.to_i? + if i < 0 + @client.send_precondition_failed(frame, "Negative expiration not allowed") + return false + end + else + @client.send_precondition_failed(frame, "Expiration not a number") + return false + end + end + true + end + + private def server_flow? + @client.vhost.flow? + end + + private def finish_publish(body_io) + @publish_count += 1 + @client.vhost.event_tick(EventType::ClientPublish) + props = @next_msg_props.not_nil! + props.timestamp = RoughTime.utc if props.timestamp.nil? && Config.instance.set_timestamp? + msg = Message.new(RoughTime.unix_ms, + @next_publish_exchange_name.not_nil!, + @next_publish_routing_key.not_nil!, + props, + @next_msg_size, + body_io) + direct_reply?(msg) || publish_and_return(msg) + ensure + @next_msg_size = 0_u64 + @next_msg_props = nil + @next_publish_exchange_name = @next_publish_routing_key = nil + @next_publish_mandatory = @next_publish_immediate = false + end + + @visited = Set(Exchange).new + @found_queues = Set(Queue).new + + record TxMessage, message : Message, mandatory : Bool, immediate : Bool + @tx_publishes = Array(TxMessage).new + + private def publish_and_return(msg) + validate_user_id(msg.properties.user_id) + if @tx + @tx_publishes.push TxMessage.new(msg, @next_publish_mandatory, @next_publish_immediate) + return + end + + confirm do + ok = @client.vhost.publish msg, @next_publish_immediate, @visited, @found_queues + basic_return(msg, @next_publish_mandatory, @next_publish_immediate) unless ok + rescue e : LavinMQ::Error::PreconditionFailed + msg.body_io.skip(msg.bodysize) + send AMQP::Frame::Channel::Close.new(@id, 406_u16, "PRECONDITION_FAILED - #{e.message}", 60_u16, 40_u16) + end + rescue Queue::RejectOverFlow + # nack but then do nothing + end + + private def validate_user_id(user_id) + current_user = @client.user + if user_id && user_id != current_user.name && !current_user.can_impersonate? 
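# (A mismatching user_id is only allowed for users with impersonation rights;
# otherwise the PreconditionFailed raised below propagates to the read loop,
# which closes the channel with 406 PRECONDITION_FAILED.)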
+ text = "Message's user_id property '#{user_id}' doesn't match actual user '#{current_user.name}'" + @log.error { text } + raise LavinMQ::Error::PreconditionFailed.new(text) + end + end + + private def confirm(&) + if @confirm + msgid = @confirm_total &+= 1 + begin + yield + confirm_ack(msgid) + rescue ex + confirm_nack(msgid) + raise ex + end + else + yield + end + end + + private def confirm_ack(msgid, multiple = false) + @client.vhost.event_tick(EventType::ClientPublishConfirm) + @confirm_count += 1 # Stats + send AMQP::Frame::Basic::Ack.new(@id, msgid, multiple) + end + + private def confirm_nack(msgid, multiple = false) + @client.vhost.event_tick(EventType::ClientPublishConfirm) + @confirm_count += 1 # Stats + send AMQP::Frame::Basic::Nack.new(@id, msgid, multiple, requeue: false) + end + + private def direct_reply?(msg) : Bool + return false unless msg.routing_key.starts_with? "amq.direct.reply-to." + consumer_tag = msg.routing_key[20..] + if ch = @client.vhost.direct_reply_consumers[consumer_tag]? + confirm do + deliver = AMQP::Frame::Basic::Deliver.new(ch.id, consumer_tag, + 1_u64, false, + msg.exchange_name, + msg.routing_key) + ch.deliver(deliver, msg) + end + true + else + false + end + end + + private def basic_return(msg : Message, mandatory : Bool, immediate : Bool) + @return_unroutable_count += 1 + if immediate + retrn = AMQP::Frame::Basic::Return.new(@id, 313_u16, "NO_CONSUMERS", msg.exchange_name, msg.routing_key) + deliver(retrn, msg) + msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind + elsif mandatory + retrn = AMQP::Frame::Basic::Return.new(@id, 312_u16, "NO_ROUTE", msg.exchange_name, msg.routing_key) + deliver(retrn, msg) + msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind + end + end + + def deliver(frame, msg, redelivered = false) : Nil + raise ClosedError.new("Channel is closed") unless @running + @client.deliver(frame, msg) + if redelivered + @redeliver_count += 1 + @client.vhost.event_tick(EventType::ClientRedeliver) + else + @deliver_count += 1 + @client.vhost.event_tick(EventType::ClientDeliver) + end + end + + def consume(frame) + if frame.consumer_tag.empty? + frame.consumer_tag = "amq.ctag-#{Random::Secure.urlsafe_base64(24)}" + end + if direct_reply_to?(frame.queue) + unless frame.no_ack + @client.send_precondition_failed(frame, "Direct replys must be consumed in no-ack mode") + return + end + @log.debug { "Saving direct reply consumer #{frame.consumer_tag}" } + @direct_reply_consumer = frame.consumer_tag + @client.vhost.direct_reply_consumers[frame.consumer_tag] = self + unless frame.no_wait + send AMQP::Frame::Basic::ConsumeOk.new(frame.channel, frame.consumer_tag) + end + elsif q = @client.vhost.queues[frame.queue]? + if @client.queue_exclusive_to_other_client?(q) + @client.send_resource_locked(frame, "Exclusive queue") + return + end + if q.has_exclusive_consumer? + @client.send_access_refused(frame, "Queue '#{frame.queue}' in vhost '#{@client.vhost.name}' in exclusive use") + return + end + c = if q.is_a? 
StreamQueue + AMQP::StreamConsumer.new(self, q, frame) + else + AMQP::Consumer.new(self, q, frame) + end + @consumers.push(c) + q.add_consumer(c) + unless frame.no_wait + send AMQP::Frame::Basic::ConsumeOk.new(frame.channel, frame.consumer_tag) + end + else + @client.send_not_found(frame, "Queue '#{frame.queue}' not declared") + end + Fiber.yield # Notify :add_consumer observers + end + + def basic_get(frame) + if q = @client.vhost.queues.fetch(frame.queue, nil) + if @client.queue_exclusive_to_other_client?(q) + @client.send_resource_locked(frame, "Exclusive queue") + elsif q.has_exclusive_consumer? + @client.send_access_refused(frame, "Queue '#{frame.queue}' in vhost '#{@client.vhost.name}' in exclusive use") + elsif q.is_a? StreamQueue + @client.send_not_implemented(frame, "Stream queues does not support basic_get") + else + @get_count += 1 + @client.vhost.event_tick(EventType::ClientGet) + ok = q.basic_get(frame.no_ack) do |env| + delivery_tag = next_delivery_tag(q, env.segment_position, frame.no_ack, nil) + unless frame.no_ack # track unacked messages + q.basic_get_unacked << UnackedMessage.new(self, delivery_tag, RoughTime.monotonic) + end + get_ok = AMQP::Frame::Basic::GetOk.new(frame.channel, delivery_tag, + env.redelivered, env.message.exchange_name, + env.message.routing_key, q.message_count) + deliver(get_ok, env.message, env.redelivered) + end + send AMQP::Frame::Basic::GetEmpty.new(frame.channel) unless ok + end + else + @client.send_not_found(frame, "No queue '#{frame.queue}' in vhost '#{@client.vhost.name}'") + end + end + + private def delete_unacked(delivery_tag) : Unack? + found = nil + @unack_lock.synchronize do + # @unacked is always sorted so can do a binary search + # optimization for acking first unacked + if @unacked[0]?.try(&.tag) == delivery_tag + # @log.debug { "Unacked found tag:#{delivery_tag} at front" } + found = @unacked.shift + elsif idx = @unacked.bsearch_index { |unack, _| unack.tag >= delivery_tag } + return nil unless @unacked[idx].tag == delivery_tag + # @log.debug { "Unacked bsearch found tag:#{delivery_tag} at index:#{idx}" } + found = @unacked.delete_at(idx) + end + end + notify_has_capacity(1) if found + found + end + + private def delete_multiple_unacked(delivery_tag, & : Unack -> Nil) + count = 0 + @unack_lock.synchronize do + if delivery_tag.zero? + until @unacked.empty? + yield @unacked.shift + count += 1 + end + else + idx = @unacked.bsearch_index { |unack, _| unack.tag >= delivery_tag } + return nil unless idx + return nil unless @unacked[idx].tag == delivery_tag + # @log.debug { "Unacked bsearch found tag:#{delivery_tag} at index:#{idx}" } + (idx + 1).times do + yield @unacked.shift + count += 1 + end + end + end + notify_has_capacity(count) + end + + def unacked_count + @unacked.size + end + + record TxAck, delivery_tag : UInt64, multiple : Bool, negative : Bool, requeue : Bool + @tx_acks = Array(TxAck).new + + def basic_ack(frame) + if @tx + @unack_lock.synchronize do + if frame.delivery_tag.zero? && frame.multiple # all msgs so far + @tx_acks.push(TxAck.new @unacked.last.tag, frame.multiple, false, false) + return + elsif @unacked.bsearch { |unack| unack.tag >= frame.delivery_tag }.try &.tag == frame.delivery_tag + check_double_ack!(frame.delivery_tag) + @tx_acks.push(TxAck.new frame.delivery_tag, frame.multiple, false, false) + return + end + end + @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) + return + end + + found = false + if frame.multiple + found = true if frame.delivery_tag.zero? 
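# (delivery_tag 0 together with multiple=true acknowledges every outstanding
# delivery; otherwise the tag is looked up in the sorted @unacked deque.)
# A stdlib-only sketch of the sorted-deque lookup used by delete_unacked and
# delete_multiple_unacked, with made-up tags:
#   unacked = Deque(UInt64).new
#   {1_u64, 3_u64, 7_u64}.each { |t| unacked.push t }
#   if (idx = unacked.bsearch_index { |t, _| t >= 3_u64 }) && unacked[idx] == 3_u64
#     unacked.delete_at(idx) # removes tag 3, tags 1 and 7 stay unacked
#   end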
+ delete_multiple_unacked(frame.delivery_tag) do |unack| + found = true + do_ack(unack) + end + elsif unack = delete_unacked(frame.delivery_tag) + found = true + do_ack(unack) + end + unless found + @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) + end + rescue DoubleAck + @client.send_precondition_failed(frame, "Delivery tag already acked") + end + + private def do_ack(unack) + if c = unack.consumer + c.ack(unack.sp) + end + unack.queue.ack(unack.sp) + unack.queue.basic_get_unacked.reject! { |u| u.channel == self && u.delivery_tag == unack.tag } + @client.vhost.event_tick(EventType::ClientAck) + @ack_count += 1 + end + + def basic_reject(frame) + if @tx + @unack_lock.synchronize do + if @unacked.bsearch { |unack| unack.tag >= frame.delivery_tag }.try &.tag == frame.delivery_tag + check_double_ack!(frame.delivery_tag) + @tx_acks.push(TxAck.new frame.delivery_tag, false, true, frame.requeue) + return + end + end + @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) + return + end + + @log.debug { "Rejecting #{frame.inspect}" } + if unack = delete_unacked(frame.delivery_tag) + do_reject(frame.requeue, unack) + else + @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) + end + rescue DoubleAck + @client.send_precondition_failed(frame, "Delivery tag already acked") + end + + def basic_nack(frame) + if @tx + @unack_lock.synchronize do + if frame.delivery_tag.zero? && frame.multiple # all msgs so far + @tx_acks.push(TxAck.new @unacked.last.tag, true, true, frame.requeue) + return + elsif @unacked.bsearch { |unack| unack.tag >= frame.delivery_tag }.try &.tag == frame.delivery_tag + check_double_ack!(frame.delivery_tag) + @tx_acks.push(TxAck.new frame.delivery_tag, frame.multiple, true, frame.requeue) + return + end + end + @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) + return + end + + found = false + if frame.multiple + delete_multiple_unacked(frame.delivery_tag) do |unack| + found = true + do_reject(frame.requeue, unack) + end + elsif unack = delete_unacked(frame.delivery_tag) + found = true + do_reject(frame.requeue, unack) + end + unless found + @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) + end + rescue DoubleAck + @client.send_precondition_failed(frame, "Delivery tag already acked") + end + + private class DoubleAck < Error; end + + private def check_double_ack!(delivery_tag) + if @tx_acks.any? { |tx_ack| tx_ack.delivery_tag == delivery_tag } + raise DoubleAck.new + end + end + + private def unknown_tag(delivery_tag) + # Lower case u important for bunny on_error callback + "unknown delivery tag #{delivery_tag}" + end + + private def do_reject(requeue, unack) + if c = unack.consumer + c.reject(unack.sp, requeue) + end + unack.queue.reject(unack.sp, requeue) + unack.queue.basic_get_unacked.reject! { |u| u.channel == self && u.delivery_tag == unack.tag } + @reject_count += 1 + @client.vhost.event_tick(EventType::ClientReject) + end + + def basic_qos(frame) : Nil + @client.send_not_implemented(frame) if frame.prefetch_size != 0 + if frame.global + @global_prefetch_count = frame.prefetch_count + if frame.prefetch_count.zero? 
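# (prefetch_count 0 means "no limit": has_capacity is signalled until no
# consumer is left waiting, so every consumer blocked on the global prefetch
# limit is released. A non-zero value tops the capacity up by the new limit
# minus the messages consumers already hold unacked.)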
+ while @has_capacity.try_send?(nil) + end + else + unacked_by_consumers = @unack_lock.synchronize { @unacked.count(&.consumer) } + notify_has_capacity(frame.prefetch_count.to_i - unacked_by_consumers) + end + else + @prefetch_count = frame.prefetch_count + @consumers.each(&.prefetch_count = frame.prefetch_count) + end + send AMQP::Frame::Basic::QosOk.new(frame.channel) + end + + def basic_recover(frame) : Nil + @unack_lock.synchronize do + if frame.requeue + @unacked.each do |unack| + next if delivery_tag_is_in_tx?(unack.tag) + if consumer = unack.consumer + consumer.reject(unack.sp, requeue: true) + end + unack.queue.reject(unack.sp, requeue: true) + end + @unacked.clear + notify_has_capacity + else # redeliver to the original recipient + @unacked.reject! do |unack| + next if delivery_tag_is_in_tx?(unack.tag) + if (consumer = unack.consumer) && !consumer.closed? + env = unack.queue.read(unack.sp) + consumer.deliver(env.message, env.segment_position, true, recover: true) + false + else + unack.queue.reject(unack.sp, requeue: true) + true + end + end + end + end + send AMQP::Frame::Basic::RecoverOk.new(frame.channel) + end + + private def delivery_tag_is_in_tx?(delivery_tag) : Bool + if @tx + @tx_acks.any? do |tx_ack| + (tx_ack.delivery_tag > delivery_tag && tx_ack.multiple) || tx_ack.delivery_tag == delivery_tag + end + else + false + end + end + + def close + @running = false + @consumers.each &.close + @consumers.clear + if drc = @direct_reply_consumer + @client.vhost.direct_reply_consumers.delete(drc) + end + @unack_lock.synchronize do + @unacked.each do |unack| + @log.debug { "Requeing unacked msg #{unack.sp}" } + unack.queue.reject(unack.sp, true) + unack.queue.basic_get_unacked.reject! { |u| u.channel == self && u.delivery_tag == unack.tag } + end + @unacked.clear + end + @has_capacity.close + @next_msg_body_file.try &.close + @client.vhost.event_tick(EventType::ChannelClosed) + @log.debug { "Closed" } + end + + protected def next_delivery_tag(queue : Queue, sp, no_ack, consumer) : UInt64 + @unack_lock.synchronize do + tag = @delivery_tag &+= 1 + @unacked.push Unack.new(tag, queue, sp, consumer, RoughTime.monotonic) unless no_ack + tag + end + end + + # Iterate over all unacked messages and see if any has been unacked longer than the queue's consumer timeout + def check_consumer_timeout + @unack_lock.synchronize do + queues = Set(Queue).new # only check first delivered message per queue + @unacked.each do |unack| + if queues.add? unack.queue + if timeout = unack.queue.consumer_timeout + unacked_ms = RoughTime.monotonic - unack.delivered_at + if unacked_ms > timeout.milliseconds + send AMQP::Frame::Channel::Close.new(@id, 406_u16, "PRECONDITION_FAILED - consumer timeout", 60_u16, 20_u16) + break + end + end + end + end + end + end + + def has_capacity? : Bool + return true if @global_prefetch_count.zero? + prefetch_limit = @global_prefetch_count + @unack_lock.synchronize do + count = 0 + @unacked.each do |unack| + next if unack.consumer.nil? # only count consumer unacked against limit + count += 1 + return false if count >= prefetch_limit + end + true + end + end + + private def notify_has_capacity(capacity = Int32::MAX) + return if @global_prefetch_count.zero? + return if capacity.negative? 
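# (Each token sent on @has_capacity releases one consumer blocked in
# wait_for_global_capacity; at most `capacity` freed slots are signalled.)
# A stdlib-only sketch of that one-token-per-slot wake-up, with hypothetical names:
#   capacity = ::Channel(Nil).new
#   spawn { capacity.receive; puts "consumer resumed" }
#   Fiber.yield        # let the consumer fiber block in receive
#   capacity.send nil  # one token resumes exactly one waiting consumer
#   Fiber.yield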
+ capacity.times do + @has_capacity.try_send?(nil) || break + end + end + + def cancel_consumer(frame) + @log.debug { "Cancelling consumer '#{frame.consumer_tag}'" } + if idx = @consumers.index { |cons| cons.tag == frame.consumer_tag } + c = @consumers.delete_at idx + c.close + elsif @direct_reply_consumer == frame.consumer_tag + @direct_reply_consumer = nil + @client.vhost.direct_reply_consumers.delete(frame.consumer_tag) + end + unless frame.no_wait + send AMQP::Frame::Basic::CancelOk.new(frame.channel, frame.consumer_tag) + end + end + + private def next_msg_body_file + @next_msg_body_file ||= + begin + File.tempfile("channel.", nil, dir: @client.vhost.data_dir).tap do |f| + f.sync = true + f.read_buffering = false + f.delete + end + end + end + + def tx_select(frame) + if @confirm + @client.send_precondition_failed(frame, "Channel already in confirm mode") + return + end + @tx = true + send AMQP::Frame::Tx::SelectOk.new(frame.channel) + end + + def tx_commit(frame) + return @client.send_precondition_failed(frame, "Not in transaction mode") unless @tx + process_tx_acks + process_tx_publishes + @client.vhost.sync + send AMQP::Frame::Tx::CommitOk.new(frame.channel) + end + + private def process_tx_publishes + next_msg_body_file.rewind + @tx_publishes.each do |tx_msg| + tx_msg.message.timestamp = RoughTime.unix_ms + ok = @client.vhost.publish(tx_msg.message, tx_msg.immediate, @visited, @found_queues) + basic_return(tx_msg.message, tx_msg.mandatory, tx_msg.immediate) unless ok + # skip to next msg body in the next_msg_body_file + tx_msg.message.body_io.seek(tx_msg.message.bodysize, IO::Seek::Current) + end + @tx_publishes.clear + ensure + next_msg_body_file.rewind + end + + private def process_tx_acks + count = 0 + @unack_lock.synchronize do + @tx_acks.each do |tx_ack| + if idx = @unacked.bsearch_index { |u, _| u.tag >= tx_ack.delivery_tag } + raise "BUG: Delivery tag not found" unless @unacked[idx].tag == tx_ack.delivery_tag + @log.debug { "Unacked bsearch found tag:#{tx_ack.delivery_tag} at index:#{idx}" } + if tx_ack.multiple + (idx + 1).times do + unack = @unacked.shift + if tx_ack.negative + do_reject(tx_ack.requeue, unack) + else + do_ack(unack) + end + count += 1 + end + else + unack = @unacked.delete_at(idx) + if tx_ack.negative + do_reject(tx_ack.requeue, unack) + else + do_ack(unack) + end + count += 1 + end + end + end + @tx_acks.clear + end + notify_has_capacity(count) + end + + def tx_rollback(frame) + return @client.send_precondition_failed(frame, "Not in transaction mode") unless @tx + @tx_publishes.clear + @tx_acks.clear + next_msg_body_file.rewind + send AMQP::Frame::Tx::RollbackOk.new(frame.channel) + end + + class ClosedError < Error; end + end + end +end diff --git a/src/lavinmq/amqp/client.cr b/src/lavinmq/amqp/client.cr new file mode 100644 index 000000000..08c948256 --- /dev/null +++ b/src/lavinmq/amqp/client.cr @@ -0,0 +1,839 @@ +require "openssl" +require "socket" +require "./channel" +require "../client" +require "../error" + +module LavinMQ + module AMQP + class Client < LavinMQ::Client + include Stats + include SortableJSON + + getter vhost, channels, log, name + getter user + getter max_frame_size : UInt32 + getter channel_max : UInt16 + getter heartbeat_timeout : UInt16 + getter auth_mechanism : String + getter client_properties : AMQP::Table + getter remote_address : Socket::IPAddress + + @connected_at = RoughTime.unix_ms + @channels = Hash(UInt16, Client::Channel).new + @exclusive_queues = Array(Queue).new + @heartbeat_interval_ms : Int64? 
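# Half the negotiated heartbeat, in milliseconds, or nil when the client
# negotiated heartbeat 0 (disabled). For example a 60 s heartbeat gives
# (60 / 2) * 1000 = 30_000 ms: the server sends a heartbeat frame if it has
# not sent anything for that long.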
+ @local_address : Socket::IPAddress + @running = true + @last_recv_frame = RoughTime.monotonic + @last_sent_frame = RoughTime.monotonic + rate_stats({"send_oct", "recv_oct"}) + DEFAULT_EX = "amq.default" + Log = ::Log.for "AMQP.client" + + def initialize(@socket : IO, + @connection_info : ConnectionInfo, + @vhost : VHost, + @user : User, + tune_ok, + start_ok) + @remote_address = @connection_info.src + @local_address = @connection_info.dst + + @max_frame_size = tune_ok.frame_max + @channel_max = tune_ok.channel_max + @heartbeat_timeout = tune_ok.heartbeat + @heartbeat_interval_ms = tune_ok.heartbeat.zero? ? nil : ((tune_ok.heartbeat / 2) * 1000).to_i64 + @auth_mechanism = start_ok.mechanism + @name = "#{@remote_address} -> #{@local_address}" + @client_properties = start_ok.client_properties + connection_name = @client_properties["connection_name"]?.try(&.as?(String)) + @metadata = + if connection_name + ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s, name: connection_name}) + else + ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s}) + end + @log = Logger.new(Log, @metadata) + @vhost.add_connection(self) + @log.info { "Connection established for user=#{@user.name}" } + spawn read_loop, name: "Client#read_loop #{@remote_address}" + end + + # Returns client provided connection name if set, else server generated name + def client_name + @client_properties["connection_name"]?.try(&.as(String)) || @name + end + + def channel_name_prefix + @remote_address.to_s + end + + def details_tuple + { + channels: @channels.size, + connected_at: @connected_at, + type: "network", + channel_max: @channel_max, + frame_max: @max_frame_size, + timeout: @heartbeat_timeout, + client_properties: @client_properties, + vhost: @vhost.name, + user: @user.name, + protocol: "AMQP 0-9-1", + auth_mechanism: @auth_mechanism, + host: @local_address.address, + port: @local_address.port, + peer_host: @remote_address.address, + peer_port: @remote_address.port, + name: @name, + pid: @name, + ssl: @connection_info.ssl?, + tls_version: @connection_info.ssl_version, + cipher: @connection_info.ssl_cipher, + state: state, + }.merge(stats_details) + end + + private def read_loop + i = 0 + socket = @socket + loop do + AMQP::Frame.from_io(socket) do |frame| + {% unless flag?(:release) %} + @log.trace { "Received #{frame.inspect}" } + {% end %} + if (i += 1) == 8192 + i = 0 + Fiber.yield + end + frame_size_ok?(frame) || return + case frame + when AMQP::Frame::Connection::Close + @log.info { "Client disconnected: #{frame.reply_text}" } unless frame.reply_text.empty? + send AMQP::Frame::Connection::CloseOk.new + @running = false + next + when AMQP::Frame::Connection::CloseOk + @log.debug { "Confirmed disconnect" } + @running = false + return + end + if @running + process_frame(frame) + else + case frame + when AMQP::Frame::Body + @log.debug { "Skipping body, waiting for CloseOk" } + frame.body.skip(frame.body_size) + else + @log.debug { "Discarding #{frame.class.name}, waiting for CloseOk" } + end + end + rescue e : LavinMQ::Error::PreconditionFailed + send_precondition_failed(frame, e.message) + end + rescue IO::TimeoutError + send_heartbeat || break + rescue ex : AMQP::Error::NotImplemented + @log.error { ex.inspect } + send_not_implemented(ex) + rescue ex : AMQP::Error::FrameDecode + @log.error { ex.inspect_with_backtrace } + send_frame_error(ex.message) + rescue ex : IO::Error | OpenSSL::SSL::Error + @log.debug { "Lost connection, while reading (#{ex.inspect})" } unless closed? 
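# (The peer closed the socket or the TLS session was torn down mid-read:
# leave the read loop quietly; the ensure block below runs cleanup and
# closes the socket.)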
+ break + rescue ex : Exception + @log.error { "Unexpected error, while reading: #{ex.inspect_with_backtrace}" } + send_internal_error(ex.message) + end + ensure + cleanup + close_socket + @log.info { "Connection disconnected for user=#{@user.name}" } + end + + private def frame_size_ok?(frame) : Bool + if frame.bytesize > @max_frame_size + send_frame_error("frame size #{frame.bytesize} exceeded max #{@max_frame_size} bytes") + return false + end + true + end + + private def send_heartbeat + now = RoughTime.monotonic + if @last_recv_frame + (@heartbeat_timeout + 5).seconds < now + @log.info { "Heartbeat timeout (#{@heartbeat_timeout}), last seen frame #{(now - @last_recv_frame).total_seconds} s ago, sent frame #{(now - @last_sent_frame).total_seconds} s ago" } + false + else + send AMQP::Frame::Heartbeat.new + end + end + + def send(frame : AMQP::Frame, channel_is_open : Bool? = nil) : Bool + return false if closed? + if channel_is_open.nil? + channel_is_open = frame.channel.zero? || @channels[frame.channel]?.try &.running? + end + unless channel_is_open + @log.debug { "Channel #{frame.channel} is closed so is not sending #{frame.inspect}" } + return false + end + {% unless flag?(:release) %} + @log.trace { "Send #{frame.inspect}" } + {% end %} + @write_lock.synchronize do + s = @socket + s.write_bytes frame, IO::ByteFormat::NetworkEndian + s.flush + end + @last_sent_frame = RoughTime.monotonic + @send_oct_count += 8_u64 + frame.bytesize + if frame.is_a?(AMQP::Frame::Connection::CloseOk) + return false + end + true + rescue ex : IO::Error | OpenSSL::SSL::Error + @log.debug { "Lost connection, while sending (#{ex.inspect})" } unless closed? + close_socket + false + rescue ex : IO::TimeoutError + @log.info { "Timeout while sending (#{ex.inspect})" } + close_socket + false + rescue ex + @log.error { "Unexpected error, while sending: #{ex.inspect_with_backtrace}" } + send_internal_error(ex.message) + false + end + + def connection_details + { + peer_host: @remote_address.address, + peer_port: @remote_address.port, + name: @name, + } + end + + @write_lock = Mutex.new(:checked) + + def deliver(frame, msg) + return false if closed? + @write_lock.synchronize do + socket = @socket + websocket = socket.is_a? 
WebSocketIO + {% unless flag?(:release) %} + @log.trace { "Send #{frame.inspect}" } + {% end %} + socket.write_bytes frame, ::IO::ByteFormat::NetworkEndian + socket.flush if websocket + @send_oct_count += 8_u64 + frame.bytesize + header = AMQP::Frame::Header.new(frame.channel, 60_u16, 0_u16, msg.bodysize, msg.properties) + {% unless flag?(:release) %} + @log.trace { "Send #{header.inspect}" } + {% end %} + socket.write_bytes header, ::IO::ByteFormat::NetworkEndian + socket.flush if websocket + @send_oct_count += 8_u64 + header.bytesize + pos = 0 + while pos < msg.bodysize + length = Math.min(msg.bodysize - pos, @max_frame_size - 8).to_u32 + {% unless flag?(:release) %} + @log.trace { "Send BodyFrame (pos #{pos}, length #{length})" } + {% end %} + body = case msg + in BytesMessage + AMQP::Frame::BytesBody.new(frame.channel, length, msg.body[pos, length]) + in Message + AMQP::Frame::Body.new(frame.channel, length, msg.body_io) + end + socket.write_bytes body, ::IO::ByteFormat::NetworkEndian + socket.flush if websocket + @send_oct_count += 8_u64 + body.bytesize + pos += length + end + socket.flush unless websocket # Websockets need to send one frame per WS frame + @last_sent_frame = RoughTime.monotonic + end + true + rescue ex : IO::Error | OpenSSL::SSL::Error + @log.debug { "Lost connection, while sending (#{ex.inspect})" } + close_socket + Fiber.yield + false + rescue ex : AMQ::Protocol::Error::FrameEncode + @log.warn { "Error encoding frame (#{ex.inspect})" } + close_socket + false + rescue ex : IO::TimeoutError + @log.info { "Timeout while sending (#{ex.inspect})" } + close_socket + false + rescue ex + @log.error { "Delivery exception: #{ex.inspect_with_backtrace}" } + raise ex + end + + def state + !@running ? "closed" : (@vhost.flow? ? "running" : "flow") + end + + private def with_channel(frame, &) + if ch = @channels[frame.channel]? + if ch.running? + yield ch + else + case frame + when AMQP::Frame::Basic::Publish, AMQP::Frame::Header + @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } + when AMQP::Frame::Body + @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } + frame.body.skip(frame.body_size) + else + @log.trace { "Discarding #{frame.inspect}, waiting for Close(Ok)" } + end + end + else + case frame + when AMQP::Frame::Basic::Publish, AMQP::Frame::Header + @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } + when AMQP::Frame::Body + @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } + frame.body.skip(frame.body_size) + else + @log.error { "Channel #{frame.channel} not open while processing #{frame.class.name}" } + close_connection(frame, 504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open") + end + end + end + + private def open_channel(frame) + if @channels.has_key? 
frame.channel + close_connection(frame, 504_u16, "CHANNEL_ERROR - second 'channel.open' seen") + else + @channels[frame.channel] = AMQP::Channel.new(self, frame.channel) + @vhost.event_tick(EventType::ChannelCreated) + send AMQP::Frame::Channel::OpenOk.new(frame.channel) + end + end + + # ameba:disable Metrics/CyclomaticComplexity + private def process_frame(frame) : Nil + @last_recv_frame = RoughTime.monotonic + @recv_oct_count += 8_u64 + frame.bytesize + case frame + when AMQP::Frame::Channel::Open + open_channel(frame) + when AMQP::Frame::Channel::Close + @channels.delete(frame.channel).try &.close + send AMQP::Frame::Channel::CloseOk.new(frame.channel), true + when AMQP::Frame::Channel::CloseOk + @channels.delete(frame.channel).try &.close + when AMQP::Frame::Channel::Flow + with_channel frame, &.flow(frame.active) + when AMQP::Frame::Channel::FlowOk + # noop + when AMQP::Frame::Confirm::Select + with_channel frame, &.confirm_select(frame) + when AMQP::Frame::Exchange::Declare + declare_exchange(frame) + when AMQP::Frame::Exchange::Delete + delete_exchange(frame) + when AMQP::Frame::Exchange::Bind + bind_exchange(frame) + when AMQP::Frame::Exchange::Unbind + unbind_exchange(frame) + when AMQP::Frame::Queue::Declare + declare_queue(frame) + when AMQP::Frame::Queue::Bind + bind_queue(frame) + when AMQP::Frame::Queue::Unbind + unbind_queue(frame) + when AMQP::Frame::Queue::Delete + delete_queue(frame) + when AMQP::Frame::Queue::Purge + purge_queue(frame) + when AMQP::Frame::Basic::Publish + start_publish(frame) + when AMQP::Frame::Header + with_channel frame, &.next_msg_headers(frame) + when AMQP::Frame::Body + with_channel frame, &.add_content(frame) + when AMQP::Frame::Basic::Consume + consume(frame) + when AMQP::Frame::Basic::Get + basic_get(frame) + when AMQP::Frame::Basic::Ack + with_channel frame, &.basic_ack(frame) + when AMQP::Frame::Basic::Reject + with_channel frame, &.basic_reject(frame) + when AMQP::Frame::Basic::Nack + with_channel frame, &.basic_nack(frame) + when AMQP::Frame::Basic::Cancel + with_channel frame, &.cancel_consumer(frame) + when AMQP::Frame::Basic::Qos + with_channel frame, &.basic_qos(frame) + when AMQP::Frame::Basic::Recover + with_channel frame, &.basic_recover(frame) + when AMQP::Frame::Tx::Select + with_channel frame, &.tx_select(frame) + when AMQP::Frame::Tx::Commit + with_channel frame, &.tx_commit(frame) + when AMQP::Frame::Tx::Rollback + with_channel frame, &.tx_rollback(frame) + when AMQP::Frame::Heartbeat + nil + else + send_not_implemented(frame) + end + if heartbeat_interval_ms = @heartbeat_interval_ms + if @last_sent_frame + heartbeat_interval_ms.milliseconds < RoughTime.monotonic + send AMQP::Frame::Heartbeat.new + end + end + rescue ex : LavinMQ::Error::UnexpectedFrame + @log.error { ex.inspect } + close_channel(ex.frame, 505_u16, "UNEXPECTED_FRAME - #{ex.frame.class.name}") + end + + private def cleanup + @running = false + @channels.each_value &.close + @channels.clear + @exclusive_queues.each(&.close) + @exclusive_queues.clear + @vhost.rm_connection(self) + end + + private def close_socket + @running = false + @socket.close + @log.debug { "Socket closed" } + rescue ex + @log.debug { "#{ex.inspect} when closing socket" } + end + + def close(reason = nil) + reason ||= "Connection closed" + @log.info { "Closing, #{reason}" } + send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16) + @running = false + end + + def force_close + close_socket + end + + def closed? 
+ !@running + end + + def close_channel(frame : AMQ::Protocol::Frame, code, text) + return close_connection(frame, code, text) if frame.channel.zero? + case frame + when AMQ::Protocol::Frame::Method + send AMQP::Frame::Channel::Close.new(frame.channel, code, text, frame.class_id, frame.method_id) + else + send AMQP::Frame::Channel::Close.new(frame.channel, code, text, 0, 0) + end + @channels.delete(frame.channel).try &.close + end + + def close_connection(frame : AMQ::Protocol::Frame?, code, text) + @log.info { "Closing, #{text}" } + case frame + when AMQ::Protocol::Frame::Method + send AMQP::Frame::Connection::Close.new(code, text, frame.class_id, frame.method_id) + else + send AMQP::Frame::Connection::Close.new(code, text, 0_u16, 0_u16) + end + @log.info { "Connection=#{@name} disconnected" } + ensure + @running = false + end + + def send_access_refused(frame, text) + @log.warn { "Access refused channel=#{frame.channel} reason=\"#{text}\"" } + close_channel(frame, 403_u16, "ACCESS_REFUSED - #{text}") + end + + def send_not_found(frame, text = "") + @log.warn { "Not found channel=#{frame.channel} reason=\"#{text}\"" } + close_channel(frame, 404_u16, "NOT_FOUND - #{text}") + end + + def send_resource_locked(frame, text) + @log.warn { "Resource locked channel=#{frame.channel} reason=\"#{text}\"" } + close_channel(frame, 405_u16, "RESOURCE_LOCKED - #{text}") + end + + def send_precondition_failed(frame, text) + @log.warn { "Precondition failed channel=#{frame.channel} reason=\"#{text}\"" } + close_channel(frame, 406_u16, "PRECONDITION_FAILED - #{text}") + end + + def send_not_implemented(frame, text = nil) + @log.error { "#{frame.inspect}, not implemented reason=\"#{text}\"" } + close_channel(frame, 540_u16, "NOT_IMPLEMENTED - #{text}") + end + + def send_not_implemented(ex : AMQ::Protocol::Error::NotImplemented) + text = "NOT_IMPLEMENTED" + if ex.channel.zero? + send AMQP::Frame::Connection::Close.new(540, text, ex.class_id, ex.method_id) + @running = false + else + send AMQP::Frame::Channel::Close.new(ex.channel, 540, text, ex.class_id, ex.method_id) + @channels.delete(ex.channel).try &.close + end + end + + def send_internal_error(message) + close_connection(nil, 541_u16, "INTERNAL_ERROR - Unexpected error, please report") + end + + def send_frame_error(message = nil) + close_connection(nil, 501_u16, "FRAME_ERROR - #{message}") + end + + private def declare_exchange(frame) + if !valid_entity_name(frame.exchange_name) + send_precondition_failed(frame, "Exchange name isn't valid") + elsif frame.exchange_name.empty? + send_access_refused(frame, "Not allowed to declare the default exchange") + elsif e = @vhost.exchanges.fetch(frame.exchange_name, nil) + redeclare_exchange(e, frame) + elsif frame.passive + send_not_found(frame, "Exchange '#{frame.exchange_name}' doesn't exists") + elsif frame.exchange_name.starts_with? "amq." + send_access_refused(frame, "Not allowed to use the amq. prefix") + else + ae = frame.arguments["x-alternate-exchange"]?.try &.as?(String) + ae_ok = ae.nil? 
|| (@user.can_write?(@vhost.name, ae) && @user.can_read?(@vhost.name, frame.exchange_name)) + unless @user.can_config?(@vhost.name, frame.exchange_name) && ae_ok + send_access_refused(frame, "User doesn't have permissions to declare exchange '#{frame.exchange_name}'") + return + end + begin + @vhost.apply(frame) + rescue e : LavinMQ::Error::ExchangeTypeError + send_precondition_failed(frame, e.message) + end + send AMQP::Frame::Exchange::DeclareOk.new(frame.channel) unless frame.no_wait + end + end + + private def redeclare_exchange(e, frame) + if frame.passive || e.match?(frame) + unless frame.no_wait + send AMQP::Frame::Exchange::DeclareOk.new(frame.channel) + end + else + send_precondition_failed(frame, "Existing exchange '#{frame.exchange_name}' declared with other arguments") + end + end + + private def delete_exchange(frame) + if !valid_entity_name(frame.exchange_name) + send_precondition_failed(frame, "Exchange name isn't valid") + elsif frame.exchange_name.empty? + send_access_refused(frame, "Not allowed to delete the default exchange") + elsif frame.exchange_name.starts_with? "amq." + send_access_refused(frame, "Not allowed to use the amq. prefix") + elsif !@vhost.exchanges.has_key? frame.exchange_name + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Exchange::DeleteOk.new(frame.channel) unless frame.no_wait + elsif !@user.can_config?(@vhost.name, frame.exchange_name) + send_access_refused(frame, "User doesn't have permissions to delete exchange '#{frame.exchange_name}'") + elsif frame.if_unused && @vhost.exchanges[frame.exchange_name].in_use? + send_precondition_failed(frame, "Exchange '#{frame.exchange_name}' in use") + else + @vhost.apply(frame) + send AMQP::Frame::Exchange::DeleteOk.new(frame.channel) unless frame.no_wait + end + end + + # ameba:disable Metrics/CyclomaticComplexity + private def delete_queue(frame) + if frame.queue_name.empty? && @last_queue_name + frame.queue_name = @last_queue_name.not_nil! + end + if !valid_entity_name(frame.queue_name) + send_precondition_failed(frame, "Queue name isn't valid") + return + end + q = @vhost.queues.fetch(frame.queue_name, nil) + if q.nil? + send AMQP::Frame::Queue::DeleteOk.new(frame.channel, 0_u32) unless frame.no_wait + elsif queue_exclusive_to_other_client?(q) + send_resource_locked(frame, "Queue '#{q.name}' is exclusive") + elsif frame.if_unused && !q.consumer_count.zero? + send_precondition_failed(frame, "Queue '#{q.name}' in use") + elsif frame.if_empty && !q.message_count.zero? + send_precondition_failed(frame, "Queue '#{q.name}' is not empty") + elsif !@user.can_config?(@vhost.name, frame.queue_name) + send_access_refused(frame, "User doesn't have permissions to delete queue '#{q.name}'") + else + size = q.message_count + @vhost.apply(frame) + @exclusive_queues.delete(q) if q.exclusive? + send AMQP::Frame::Queue::DeleteOk.new(frame.channel, size) unless frame.no_wait + end + end + + private def valid_entity_name(name) : Bool + return true if name.empty? + name.matches?(/\A[ -~]*\z/) + end + + def queue_exclusive_to_other_client?(q) + q.exclusive? && !@exclusive_queues.includes?(q) + end + + private def declare_queue(frame) + if !frame.queue_name.empty? && !valid_entity_name(frame.queue_name) + send_precondition_failed(frame, "Queue name isn't valid") + elsif q = @vhost.queues.fetch(frame.queue_name, nil) + redeclare_queue(frame, q) + elsif {"amq.rabbitmq.reply-to", "amq.direct.reply-to"}.includes? 
frame.queue_name + unless frame.no_wait + send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 0_u32) + end + elsif frame.queue_name.starts_with?("amq.direct.reply-to.") + consumer_tag = frame.queue_name[20..] + if @vhost.direct_reply_consumers.has_key? consumer_tag + send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 1_u32) + else + send_not_found(frame, "Queue '#{frame.queue_name}' doesn't exists") + end + elsif frame.passive + send_not_found(frame, "Queue '#{frame.queue_name}' doesn't exists") + elsif frame.queue_name.starts_with? "amq." + send_access_refused(frame, "Not allowed to use the amq. prefix") + elsif @vhost.max_queues.try { |max| @vhost.queues.size >= max } + send_access_refused(frame, "queue limit in vhost '#{@vhost.name}' (#{@vhost.max_queues}) is reached") + else + declare_new_queue(frame) + end + end + + private def redeclare_queue(frame, q) + if queue_exclusive_to_other_client?(q) || invalid_exclusive_redclare?(frame, q) + send_resource_locked(frame, "Exclusive queue") + elsif frame.passive || q.match?(frame) + q.redeclare + unless frame.no_wait + send AMQP::Frame::Queue::DeclareOk.new(frame.channel, q.name, + q.message_count, q.consumer_count) + end + @last_queue_name = frame.queue_name + elsif frame.exclusive && !q.exclusive? + send_resource_locked(frame, "Not an exclusive queue") + else + send_precondition_failed(frame, "Existing queue '#{q.name}' declared with other arguments") + end + end + + private def invalid_exclusive_redclare?(frame, q) + q.exclusive? && !frame.passive && !frame.exclusive + end + + @last_queue_name : String? + + private def declare_new_queue(frame) + unless @vhost.flow? + send_precondition_failed(frame, "Server low on disk space, can not create queue") + end + if frame.queue_name.empty? + frame.queue_name = Queue.generate_name + end + dlx = frame.arguments["x-dead-letter-exchange"]?.try &.as?(String) + dlx_ok = dlx.nil? || (@user.can_write?(@vhost.name, dlx) && @user.can_read?(@vhost.name, name)) + unless @user.can_config?(@vhost.name, frame.queue_name) && dlx_ok + send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue_name}'") + return + end + @vhost.apply(frame) + @last_queue_name = frame.queue_name + if frame.exclusive + @exclusive_queues << @vhost.queues[frame.queue_name] + end + unless frame.no_wait + send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 0_u32) + end + end + + private def bind_queue(frame) + if frame.queue_name.empty? && @last_queue_name + frame.queue_name = @last_queue_name.not_nil! + # according to spec if both queue name and routing key is empty, + # then substitute them with the name of the last declared queue + if frame.routing_key.empty? + frame.routing_key = @last_queue_name.not_nil! + end + end + return unless valid_q_bind_unbind?(frame) + + q = @vhost.queues.fetch(frame.queue_name, nil) + if q.nil? + send_not_found frame, "Queue '#{frame.queue_name}' not found" + elsif !@vhost.exchanges.has_key? 
frame.exchange_name + send_not_found frame, "Exchange '#{frame.exchange_name}' not found" + elsif !@user.can_read?(@vhost.name, frame.exchange_name) + send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") + elsif !@user.can_write?(@vhost.name, frame.queue_name) + send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") + elsif queue_exclusive_to_other_client?(q) + send_resource_locked(frame, "Exclusive queue") + else + @vhost.apply(frame) + send AMQP::Frame::Queue::BindOk.new(frame.channel) unless frame.no_wait + end + end + + private def unbind_queue(frame) + if frame.queue_name.empty? && @last_queue_name + frame.queue_name = @last_queue_name.not_nil! + end + return unless valid_q_bind_unbind?(frame) + + q = @vhost.queues.fetch(frame.queue_name, nil) + if q.nil? + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + elsif !@vhost.exchanges.has_key? frame.exchange_name + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + elsif !@user.can_read?(@vhost.name, frame.exchange_name) + send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") + elsif !@user.can_write?(@vhost.name, frame.queue_name) + send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") + elsif queue_exclusive_to_other_client?(q) + send_resource_locked(frame, "Exclusive queue") + else + @vhost.apply(frame) + send AMQP::Frame::Queue::UnbindOk.new(frame.channel) + end + end + + private def valid_q_bind_unbind?(frame) : Bool + if !valid_entity_name(frame.queue_name) + send_precondition_failed(frame, "Queue name isn't valid") + return false + elsif !valid_entity_name(frame.exchange_name) + send_precondition_failed(frame, "Exchange name isn't valid") + return false + elsif frame.exchange_name.empty? || frame.exchange_name == DEFAULT_EX + target = frame.is_a?(AMQP::Frame::Queue::Bind) ? "bind to" : "unbind from" + send_access_refused(frame, "Not allowed to #{target} the default exchange") + return false + end + true + end + + private def bind_exchange(frame) + source = @vhost.exchanges.fetch(frame.source, nil) + destination = @vhost.exchanges.fetch(frame.destination, nil) + if destination.nil? + send_not_found frame, "Exchange '#{frame.destination}' doesn't exists" + elsif source.nil? + send_not_found frame, "Exchange '#{frame.source}' doesn't exists" + elsif !@user.can_read?(@vhost.name, frame.source) + send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") + elsif !@user.can_write?(@vhost.name, frame.destination) + send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") + elsif frame.source.empty? || frame.destination.empty? + send_access_refused(frame, "Not allowed to bind to the default exchange") + else + @vhost.apply(frame) + send AMQP::Frame::Exchange::BindOk.new(frame.channel) unless frame.no_wait + end + end + + private def unbind_exchange(frame) + source = @vhost.exchanges.fetch(frame.source, nil) + destination = @vhost.exchanges.fetch(frame.destination, nil) + if destination.nil? + # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) + elsif source.nil? 
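# (Bind and unbind check read permission on the source exchange and write
# permission on the destination queue or exchange, mirroring the direction
# messages will flow.)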
+ # should return not_found according to spec but we make it idempotent + send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) + elsif !@user.can_read?(@vhost.name, frame.source) + send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") + elsif !@user.can_write?(@vhost.name, frame.destination) + send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") + elsif frame.source.empty? || frame.destination.empty? || frame.source == DEFAULT_EX || frame.destination == DEFAULT_EX + send_access_refused(frame, "Not allowed to unbind from the default exchange") + else + @vhost.apply(frame) + send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) unless frame.no_wait + end + end + + private def purge_queue(frame) + if frame.queue_name.empty? && @last_queue_name + frame.queue_name = @last_queue_name.not_nil! + end + unless @user.can_read?(@vhost.name, frame.queue_name) + send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") + return + end + if !valid_entity_name(frame.queue_name) + send_precondition_failed(frame, "Queue name isn't valid") + elsif q = @vhost.queues.fetch(frame.queue_name, nil) + if queue_exclusive_to_other_client?(q) + send_resource_locked(frame, "Queue '#{q.name}' is exclusive") + else + messages_purged = q.purge + send AMQP::Frame::Queue::PurgeOk.new(frame.channel, messages_purged) unless frame.no_wait + end + else + send_not_found(frame, "Queue '#{frame.queue_name}' not found") + end + end + + private def start_publish(frame) + unless @user.can_write?(@vhost.name, frame.exchange) + send_access_refused(frame, "User not allowed to publish to exchange '#{frame.exchange}'") + return + end + with_channel frame, &.start_publish(frame) + end + + private def consume(frame) + if frame.queue.empty? && @last_queue_name + frame.queue = @last_queue_name.not_nil! + end + if !valid_entity_name(frame.queue) + send_precondition_failed(frame, "Queue name isn't valid") + return + end + unless @user.can_read?(@vhost.name, frame.queue) + send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue}'") + return + end + with_channel frame, &.consume(frame) + end + + private def basic_get(frame) + if frame.queue.empty? && @last_queue_name + frame.queue = @last_queue_name.not_nil! + end + if !valid_entity_name(frame.queue) + send_precondition_failed(frame, "Queue name isn't valid") + return + end + unless @user.can_read?(@vhost.name, frame.queue) + send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue}'") + return + end + # yield so that msg expiration, consumer delivery etc gets priority + Fiber.yield + with_channel frame, &.basic_get(frame) + end + end + end +end diff --git a/src/lavinmq/amqp/connection_factory.cr b/src/lavinmq/amqp/connection_factory.cr new file mode 100644 index 000000000..a81baa35f --- /dev/null +++ b/src/lavinmq/amqp/connection_factory.cr @@ -0,0 +1,190 @@ +require "log" +require "../version" +require "./client" +require "../client/connection_factory" + +module LavinMQ + module AMQP + class ConnectionFactory < LavinMQ::ConnectionFactory + Log = ::Log.for "AMQP.ConnectionFactory" + + def start(socket, connection_info, vhosts, users) : Client? 
+ remote_address = connection_info.src + socket.read_timeout = 15.seconds + if confirm_header(socket) + if start_ok = start(socket) + if user = authenticate(socket, remote_address, users, start_ok) + if tune_ok = tune(socket) + if vhost = open(socket, vhosts, user) + socket.read_timeout = heartbeat_timeout(tune_ok) + return LavinMQ::AMQP::Client.new(socket, connection_info, vhost, user, tune_ok, start_ok) + end + end + end + end + end + rescue ex : IO::TimeoutError | IO::Error | OpenSSL::SSL::Error | AMQP::Error::FrameDecode + Log.warn { "#{ex} when #{remote_address} tried to establish connection" } + nil + rescue ex + Log.error(exception: ex) { "Error while #{remote_address} tried to establish connection" } + nil + end + + private def heartbeat_timeout(tune_ok) + if tune_ok.heartbeat > 0 + (tune_ok.heartbeat / 2).seconds + end + end + + def confirm_header(socket) : Bool + proto = uninitialized UInt8[8] + count = socket.read(proto.to_slice) + if count.zero? # EOF, socket closed by peer + false + elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9 + socket.write AMQP::PROTOCOL_START_0_9_1.to_slice + socket.flush + Log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" } + false + else + true + end + end + + SERVER_PROPERTIES = AMQP::Table.new({ + "product": "LavinMQ", + "platform": "Crystal #{Crystal::VERSION}", + "version": LavinMQ::VERSION, + "capabilities": { + "publisher_confirms": true, + "exchange_exchange_bindings": true, + "basic.nack": true, + "consumer_cancel_notify": true, + "connection.blocked": true, + "consumer_priorities": true, + "authentication_failure_close": true, + "per_consumer_qos": true, + "direct_reply_to": true, + }, + }) + + def start(socket) + start = AMQP::Frame::Connection::Start.new(server_properties: SERVER_PROPERTIES) + socket.write_bytes start, ::IO::ByteFormat::NetworkEndian + socket.flush + start_ok = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::StartOk) } + if start_ok.bytesize > 4096 + Log.warn { "StartOk frame was #{start_ok.bytesize} bytes, max allowed is 4096 bytes" } + return + end + start_ok + end + + def credentials(start_ok) + case start_ok.mechanism + when "PLAIN" + resp = start_ok.response + if i = resp.index('\u0000', 1) + {resp[1...i], resp[(i + 1)..-1]} + else + raise "Invalid authentication response" + end + when "AMQPLAIN" + io = ::IO::Memory.new(start_ok.response) + tbl = AMQP::Table.from_io(io, ::IO::ByteFormat::NetworkEndian, io.bytesize.to_u32) + {tbl["LOGIN"].as(String), tbl["PASSWORD"].as(String)} + else raise "Unsupported authentication mechanism: #{start_ok.mechanism}" + end + end + + def authenticate(socket, remote_address, users, start_ok) + username, password = credentials(start_ok) + user = users[username]? + return user if user && user.password && user.password.not_nil!.verify(password) && + guest_only_loopback?(remote_address, user) + + if user.nil? + Log.warn { "User \"#{username}\" not found" } + else + Log.warn { "Authentication failure for user \"#{username}\"" } + end + props = start_ok.client_properties + if capabilities = props["capabilities"]?.try &.as?(AMQP::Table) + if capabilities["authentication_failure_close"]?.try &.as?(Bool) + socket.write_bytes AMQP::Frame::Connection::Close.new(403_u16, "ACCESS_REFUSED", + start_ok.class_id, + start_ok.method_id), IO::ByteFormat::NetworkEndian + socket.flush + end + end + nil + end + + def tune(socket) + frame_max = socket.is_a?(WebSocketIO) ? 
4096_u32 : Config.instance.frame_max + socket.write_bytes AMQP::Frame::Connection::Tune.new( + channel_max: Config.instance.channel_max, + frame_max: frame_max, + heartbeat: Config.instance.heartbeat), IO::ByteFormat::NetworkEndian + socket.flush + tune_ok = AMQP::Frame.from_io(socket) do |frame| + case frame + when AMQP::Frame::Connection::TuneOk + if frame.frame_max < 4096 + Log.warn { "Suggested Frame max (#{frame.frame_max}) too low, closing connection" } + return + end + frame + else + Log.warn { "Expected TuneOk Frame got #{frame.inspect}" } + return + end + end + if tune_ok.frame_max < 4096 + Log.warn { "Suggested Frame max (#{tune_ok.frame_max}) too low, closing connection" } + return + end + tune_ok + end + + def open(socket, vhosts, user) + open = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::Open) } + vhost_name = open.vhost.empty? ? "/" : open.vhost + if vhost = vhosts[vhost_name]? + if user.permissions[vhost_name]? + if vhost.max_connections.try { |max| vhost.connections.size >= max } + Log.warn { "Max connections (#{vhost.max_connections}) reached for vhost #{vhost_name}" } + reply_text = "NOT_ALLOWED - access to vhost '#{vhost_name}' refused: connection limit (#{vhost.max_connections}) is reached" + socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, reply_text, + open.class_id, open.method_id), IO::ByteFormat::NetworkEndian + socket.flush + return + end + socket.write_bytes AMQP::Frame::Connection::OpenOk.new, IO::ByteFormat::NetworkEndian + socket.flush + return vhost + else + Log.warn { "Access denied for user \"#{user.name}\" to vhost \"#{vhost_name}\"" } + reply_text = "NOT_ALLOWED - '#{user.name}' doesn't have access to '#{vhost.name}'" + socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, reply_text, + open.class_id, open.method_id), IO::ByteFormat::NetworkEndian + socket.flush + end + else + Log.warn { "VHost \"#{vhost_name}\" not found" } + socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, "NOT_ALLOWED - vhost not found", + open.class_id, open.method_id), IO::ByteFormat::NetworkEndian + socket.flush + end + nil + end + + private def guest_only_loopback?(remote_address, user) : Bool + return true unless user.name == "guest" + return true unless Config.instance.guest_only_loopback? + remote_address.loopback? + end + end + end +end diff --git a/src/lavinmq/amqp/consumer.cr b/src/lavinmq/amqp/consumer.cr new file mode 100644 index 000000000..d581afa1c --- /dev/null +++ b/src/lavinmq/amqp/consumer.cr @@ -0,0 +1,262 @@ +require "log" +require "../client/channel/consumer" + +module LavinMQ + module AMQP + class Consumer < LavinMQ::Client::Channel::Consumer + include SortableJSON + Log = ::Log.for "AMQP.consumer" + getter tag : String + getter priority : Int32 + getter? exclusive : Bool + getter? no_ack : Bool + getter channel, queue + getter prefetch_count = 0u16 + getter unacked = 0_u32 + getter? closed = false + @flow : Bool + @metadata : ::Log::Metadata + + def initialize(@channel : AMQP::Channel, @queue : Queue, frame : AMQP::Frame::Basic::Consume) + @tag = frame.consumer_tag + @no_ack = frame.no_ack + @exclusive = frame.exclusive + @priority = consumer_priority(frame) # Must be before ConsumeOk, can close channel + @prefetch_count = @channel.prefetch_count + @flow = @channel.flow? 
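+ # Snapshot the channel's prefetch and flow state at subscribe time;
+ # the fiber spawned below then drives all deliveries for this consumer.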
+ @metadata = @channel.@metadata.extend({consumer: @tag}) + @log = Logger.new(Log, @metadata) + spawn deliver_loop, name: "Consumer deliver loop", same_thread: true + end + + def close + @closed = true + @queue.rm_consumer(self) + @notify_closed.close + @has_capacity.close + @flow_change.close + end + + @notify_closed = ::Channel(Nil).new + @flow_change = ::Channel(Bool).new + + def flow(active : Bool) + @flow = active + @flow_change.try_send? active + end + + def prefetch_count=(prefetch_count : UInt16) + @prefetch_count = prefetch_count + notify_has_capacity(@prefetch_count > @unacked) + end + + private def deliver_loop + wait_for_single_active_consumer + queue = @queue + i = 0 + loop do + wait_for_capacity + loop do + raise ClosedError.new if @closed + next if wait_for_global_capacity + next if wait_for_priority_consumers + next if wait_for_queue_ready + next if wait_for_paused_queue + next if wait_for_flow + break + end + {% unless flag?(:release) %} + @log.debug { "Getting a new message" } + {% end %} + queue.consume_get(self) do |env| + deliver(env.message, env.segment_position, env.redelivered) + end + Fiber.yield if (i &+= 1) % 32768 == 0 + end + rescue ex : ClosedError | Queue::ClosedError | AMQP::Channel::ClosedError | ::Channel::ClosedError + @log.debug { "deliver loop exiting: #{ex.inspect}" } + end + + private def wait_for_global_capacity + ch = @channel + return if ch.has_capacity? + @log.debug { "Waiting for global prefetch capacity" } + select + when ch.has_capacity.receive + when @notify_closed.receive + end + true + end + + private def wait_for_single_active_consumer + case @queue.single_active_consumer + when self + @log.debug { "This consumer is the single active consumer" } + when nil + @log.debug { "The queue isn't a single active consumer queue" } + else + @log.debug { "Waiting for this consumer to become the single active consumer" } + loop do + select + when sca = @queue.single_active_consumer_change.receive + if sca == self + break + else + @log.debug { "New single active consumer, but not me" } + end + when @notify_closed.receive + break + end + end + true + end + end + + private def wait_for_priority_consumers + # single active consumer queues can't have priority consumers + if @queue.has_priority_consumers? && @queue.single_active_consumer.nil? + if @queue.consumers.any? { |c| c.priority > @priority && c.accepts? } + @log.debug { "Waiting for higher priority consumers to not have capacity" } + begin + ::Channel.receive_first(@queue.consumers.map(&.has_capacity)) + rescue ::Channel::ClosedError + end + return true + end + end + end + + private def wait_for_queue_ready + if @queue.empty? + @log.debug { "Waiting for queue not to be empty" } + select + when is_empty = @queue.empty_change.receive + @log.debug { "Queue is #{is_empty ? "" : "not"} empty" } + when @notify_closed.receive + end + return true + end + end + + private def wait_for_paused_queue + if @queue.paused? + @log.debug { "Waiting for queue not to be paused" } + select + when is_paused = @queue.paused_change.receive + @log.debug { "Queue is #{is_paused ? 
"" : "not"} paused" } + when @notify_closed.receive + end + return true + end + end + + private def wait_for_flow + unless @flow + @log.debug { "Waiting for flow" } + is_flow = @flow_change.receive + @log.debug { "Channel flow=#{is_flow}" } + return true + end + end + + # blocks until the consumer can accept more messages + private def wait_for_capacity : Nil + if @prefetch_count > 0 + until @unacked < @prefetch_count + @log.debug { "Waiting for prefetch capacity" } + @has_capacity.receive + end + end + end + + def accepts? : Bool + return false unless @flow + return false if @prefetch_count > 0 && @unacked >= @prefetch_count + return false if @channel.global_prefetch_count > 0 && @channel.consumers.sum(&.unacked) >= @channel.global_prefetch_count + true + end + + getter has_capacity = ::Channel(Bool).new + + private def notify_has_capacity(value) + while @has_capacity.try_send? value + end + end + + def deliver(msg, sp, redelivered = false, recover = false) + unless @no_ack || recover + @unacked += 1 + notify_has_capacity(false) if @unacked == @prefetch_count + end + delivery_tag = @channel.next_delivery_tag(@queue, sp, @no_ack, self) + deliver = AMQP::Frame::Basic::Deliver.new(@channel.id, @tag, + delivery_tag, + redelivered, + msg.exchange_name, msg.routing_key) + @channel.deliver(deliver, msg, redelivered) + end + + def ack(sp) + was_full = @unacked == @prefetch_count + @unacked -= 1 + notify_has_capacity(true) if was_full + end + + def reject(sp, requeue = false) + was_full = @unacked == @prefetch_count + @unacked -= 1 + notify_has_capacity(true) if was_full + end + + def cancel + @channel.send AMQP::Frame::Basic::Cancel.new(@channel.id, @tag, no_wait: true) + @channel.consumers.delete self + close + end + + private def consumer_priority(frame) : Int32 + case prio = frame.arguments["x-priority"] + when Int then prio.to_i32 + else raise LavinMQ::Error::PreconditionFailed.new("x-priority must be an integer") + end + rescue KeyError + 0 + rescue OverflowError + raise LavinMQ::Error::PreconditionFailed.new("x-priority out of bounds, must fit a 32-bit integer") + end + + def unacked_messages + @channel.unacked + end + + def channel_name + @channel.name + end + + def details_tuple + channel_details = @channel.details_tuple + { + queue: { + name: @queue.name, + vhost: @queue.vhost.name, + }, + consumer_tag: @tag, + exclusive: @exclusive, + ack_required: !@no_ack, + prefetch_count: @prefetch_count, + priority: @priority, + channel_details: { + peer_host: channel_details[:connection_details][:peer_host]?, + peer_port: channel_details[:connection_details][:peer_port]?, + connection_name: channel_details[:connection_details][:name], + user: channel_details[:user], + number: channel_details[:number], + name: channel_details[:name], + }, + } + end + + class ClosedError < Error; end + end + end +end diff --git a/src/lavinmq/amqp/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr new file mode 100644 index 000000000..152ce98f2 --- /dev/null +++ b/src/lavinmq/amqp/stream_consumer.cr @@ -0,0 +1,94 @@ +require "./consumer" +require "../segment_position" + +module LavinMQ + module AMQP + class StreamConsumer < Consumer + include SortableJSON + property offset : Int64 + property segment : UInt32 + property pos : UInt32 + getter requeued = Deque(SegmentPosition).new + + def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) + validate_preconditions(frame) + offset = frame.arguments["x-stream-offset"]? 
+ @offset, @segment, @pos = stream_queue.find_offset(offset) + super + end + + private def validate_preconditions(frame) + if frame.exclusive + raise LavinMQ::Error::PreconditionFailed.new("Stream consumers must not be exclusive") + end + if frame.no_ack + raise LavinMQ::Error::PreconditionFailed.new("Stream consumers must acknowledge messages") + end + if @channel.prefetch_count.zero? + raise LavinMQ::Error::PreconditionFailed.new("Stream consumers must have a prefetch limit") + end + unless @channel.global_prefetch_count.zero? + raise LavinMQ::Error::PreconditionFailed.new("Stream consumers does not support global prefetch limit") + end + if frame.arguments.has_key? "x-priority" + raise LavinMQ::Error::PreconditionFailed.new("x-priority not supported on stream queues") + end + case frame.arguments["x-stream-offset"]? + when Nil, Int, Time, "first", "next", "last" + else raise LavinMQ::Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'") + end + end + + private def deliver_loop + i = 0 + loop do + wait_for_capacity + loop do + raise ClosedError.new if @closed + next if wait_for_queue_ready + next if wait_for_paused_queue + next if wait_for_flow + break + end + {% unless flag?(:release) %} + @log.debug { "Getting a new message" } + {% end %} + stream_queue.consume_get(self) do |env| + deliver(env.message, env.segment_position, env.redelivered) + end + Fiber.yield if (i &+= 1) % 32768 == 0 + end + rescue ex : ClosedError | Queue::ClosedError | AMQP::Channel::ClosedError | ::Channel::ClosedError + @log.debug { "deliver loop exiting: #{ex.inspect}" } + end + + private def wait_for_queue_ready + if @offset > stream_queue.last_offset && @requeued.empty? + @log.debug { "Waiting for queue not to be empty" } + select + when stream_queue.new_messages.receive + @log.debug { "Queue is not empty" } + when @has_requeued.receive + @log.debug { "Got a requeued message" } + when @notify_closed.receive + end + return true + end + end + + @has_requeued = ::Channel(Nil).new + + private def stream_queue : StreamQueue + @queue.as(StreamQueue) + end + + def reject(sp, requeue : Bool) + super + if requeue + @requeued.push(sp) + @has_requeued.try_send? nil if @requeued.size == 1 + end + end + end + end +end diff --git a/src/lavinmq/client/amqp_connection.cr b/src/lavinmq/client/amqp_connection.cr deleted file mode 100644 index 22de79674..000000000 --- a/src/lavinmq/client/amqp_connection.cr +++ /dev/null @@ -1,186 +0,0 @@ -require "log" -require "../version" - -module LavinMQ - class AMQPConnection - Log = ::Log.for "AMQPConnection" - - def self.start(socket, connection_info, vhosts, users) : Client? 
- remote_address = connection_info.src - socket.read_timeout = 15.seconds - if confirm_header(socket) - if start_ok = start(socket) - if user = authenticate(socket, remote_address, users, start_ok) - if tune_ok = tune(socket) - if vhost = open(socket, vhosts, user) - socket.read_timeout = heartbeat_timeout(tune_ok) - return Client.new(socket, connection_info, vhost, user, tune_ok, start_ok) - end - end - end - end - end - rescue ex : IO::TimeoutError | IO::Error | OpenSSL::SSL::Error | AMQP::Error::FrameDecode - Log.warn { "#{ex} when #{remote_address} tried to establish connection" } - nil - rescue ex - Log.error(exception: ex) { "Error while #{remote_address} tried to establish connection" } - nil - end - - private def self.heartbeat_timeout(tune_ok) - if tune_ok.heartbeat > 0 - (tune_ok.heartbeat / 2).seconds - end - end - - def self.confirm_header(socket) : Bool - proto = uninitialized UInt8[8] - count = socket.read(proto.to_slice) - if count.zero? # EOF, socket closed by peer - false - elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9 - socket.write AMQP::PROTOCOL_START_0_9_1.to_slice - socket.flush - Log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" } - false - else - true - end - end - - SERVER_PROPERTIES = AMQP::Table.new({ - "product": "LavinMQ", - "platform": "Crystal #{Crystal::VERSION}", - "version": LavinMQ::VERSION, - "capabilities": { - "publisher_confirms": true, - "exchange_exchange_bindings": true, - "basic.nack": true, - "consumer_cancel_notify": true, - "connection.blocked": true, - "consumer_priorities": true, - "authentication_failure_close": true, - "per_consumer_qos": true, - "direct_reply_to": true, - }, - }) - - def self.start(socket) - start = AMQP::Frame::Connection::Start.new(server_properties: SERVER_PROPERTIES) - socket.write_bytes start, ::IO::ByteFormat::NetworkEndian - socket.flush - start_ok = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::StartOk) } - if start_ok.bytesize > 4096 - Log.warn { "StartOk frame was #{start_ok.bytesize} bytes, max allowed is 4096 bytes" } - return - end - start_ok - end - - def self.credentials(start_ok) - case start_ok.mechanism - when "PLAIN" - resp = start_ok.response - if i = resp.index('\u0000', 1) - {resp[1...i], resp[(i + 1)..-1]} - else - raise "Invalid authentication response" - end - when "AMQPLAIN" - io = ::IO::Memory.new(start_ok.response) - tbl = AMQP::Table.from_io(io, ::IO::ByteFormat::NetworkEndian, io.bytesize.to_u32) - {tbl["LOGIN"].as(String), tbl["PASSWORD"].as(String)} - else raise "Unsupported authentication mechanism: #{start_ok.mechanism}" - end - end - - def self.authenticate(socket, remote_address, users, start_ok) - username, password = credentials(start_ok) - user = users[username]? - return user if user && user.password && user.password.not_nil!.verify(password) && - guest_only_loopback?(remote_address, user) - - if user.nil? - Log.warn { "User \"#{username}\" not found" } - else - Log.warn { "Authentication failure for user \"#{username}\"" } - end - props = start_ok.client_properties - if capabilities = props["capabilities"]?.try &.as?(AMQP::Table) - if capabilities["authentication_failure_close"]?.try &.as?(Bool) - socket.write_bytes AMQP::Frame::Connection::Close.new(403_u16, "ACCESS_REFUSED", - start_ok.class_id, - start_ok.method_id), IO::ByteFormat::NetworkEndian - socket.flush - end - end - nil - end - - def self.tune(socket) - frame_max = socket.is_a?(WebSocketIO) ? 
4096_u32 : Config.instance.frame_max - socket.write_bytes AMQP::Frame::Connection::Tune.new( - channel_max: Config.instance.channel_max, - frame_max: frame_max, - heartbeat: Config.instance.heartbeat), IO::ByteFormat::NetworkEndian - socket.flush - tune_ok = AMQP::Frame.from_io(socket) do |frame| - case frame - when AMQP::Frame::Connection::TuneOk - if frame.frame_max < 4096 - Log.warn { "Suggested Frame max (#{frame.frame_max}) too low, closing connection" } - return - end - frame - else - Log.warn { "Expected TuneOk Frame got #{frame.inspect}" } - return - end - end - if tune_ok.frame_max < 4096 - Log.warn { "Suggested Frame max (#{tune_ok.frame_max}) too low, closing connection" } - return - end - tune_ok - end - - def self.open(socket, vhosts, user) - open = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::Open) } - vhost_name = open.vhost.empty? ? "/" : open.vhost - if vhost = vhosts[vhost_name]? - if user.permissions[vhost_name]? - if vhost.max_connections.try { |max| vhost.connections.size >= max } - Log.warn { "Max connections (#{vhost.max_connections}) reached for vhost #{vhost_name}" } - reply_text = "NOT_ALLOWED - access to vhost '#{vhost_name}' refused: connection limit (#{vhost.max_connections}) is reached" - socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, reply_text, - open.class_id, open.method_id), IO::ByteFormat::NetworkEndian - socket.flush - return - end - socket.write_bytes AMQP::Frame::Connection::OpenOk.new, IO::ByteFormat::NetworkEndian - socket.flush - return vhost - else - Log.warn { "Access denied for user \"#{user.name}\" to vhost \"#{vhost_name}\"" } - reply_text = "NOT_ALLOWED - '#{user.name}' doesn't have access to '#{vhost.name}'" - socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, reply_text, - open.class_id, open.method_id), IO::ByteFormat::NetworkEndian - socket.flush - end - else - Log.warn { "VHost \"#{vhost_name}\" not found" } - socket.write_bytes AMQP::Frame::Connection::Close.new(530_u16, "NOT_ALLOWED - vhost not found", - open.class_id, open.method_id), IO::ByteFormat::NetworkEndian - socket.flush - end - nil - end - - private def self.guest_only_loopback?(remote_address, user) : Bool - return true unless user.name == "guest" - return true unless Config.instance.guest_only_loopback? - remote_address.loopback? - end - end -end diff --git a/src/lavinmq/client/channel.cr b/src/lavinmq/client/channel.cr index 2d74c0112..71efd5b06 100644 --- a/src/lavinmq/client/channel.cr +++ b/src/lavinmq/client/channel.cr @@ -1,801 +1,10 @@ -require "./channel/consumer" -require "./channel/stream_consumer" -require "../logger" -require "../queue" -require "../exchange" -require "../amqp" -require "../stats" require "../sortable_json" -require "../error" +require "./channel/consumer" module LavinMQ - class Client - class Channel - include Stats + abstract class Client + abstract class Channel include SortableJSON - - getter id, name - property? running = true - getter? flow = true - getter consumers = Array(Consumer).new - getter prefetch_count = 0_u16 - getter global_prefetch_count = 0_u16 - getter has_capacity = ::Channel(Nil).new - getter unacked = Deque(Unack).new - @confirm = false - @confirm_total = 0_u64 - @next_publish_mandatory = false - @next_publish_immediate = false - @next_publish_exchange_name : String? - @next_publish_routing_key : String? - @next_msg_size = 0_u64 - @next_msg_props : AMQP::Properties? - @delivery_tag = 0_u64 - @unack_lock = Mutex.new(:checked) - @next_msg_body_file : File? 
- @direct_reply_consumer : String? - @tx = false - @next_msg_body_tmp = IO::Memory.new - - rate_stats({"ack", "get", "publish", "deliver", "redeliver", "reject", "confirm", "return_unroutable"}) - - Log = ::Log.for("channel") - - def initialize(@client : Client, @id : UInt16) - @metadata = ::Log::Metadata.new(nil, {client: @client.remote_address.to_s, channel: @id.to_i}) - @name = "#{@client.channel_name_prefix}[#{@id}]" - @log = Logger.new(Log, @metadata) - end - - record Unack, - tag : UInt64, - queue : Queue, - sp : SegmentPosition, - consumer : Consumer?, - delivered_at : Time::Span - - def details_tuple - { - number: @id, - name: @name, - vhost: @client.vhost.name, - user: @client.user.try(&.name), - consumer_count: @consumers.size, - prefetch_count: @prefetch_count, - global_prefetch_count: @global_prefetch_count, - confirm: @confirm, - transactional: false, - messages_unacknowledged: @unacked.size, - connection_details: @client.connection_details, - state: state, - message_stats: stats_details, - } - end - - def flow(active : Bool) - @flow = active - @consumers.each &.flow(active) - send AMQP::Frame::Channel::FlowOk.new(@id, active) - end - - def state - !@running ? "closed" : @flow ? "running" : "flow" - end - - def send(frame) - unless @running - @log.debug { "Channel is closed so is not sending #{frame.inspect}" } - return false - end - @client.send frame, true - end - - def confirm_select(frame) - if @tx - @client.send_precondition_failed(frame, "Channel already in transactional mode") - return - end - @confirm = true - unless frame.no_wait - send AMQP::Frame::Confirm::SelectOk.new(frame.channel) - end - end - - def start_publish(frame) - unless server_flow? - @client.send_precondition_failed(frame, "Server low on disk space") - return - end - raise Error::UnexpectedFrame.new(frame) if @next_publish_exchange_name - if ex = @client.vhost.exchanges[frame.exchange]? - if !ex.internal? - @next_publish_exchange_name = frame.exchange - @next_publish_routing_key = frame.routing_key - @next_publish_mandatory = frame.mandatory - @next_publish_immediate = frame.immediate - else - @client.send_access_refused(frame, "Exchange '#{frame.exchange}' in vhost '#{@client.vhost.name}' is internal") - end - else - @client.send_not_found(frame, "No exchange '#{frame.exchange}' in vhost '#{@client.vhost.name}'") - end - end - - private def direct_reply_to?(str) : Bool - {"amq.rabbitmq.reply-to", "amq.direct.reply-to"}.includes? str - end - - def next_msg_headers(frame) - raise Error::UnexpectedFrame.new(frame) if @next_publish_exchange_name.nil? - raise Error::UnexpectedFrame.new(frame) if @next_msg_props - raise Error::UnexpectedFrame.new(frame) if frame.class_id != 60 - valid_expiration?(frame) || return - if direct_reply_to?(frame.properties.reply_to) - if drc = @direct_reply_consumer - frame.properties.reply_to = "amq.direct.reply-to.#{drc}" - else - @client.send_precondition_failed(frame, "Direct reply consumer does not exist") - return - end - end - if frame.body_size > Config.instance.max_message_size - error = "message size #{frame.body_size} larger than max size #{Config.instance.max_message_size}" - @client.send_precondition_failed(frame, error) - @log.warn { "Message size exceeded, #{frame.body_size}/#{Config.instance.max_message_size}" } - return - end - @next_msg_size = frame.body_size - @next_msg_props = frame.properties - finish_publish(@next_msg_body_tmp) if frame.body_size.zero? - end - - @next_msg_body_file_pos = 0 - - def add_content(frame) - if @next_publish_exchange_name.nil? 
|| @next_msg_props.nil? - frame.body.skip(frame.body_size) - raise Error::UnexpectedFrame.new(frame) - end - if @tx # in transaction mode, copy all bodies to the tmp file serially - copied = IO.copy(frame.body, next_msg_body_file, frame.body_size) - if copied != frame.body_size - raise IO::Error.new("Could only copy #{copied} of #{frame.body_size} bytes") - end - if (@next_msg_body_file_pos += copied) == @next_msg_size - # as the body_io won't be read until tx_commit there's no need to rewind - # bodies can be appended sequentially to the tmp file - finish_publish(next_msg_body_file) - @next_msg_body_file_pos = 0 - end - elsif frame.body_size == @next_msg_size - copied = IO.copy(frame.body, @next_msg_body_tmp, frame.body_size) - if copied != frame.body_size - raise IO::Error.new("Could only copy #{copied} of #{frame.body_size} bytes") - end - @next_msg_body_tmp.rewind - begin - finish_publish(@next_msg_body_tmp) - ensure - @next_msg_body_tmp.clear - end - else - copied = IO.copy(frame.body, next_msg_body_file, frame.body_size) - if copied != frame.body_size - raise IO::Error.new("Could only copy #{copied} of #{frame.body_size} bytes") - end - if next_msg_body_file.pos == @next_msg_size - next_msg_body_file.rewind - begin - finish_publish(next_msg_body_file) - ensure - next_msg_body_file.rewind - end - end - end - end - - private def valid_expiration?(frame) : Bool - if exp = frame.properties.expiration - if i = exp.to_i? - if i < 0 - @client.send_precondition_failed(frame, "Negative expiration not allowed") - return false - end - else - @client.send_precondition_failed(frame, "Expiration not a number") - return false - end - end - true - end - - private def server_flow? - @client.vhost.flow? - end - - private def finish_publish(body_io) - @publish_count += 1 - @client.vhost.event_tick(EventType::ClientPublish) - props = @next_msg_props.not_nil! - props.timestamp = RoughTime.utc if props.timestamp.nil? && Config.instance.set_timestamp? - msg = Message.new(RoughTime.unix_ms, - @next_publish_exchange_name.not_nil!, - @next_publish_routing_key.not_nil!, - props, - @next_msg_size, - body_io) - direct_reply?(msg) || publish_and_return(msg) - ensure - @next_msg_size = 0_u64 - @next_msg_props = nil - @next_publish_exchange_name = @next_publish_routing_key = nil - @next_publish_mandatory = @next_publish_immediate = false - end - - @visited = Set(Exchange).new - @found_queues = Set(Queue).new - - record TxMessage, message : Message, mandatory : Bool, immediate : Bool - @tx_publishes = Array(TxMessage).new - - private def publish_and_return(msg) - validate_user_id(msg.properties.user_id) - if @tx - @tx_publishes.push TxMessage.new(msg, @next_publish_mandatory, @next_publish_immediate) - return - end - - confirm do - ok = @client.vhost.publish msg, @next_publish_immediate, @visited, @found_queues - basic_return(msg, @next_publish_mandatory, @next_publish_immediate) unless ok - rescue e : Error::PreconditionFailed - msg.body_io.skip(msg.bodysize) - send AMQP::Frame::Channel::Close.new(@id, 406_u16, "PRECONDITION_FAILED - #{e.message}", 60_u16, 40_u16) - end - rescue Queue::RejectOverFlow - # nack but then do nothing - end - - private def validate_user_id(user_id) - current_user = @client.user - if user_id && user_id != current_user.name && !current_user.can_impersonate? 
- text = "Message's user_id property '#{user_id}' doesn't match actual user '#{current_user.name}'" - @log.error { text } - raise Error::PreconditionFailed.new(text) - end - end - - private def confirm(&) - if @confirm - msgid = @confirm_total &+= 1 - begin - yield - confirm_ack(msgid) - rescue ex - confirm_nack(msgid) - raise ex - end - else - yield - end - end - - private def confirm_ack(msgid, multiple = false) - @client.vhost.event_tick(EventType::ClientPublishConfirm) - @confirm_count += 1 # Stats - send AMQP::Frame::Basic::Ack.new(@id, msgid, multiple) - end - - private def confirm_nack(msgid, multiple = false) - @client.vhost.event_tick(EventType::ClientPublishConfirm) - @confirm_count += 1 # Stats - send AMQP::Frame::Basic::Nack.new(@id, msgid, multiple, requeue: false) - end - - private def direct_reply?(msg) : Bool - return false unless msg.routing_key.starts_with? "amq.direct.reply-to." - consumer_tag = msg.routing_key[20..] - if ch = @client.vhost.direct_reply_consumers[consumer_tag]? - confirm do - deliver = AMQP::Frame::Basic::Deliver.new(ch.id, consumer_tag, - 1_u64, false, - msg.exchange_name, - msg.routing_key) - ch.deliver(deliver, msg) - end - true - else - false - end - end - - private def basic_return(msg : Message, mandatory : Bool, immediate : Bool) - @return_unroutable_count += 1 - if immediate - retrn = AMQP::Frame::Basic::Return.new(@id, 313_u16, "NO_CONSUMERS", msg.exchange_name, msg.routing_key) - deliver(retrn, msg) - msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind - elsif mandatory - retrn = AMQP::Frame::Basic::Return.new(@id, 312_u16, "NO_ROUTE", msg.exchange_name, msg.routing_key) - deliver(retrn, msg) - msg.body_io.seek(-msg.bodysize.to_i64, IO::Seek::Current) # rewind - end - end - - def deliver(frame, msg, redelivered = false) : Nil - raise ClosedError.new("Channel is closed") unless @running - @client.deliver(frame, msg) - if redelivered - @redeliver_count += 1 - @client.vhost.event_tick(EventType::ClientRedeliver) - else - @deliver_count += 1 - @client.vhost.event_tick(EventType::ClientDeliver) - end - end - - def consume(frame) - if frame.consumer_tag.empty? - frame.consumer_tag = "amq.ctag-#{Random::Secure.urlsafe_base64(24)}" - end - if direct_reply_to?(frame.queue) - unless frame.no_ack - @client.send_precondition_failed(frame, "Direct replys must be consumed in no-ack mode") - return - end - @log.debug { "Saving direct reply consumer #{frame.consumer_tag}" } - @direct_reply_consumer = frame.consumer_tag - @client.vhost.direct_reply_consumers[frame.consumer_tag] = self - unless frame.no_wait - send AMQP::Frame::Basic::ConsumeOk.new(frame.channel, frame.consumer_tag) - end - elsif q = @client.vhost.queues[frame.queue]? - if @client.queue_exclusive_to_other_client?(q) - @client.send_resource_locked(frame, "Exclusive queue") - return - end - if q.has_exclusive_consumer? - @client.send_access_refused(frame, "Queue '#{frame.queue}' in vhost '#{@client.vhost.name}' in exclusive use") - return - end - c = if q.is_a? 
StreamQueue - StreamConsumer.new(self, q, frame) - else - Consumer.new(self, q, frame) - end - @consumers.push(c) - q.add_consumer(c) - unless frame.no_wait - send AMQP::Frame::Basic::ConsumeOk.new(frame.channel, frame.consumer_tag) - end - else - @client.send_not_found(frame, "Queue '#{frame.queue}' not declared") - end - Fiber.yield # Notify :add_consumer observers - end - - def basic_get(frame) - if q = @client.vhost.queues.fetch(frame.queue, nil) - if @client.queue_exclusive_to_other_client?(q) - @client.send_resource_locked(frame, "Exclusive queue") - elsif q.has_exclusive_consumer? - @client.send_access_refused(frame, "Queue '#{frame.queue}' in vhost '#{@client.vhost.name}' in exclusive use") - elsif q.is_a? StreamQueue - @client.send_not_implemented(frame, "Stream queues does not support basic_get") - else - @get_count += 1 - @client.vhost.event_tick(EventType::ClientGet) - ok = q.basic_get(frame.no_ack) do |env| - delivery_tag = next_delivery_tag(q, env.segment_position, frame.no_ack, nil) - unless frame.no_ack # track unacked messages - q.basic_get_unacked << UnackedMessage.new(self, delivery_tag, RoughTime.monotonic) - end - get_ok = AMQP::Frame::Basic::GetOk.new(frame.channel, delivery_tag, - env.redelivered, env.message.exchange_name, - env.message.routing_key, q.message_count) - deliver(get_ok, env.message, env.redelivered) - end - send AMQP::Frame::Basic::GetEmpty.new(frame.channel) unless ok - end - else - @client.send_not_found(frame, "No queue '#{frame.queue}' in vhost '#{@client.vhost.name}'") - end - end - - private def delete_unacked(delivery_tag) : Unack? - found = nil - @unack_lock.synchronize do - # @unacked is always sorted so can do a binary search - # optimization for acking first unacked - if @unacked[0]?.try(&.tag) == delivery_tag - # @log.debug { "Unacked found tag:#{delivery_tag} at front" } - found = @unacked.shift - elsif idx = @unacked.bsearch_index { |unack, _| unack.tag >= delivery_tag } - return nil unless @unacked[idx].tag == delivery_tag - # @log.debug { "Unacked bsearch found tag:#{delivery_tag} at index:#{idx}" } - found = @unacked.delete_at(idx) - end - end - notify_has_capacity(1) if found - found - end - - private def delete_multiple_unacked(delivery_tag, & : Unack -> Nil) - count = 0 - @unack_lock.synchronize do - if delivery_tag.zero? - until @unacked.empty? - yield @unacked.shift - count += 1 - end - else - idx = @unacked.bsearch_index { |unack, _| unack.tag >= delivery_tag } - return nil unless idx - return nil unless @unacked[idx].tag == delivery_tag - # @log.debug { "Unacked bsearch found tag:#{delivery_tag} at index:#{idx}" } - (idx + 1).times do - yield @unacked.shift - count += 1 - end - end - end - notify_has_capacity(count) - end - - def unacked_count - @unacked.size - end - - record TxAck, delivery_tag : UInt64, multiple : Bool, negative : Bool, requeue : Bool - @tx_acks = Array(TxAck).new - - def basic_ack(frame) - if @tx - @unack_lock.synchronize do - if frame.delivery_tag.zero? && frame.multiple # all msgs so far - @tx_acks.push(TxAck.new @unacked.last.tag, frame.multiple, false, false) - return - elsif @unacked.bsearch { |unack| unack.tag >= frame.delivery_tag }.try &.tag == frame.delivery_tag - check_double_ack!(frame.delivery_tag) - @tx_acks.push(TxAck.new frame.delivery_tag, frame.multiple, false, false) - return - end - end - @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) - return - end - - found = false - if frame.multiple - found = true if frame.delivery_tag.zero? 
- delete_multiple_unacked(frame.delivery_tag) do |unack| - found = true - do_ack(unack) - end - elsif unack = delete_unacked(frame.delivery_tag) - found = true - do_ack(unack) - end - unless found - @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) - end - rescue DoubleAck - @client.send_precondition_failed(frame, "Delivery tag already acked") - end - - private def do_ack(unack) - if c = unack.consumer - c.ack(unack.sp) - end - unack.queue.ack(unack.sp) - unack.queue.basic_get_unacked.reject! { |u| u.channel == self && u.delivery_tag == unack.tag } - @client.vhost.event_tick(EventType::ClientAck) - @ack_count += 1 - end - - def basic_reject(frame) - if @tx - @unack_lock.synchronize do - if @unacked.bsearch { |unack| unack.tag >= frame.delivery_tag }.try &.tag == frame.delivery_tag - check_double_ack!(frame.delivery_tag) - @tx_acks.push(TxAck.new frame.delivery_tag, false, true, frame.requeue) - return - end - end - @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) - return - end - - @log.debug { "Rejecting #{frame.inspect}" } - if unack = delete_unacked(frame.delivery_tag) - do_reject(frame.requeue, unack) - else - @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) - end - rescue DoubleAck - @client.send_precondition_failed(frame, "Delivery tag already acked") - end - - def basic_nack(frame) - if @tx - @unack_lock.synchronize do - if frame.delivery_tag.zero? && frame.multiple # all msgs so far - @tx_acks.push(TxAck.new @unacked.last.tag, true, true, frame.requeue) - return - elsif @unacked.bsearch { |unack| unack.tag >= frame.delivery_tag }.try &.tag == frame.delivery_tag - check_double_ack!(frame.delivery_tag) - @tx_acks.push(TxAck.new frame.delivery_tag, frame.multiple, true, frame.requeue) - return - end - end - @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) - return - end - - found = false - if frame.multiple - delete_multiple_unacked(frame.delivery_tag) do |unack| - found = true - do_reject(frame.requeue, unack) - end - elsif unack = delete_unacked(frame.delivery_tag) - found = true - do_reject(frame.requeue, unack) - end - unless found - @client.send_precondition_failed(frame, unknown_tag(frame.delivery_tag)) - end - rescue DoubleAck - @client.send_precondition_failed(frame, "Delivery tag already acked") - end - - private class DoubleAck < Error; end - - private def check_double_ack!(delivery_tag) - if @tx_acks.any? { |tx_ack| tx_ack.delivery_tag == delivery_tag } - raise DoubleAck.new - end - end - - private def unknown_tag(delivery_tag) - # Lower case u important for bunny on_error callback - "unknown delivery tag #{delivery_tag}" - end - - private def do_reject(requeue, unack) - if c = unack.consumer - c.reject(unack.sp, requeue) - end - unack.queue.reject(unack.sp, requeue) - unack.queue.basic_get_unacked.reject! { |u| u.channel == self && u.delivery_tag == unack.tag } - @reject_count += 1 - @client.vhost.event_tick(EventType::ClientReject) - end - - def basic_qos(frame) : Nil - @client.send_not_implemented(frame) if frame.prefetch_size != 0 - if frame.global - @global_prefetch_count = frame.prefetch_count - if frame.prefetch_count.zero? 
- while @has_capacity.try_send?(nil) - end - else - unacked_by_consumers = @unack_lock.synchronize { @unacked.count(&.consumer) } - notify_has_capacity(frame.prefetch_count.to_i - unacked_by_consumers) - end - else - @prefetch_count = frame.prefetch_count - @consumers.each(&.prefetch_count = frame.prefetch_count) - end - send AMQP::Frame::Basic::QosOk.new(frame.channel) - end - - def basic_recover(frame) : Nil - @unack_lock.synchronize do - if frame.requeue - @unacked.each do |unack| - next if delivery_tag_is_in_tx?(unack.tag) - if consumer = unack.consumer - consumer.reject(unack.sp, requeue: true) - end - unack.queue.reject(unack.sp, requeue: true) - end - @unacked.clear - notify_has_capacity - else # redeliver to the original recipient - @unacked.reject! do |unack| - next if delivery_tag_is_in_tx?(unack.tag) - if (consumer = unack.consumer) && !consumer.closed? - env = unack.queue.read(unack.sp) - consumer.deliver(env.message, env.segment_position, true, recover: true) - false - else - unack.queue.reject(unack.sp, requeue: true) - true - end - end - end - end - send AMQP::Frame::Basic::RecoverOk.new(frame.channel) - end - - private def delivery_tag_is_in_tx?(delivery_tag) : Bool - if @tx - @tx_acks.any? do |tx_ack| - (tx_ack.delivery_tag > delivery_tag && tx_ack.multiple) || tx_ack.delivery_tag == delivery_tag - end - else - false - end - end - - def close - @running = false - @consumers.each &.close - @consumers.clear - if drc = @direct_reply_consumer - @client.vhost.direct_reply_consumers.delete(drc) - end - @unack_lock.synchronize do - @unacked.each do |unack| - @log.debug { "Requeing unacked msg #{unack.sp}" } - unack.queue.reject(unack.sp, true) - unack.queue.basic_get_unacked.reject! { |u| u.channel == self && u.delivery_tag == unack.tag } - end - @unacked.clear - end - @has_capacity.close - @next_msg_body_file.try &.close - @client.vhost.event_tick(EventType::ChannelClosed) - @log.debug { "Closed" } - end - - protected def next_delivery_tag(queue : Queue, sp, no_ack, consumer) : UInt64 - @unack_lock.synchronize do - tag = @delivery_tag &+= 1 - @unacked.push Unack.new(tag, queue, sp, consumer, RoughTime.monotonic) unless no_ack - tag - end - end - - # Iterate over all unacked messages and see if any has been unacked longer than the queue's consumer timeout - def check_consumer_timeout - @unack_lock.synchronize do - queues = Set(Queue).new # only check first delivered message per queue - @unacked.each do |unack| - if queues.add? unack.queue - if timeout = unack.queue.consumer_timeout - unacked_ms = RoughTime.monotonic - unack.delivered_at - if unacked_ms > timeout.milliseconds - send AMQP::Frame::Channel::Close.new(@id, 406_u16, "PRECONDITION_FAILED - consumer timeout", 60_u16, 20_u16) - break - end - end - end - end - end - end - - def has_capacity? : Bool - return true if @global_prefetch_count.zero? - prefetch_limit = @global_prefetch_count - @unack_lock.synchronize do - count = 0 - @unacked.each do |unack| - next if unack.consumer.nil? # only count consumer unacked against limit - count += 1 - return false if count >= prefetch_limit - end - true - end - end - - private def notify_has_capacity(capacity = Int32::MAX) - return if @global_prefetch_count.zero? - return if capacity.negative? 
- capacity.times do - @has_capacity.try_send?(nil) || break - end - end - - def cancel_consumer(frame) - @log.debug { "Cancelling consumer '#{frame.consumer_tag}'" } - if idx = @consumers.index { |cons| cons.tag == frame.consumer_tag } - c = @consumers.delete_at idx - c.close - elsif @direct_reply_consumer == frame.consumer_tag - @direct_reply_consumer = nil - @client.vhost.direct_reply_consumers.delete(frame.consumer_tag) - end - unless frame.no_wait - send AMQP::Frame::Basic::CancelOk.new(frame.channel, frame.consumer_tag) - end - end - - private def next_msg_body_file - @next_msg_body_file ||= - begin - File.tempfile("channel.", nil, dir: @client.vhost.data_dir).tap do |f| - f.sync = true - f.read_buffering = false - f.delete - end - end - end - - def tx_select(frame) - if @confirm - @client.send_precondition_failed(frame, "Channel already in confirm mode") - return - end - @tx = true - send AMQP::Frame::Tx::SelectOk.new(frame.channel) - end - - def tx_commit(frame) - return @client.send_precondition_failed(frame, "Not in transaction mode") unless @tx - process_tx_acks - process_tx_publishes - @client.vhost.sync - send AMQP::Frame::Tx::CommitOk.new(frame.channel) - end - - private def process_tx_publishes - next_msg_body_file.rewind - @tx_publishes.each do |tx_msg| - tx_msg.message.timestamp = RoughTime.unix_ms - ok = @client.vhost.publish(tx_msg.message, tx_msg.immediate, @visited, @found_queues) - basic_return(tx_msg.message, tx_msg.mandatory, tx_msg.immediate) unless ok - # skip to next msg body in the next_msg_body_file - tx_msg.message.body_io.seek(tx_msg.message.bodysize, IO::Seek::Current) - end - @tx_publishes.clear - ensure - next_msg_body_file.rewind - end - - private def process_tx_acks - count = 0 - @unack_lock.synchronize do - @tx_acks.each do |tx_ack| - if idx = @unacked.bsearch_index { |u, _| u.tag >= tx_ack.delivery_tag } - raise "BUG: Delivery tag not found" unless @unacked[idx].tag == tx_ack.delivery_tag - @log.debug { "Unacked bsearch found tag:#{tx_ack.delivery_tag} at index:#{idx}" } - if tx_ack.multiple - (idx + 1).times do - unack = @unacked.shift - if tx_ack.negative - do_reject(tx_ack.requeue, unack) - else - do_ack(unack) - end - count += 1 - end - else - unack = @unacked.delete_at(idx) - if tx_ack.negative - do_reject(tx_ack.requeue, unack) - else - do_ack(unack) - end - count += 1 - end - end - end - @tx_acks.clear - end - notify_has_capacity(count) - end - - def tx_rollback(frame) - return @client.send_precondition_failed(frame, "Not in transaction mode") unless @tx - @tx_publishes.clear - @tx_acks.clear - next_msg_body_file.rewind - send AMQP::Frame::Tx::RollbackOk.new(frame.channel) - end - - class ClosedError < Error; end end end end diff --git a/src/lavinmq/client/channel/consumer.cr b/src/lavinmq/client/channel/consumer.cr index 0840a2885..0192a7058 100644 --- a/src/lavinmq/client/channel/consumer.cr +++ b/src/lavinmq/client/channel/consumer.cr @@ -1,265 +1,10 @@ -require "log" -require "../../sortable_json" -require "../../error" require "../../logger" module LavinMQ - class Client - class Channel - class Consumer + abstract class Client + abstract class Channel + abstract class Consumer include SortableJSON - Log = ::Log.for("consumer") - getter tag : String - getter priority : Int32 - getter? exclusive : Bool - getter? no_ack : Bool - getter channel, queue - getter prefetch_count = 0u16 - getter unacked = 0_u32 - getter? 
closed = false - @flow : Bool - @metadata : ::Log::Metadata - - def initialize(@channel : Client::Channel, @queue : Queue, frame : AMQP::Frame::Basic::Consume) - @tag = frame.consumer_tag - @no_ack = frame.no_ack - @exclusive = frame.exclusive - @priority = consumer_priority(frame) # Must be before ConsumeOk, can close channel - @prefetch_count = @channel.prefetch_count - @flow = @channel.flow? - @metadata = @channel.@metadata.extend({consumer: @tag}) - @log = Logger.new(Log, @metadata) - spawn deliver_loop, name: "Consumer deliver loop", same_thread: true - end - - def close - @closed = true - @queue.rm_consumer(self) - @notify_closed.close - @has_capacity.close - @flow_change.close - end - - @notify_closed = ::Channel(Nil).new - @flow_change = ::Channel(Bool).new - - def flow(active : Bool) - @flow = active - @flow_change.try_send? active - end - - def prefetch_count=(prefetch_count : UInt16) - @prefetch_count = prefetch_count - notify_has_capacity(@prefetch_count > @unacked) - end - - private def deliver_loop - wait_for_single_active_consumer - queue = @queue - i = 0 - loop do - wait_for_capacity - loop do - raise ClosedError.new if @closed - next if wait_for_global_capacity - next if wait_for_priority_consumers - next if wait_for_queue_ready - next if wait_for_paused_queue - next if wait_for_flow - break - end - {% unless flag?(:release) %} - @log.debug { "Getting a new message" } - {% end %} - queue.consume_get(self) do |env| - deliver(env.message, env.segment_position, env.redelivered) - end - Fiber.yield if (i &+= 1) % 32768 == 0 - end - rescue ex : ClosedError | Queue::ClosedError | Client::Channel::ClosedError | ::Channel::ClosedError - @log.debug { "deliver loop exiting: #{ex.inspect}" } - end - - private def wait_for_global_capacity - ch = @channel - return if ch.has_capacity? - @log.debug { "Waiting for global prefetch capacity" } - select - when ch.has_capacity.receive - when @notify_closed.receive - end - true - end - - private def wait_for_single_active_consumer - case @queue.single_active_consumer - when self - @log.debug { "This consumer is the single active consumer" } - when nil - @log.debug { "The queue isn't a single active consumer queue" } - else - @log.debug { "Waiting for this consumer to become the single active consumer" } - loop do - select - when sca = @queue.single_active_consumer_change.receive - if sca == self - break - else - @log.debug { "New single active consumer, but not me" } - end - when @notify_closed.receive - break - end - end - true - end - end - - private def wait_for_priority_consumers - # single active consumer queues can't have priority consumers - if @queue.has_priority_consumers? && @queue.single_active_consumer.nil? - if @queue.consumers.any? { |c| c.priority > @priority && c.accepts? } - @log.debug { "Waiting for higher priority consumers to not have capacity" } - begin - ::Channel.receive_first(@queue.consumers.map(&.has_capacity)) - rescue ::Channel::ClosedError - end - return true - end - end - end - - private def wait_for_queue_ready - if @queue.empty? - @log.debug { "Waiting for queue not to be empty" } - select - when is_empty = @queue.empty_change.receive - @log.debug { "Queue is #{is_empty ? "" : "not"} empty" } - when @notify_closed.receive - end - return true - end - end - - private def wait_for_paused_queue - if @queue.paused? - @log.debug { "Waiting for queue not to be paused" } - select - when is_paused = @queue.paused_change.receive - @log.debug { "Queue is #{is_paused ? 
"" : "not"} paused" } - when @notify_closed.receive - end - return true - end - end - - private def wait_for_flow - unless @flow - @log.debug { "Waiting for flow" } - is_flow = @flow_change.receive - @log.debug { "Channel flow=#{is_flow}" } - return true - end - end - - # blocks until the consumer can accept more messages - private def wait_for_capacity : Nil - if @prefetch_count > 0 - until @unacked < @prefetch_count - @log.debug { "Waiting for prefetch capacity" } - @has_capacity.receive - end - end - end - - def accepts? : Bool - return false unless @flow - return false if @prefetch_count > 0 && @unacked >= @prefetch_count - return false if @channel.global_prefetch_count > 0 && @channel.consumers.sum(&.unacked) >= @channel.global_prefetch_count - true - end - - getter has_capacity = ::Channel(Bool).new - - private def notify_has_capacity(value) - while @has_capacity.try_send? value - end - end - - def deliver(msg, sp, redelivered = false, recover = false) - unless @no_ack || recover - @unacked += 1 - notify_has_capacity(false) if @unacked == @prefetch_count - end - delivery_tag = @channel.next_delivery_tag(@queue, sp, @no_ack, self) - deliver = AMQP::Frame::Basic::Deliver.new(@channel.id, @tag, - delivery_tag, - redelivered, - msg.exchange_name, msg.routing_key) - @channel.deliver(deliver, msg, redelivered) - end - - def ack(sp) - was_full = @unacked == @prefetch_count - @unacked -= 1 - notify_has_capacity(true) if was_full - end - - def reject(sp, requeue = false) - was_full = @unacked == @prefetch_count - @unacked -= 1 - notify_has_capacity(true) if was_full - end - - def cancel - @channel.send AMQP::Frame::Basic::Cancel.new(@channel.id, @tag, no_wait: true) - @channel.consumers.delete self - close - end - - private def consumer_priority(frame) : Int32 - case prio = frame.arguments["x-priority"] - when Int then prio.to_i32 - else raise Error::PreconditionFailed.new("x-priority must be an integer") - end - rescue KeyError - 0 - rescue OverflowError - raise Error::PreconditionFailed.new("x-priority out of bounds, must fit a 32-bit integer") - end - - def unacked_messages - @channel.unacked - end - - def channel_name - @channel.name - end - - def details_tuple - channel_details = @channel.details_tuple - { - queue: { - name: @queue.name, - vhost: @queue.vhost.name, - }, - consumer_tag: @tag, - exclusive: @exclusive, - ack_required: !@no_ack, - prefetch_count: @prefetch_count, - priority: @priority, - channel_details: { - peer_host: channel_details[:connection_details][:peer_host]?, - peer_port: channel_details[:connection_details][:peer_port]?, - connection_name: channel_details[:connection_details][:name], - user: channel_details[:user], - number: channel_details[:number], - name: channel_details[:name], - }, - } - end - - class ClosedError < Error; end end end end diff --git a/src/lavinmq/client/channel/stream_consumer.cr b/src/lavinmq/client/channel/stream_consumer.cr deleted file mode 100644 index 33a7a04ad..000000000 --- a/src/lavinmq/client/channel/stream_consumer.cr +++ /dev/null @@ -1,95 +0,0 @@ -require "./consumer" -require "../../segment_position" - -module LavinMQ - class Client - class Channel - class StreamConsumer < LavinMQ::Client::Channel::Consumer - property offset : Int64 - property segment : UInt32 - property pos : UInt32 - getter requeued = Deque(SegmentPosition).new - - def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) - validate_preconditions(frame) - offset = frame.arguments["x-stream-offset"]? 
- @offset, @segment, @pos = stream_queue.find_offset(offset) - super - end - - private def validate_preconditions(frame) - if frame.exclusive - raise Error::PreconditionFailed.new("Stream consumers must not be exclusive") - end - if frame.no_ack - raise Error::PreconditionFailed.new("Stream consumers must acknowledge messages") - end - if @channel.prefetch_count.zero? - raise Error::PreconditionFailed.new("Stream consumers must have a prefetch limit") - end - unless @channel.global_prefetch_count.zero? - raise Error::PreconditionFailed.new("Stream consumers does not support global prefetch limit") - end - if frame.arguments.has_key? "x-priority" - raise Error::PreconditionFailed.new("x-priority not supported on stream queues") - end - case frame.arguments["x-stream-offset"]? - when Nil, Int, Time, "first", "next", "last" - else raise Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'") - end - end - - private def deliver_loop - i = 0 - loop do - wait_for_capacity - loop do - raise ClosedError.new if @closed - next if wait_for_queue_ready - next if wait_for_paused_queue - next if wait_for_flow - break - end - {% unless flag?(:release) %} - @log.debug { "Getting a new message" } - {% end %} - stream_queue.consume_get(self) do |env| - deliver(env.message, env.segment_position, env.redelivered) - end - Fiber.yield if (i &+= 1) % 32768 == 0 - end - rescue ex : ClosedError | Queue::ClosedError | Client::Channel::ClosedError | ::Channel::ClosedError - @log.debug { "deliver loop exiting: #{ex.inspect}" } - end - - private def wait_for_queue_ready - if @offset > stream_queue.last_offset && @requeued.empty? - @log.debug { "Waiting for queue not to be empty" } - select - when stream_queue.new_messages.receive - @log.debug { "Queue is not empty" } - when @has_requeued.receive - @log.debug { "Got a requeued message" } - when @notify_closed.receive - end - return true - end - end - - @has_requeued = ::Channel(Nil).new - - private def stream_queue : StreamQueue - @queue.as(StreamQueue) - end - - def reject(sp, requeue : Bool) - super - if requeue - @requeued.push(sp) - @has_requeued.try_send? nil if @requeued.size == 1 - end - end - end - end - end -end diff --git a/src/lavinmq/client/client.cr b/src/lavinmq/client/client.cr index e6eb1db18..052a116df 100644 --- a/src/lavinmq/client/client.cr +++ b/src/lavinmq/client/client.cr @@ -1,846 +1,8 @@ -require "openssl" -require "socket" -require "../logger" -require "../vhost" -require "../message" -require "./channel" -require "../user" -require "../stats" require "../sortable_json" -require "../rough_time" -require "../error" -require "./amqp_connection" -require "../config" -require "../http/handler/websocket" +require "./channel" module LavinMQ - class Client - include Stats + abstract class Client include SortableJSON - - getter vhost, channels, log, name - getter user - getter max_frame_size : UInt32 - getter channel_max : UInt16 - getter heartbeat_timeout : UInt16 - getter auth_mechanism : String - getter client_properties : AMQP::Table - getter remote_address : Socket::IPAddress - - @connected_at = RoughTime.unix_ms - @channels = Hash(UInt16, Client::Channel).new - @exclusive_queues = Array(Queue).new - @heartbeat_interval_ms : Int64? 
- @local_address : Socket::IPAddress - @running = true - @last_recv_frame = RoughTime.monotonic - @last_sent_frame = RoughTime.monotonic - rate_stats({"send_oct", "recv_oct"}) - DEFAULT_EX = "amq.default" - Log = ::Log.for("client") - - def initialize(@socket : IO, - @connection_info : ConnectionInfo, - @vhost : VHost, - @user : User, - tune_ok, - start_ok) - @remote_address = @connection_info.src - @local_address = @connection_info.dst - - @max_frame_size = tune_ok.frame_max - @channel_max = tune_ok.channel_max - @heartbeat_timeout = tune_ok.heartbeat - @heartbeat_interval_ms = tune_ok.heartbeat.zero? ? nil : ((tune_ok.heartbeat / 2) * 1000).to_i64 - @auth_mechanism = start_ok.mechanism - @name = "#{@remote_address} -> #{@local_address}" - @client_properties = start_ok.client_properties - connection_name = @client_properties["connection_name"]?.try(&.as?(String)) - @metadata = - if connection_name - ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s, name: connection_name}) - else - ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s}) - end - @log = Logger.new(Log, @metadata) - @vhost.add_connection(self) - @log.info { "Connection established for user=#{@user.name}" } - spawn read_loop, name: "Client#read_loop #{@remote_address}" - end - - # Returns client provided connection name if set, else server generated name - def client_name - @client_properties["connection_name"]?.try(&.as(String)) || @name - end - - def channel_name_prefix - @remote_address.to_s - end - - def details_tuple - { - channels: @channels.size, - connected_at: @connected_at, - type: "network", - channel_max: @channel_max, - frame_max: @max_frame_size, - timeout: @heartbeat_timeout, - client_properties: @client_properties, - vhost: @vhost.name, - user: @user.name, - protocol: "AMQP 0-9-1", - auth_mechanism: @auth_mechanism, - host: @local_address.address, - port: @local_address.port, - peer_host: @remote_address.address, - peer_port: @remote_address.port, - name: @name, - pid: @name, - ssl: @connection_info.ssl?, - tls_version: @connection_info.ssl_version, - cipher: @connection_info.ssl_cipher, - state: state, - }.merge(stats_details) - end - - private def read_loop - i = 0 - socket = @socket - loop do - AMQP::Frame.from_io(socket) do |frame| - {% unless flag?(:release) %} - @log.trace { "Received #{frame.inspect}" } - {% end %} - if (i += 1) == 8192 - i = 0 - Fiber.yield - end - frame_size_ok?(frame) || return - case frame - when AMQP::Frame::Connection::Close - @log.info { "Client disconnected: #{frame.reply_text}" } unless frame.reply_text.empty? - send AMQP::Frame::Connection::CloseOk.new - @running = false - next - when AMQP::Frame::Connection::CloseOk - @log.debug { "Confirmed disconnect" } - @running = false - return - end - if @running - process_frame(frame) - else - case frame - when AMQP::Frame::Body - @log.debug { "Skipping body, waiting for CloseOk" } - frame.body.skip(frame.body_size) - else - @log.debug { "Discarding #{frame.class.name}, waiting for CloseOk" } - end - end - rescue e : Error::PreconditionFailed - send_precondition_failed(frame, e.message) - end - rescue IO::TimeoutError - send_heartbeat || break - rescue ex : AMQP::Error::NotImplemented - @log.error { ex.inspect } - send_not_implemented(ex) - rescue ex : AMQP::Error::FrameDecode - @log.error { ex.inspect_with_backtrace } - send_frame_error(ex.message) - rescue ex : IO::Error | OpenSSL::SSL::Error - @log.debug { "Lost connection, while reading (#{ex.inspect})" } unless closed? 
- break - rescue ex : Exception - @log.error { "Unexpected error, while reading: #{ex.inspect_with_backtrace}" } - send_internal_error(ex.message) - end - ensure - cleanup - close_socket - @log.info { "Connection disconnected for user=#{@user.name}" } - end - - private def frame_size_ok?(frame) : Bool - if frame.bytesize > @max_frame_size - send_frame_error("frame size #{frame.bytesize} exceeded max #{@max_frame_size} bytes") - return false - end - true - end - - private def send_heartbeat - now = RoughTime.monotonic - if @last_recv_frame + (@heartbeat_timeout + 5).seconds < now - @log.info { "Heartbeat timeout (#{@heartbeat_timeout}), last seen frame #{(now - @last_recv_frame).total_seconds} s ago, sent frame #{(now - @last_sent_frame).total_seconds} s ago" } - false - else - send AMQP::Frame::Heartbeat.new - end - end - - def send(frame : AMQP::Frame, channel_is_open : Bool? = nil) : Bool - return false if closed? - if channel_is_open.nil? - channel_is_open = frame.channel.zero? || @channels[frame.channel]?.try &.running? - end - unless channel_is_open - @log.debug { "Channel #{frame.channel} is closed so is not sending #{frame.inspect}" } - return false - end - {% unless flag?(:release) %} - @log.trace { "Send #{frame.inspect}" } - {% end %} - @write_lock.synchronize do - s = @socket - s.write_bytes frame, IO::ByteFormat::NetworkEndian - s.flush - end - @last_sent_frame = RoughTime.monotonic - @send_oct_count += 8_u64 + frame.bytesize - if frame.is_a?(AMQP::Frame::Connection::CloseOk) - return false - end - true - rescue ex : IO::Error | OpenSSL::SSL::Error - @log.debug { "Lost connection, while sending (#{ex.inspect})" } unless closed? - close_socket - false - rescue ex : IO::TimeoutError - @log.info { "Timeout while sending (#{ex.inspect})" } - close_socket - false - rescue ex - @log.error { "Unexpected error, while sending: #{ex.inspect_with_backtrace}" } - send_internal_error(ex.message) - false - end - - def connection_details - { - peer_host: @remote_address.address, - peer_port: @remote_address.port, - name: @name, - } - end - - @write_lock = Mutex.new(:checked) - - def deliver(frame, msg) - return false if closed? - @write_lock.synchronize do - socket = @socket - websocket = socket.is_a? 
WebSocketIO - {% unless flag?(:release) %} - @log.trace { "Send #{frame.inspect}" } - {% end %} - socket.write_bytes frame, ::IO::ByteFormat::NetworkEndian - socket.flush if websocket - @send_oct_count += 8_u64 + frame.bytesize - header = AMQP::Frame::Header.new(frame.channel, 60_u16, 0_u16, msg.bodysize, msg.properties) - {% unless flag?(:release) %} - @log.trace { "Send #{header.inspect}" } - {% end %} - socket.write_bytes header, ::IO::ByteFormat::NetworkEndian - socket.flush if websocket - @send_oct_count += 8_u64 + header.bytesize - pos = 0 - while pos < msg.bodysize - length = Math.min(msg.bodysize - pos, @max_frame_size - 8).to_u32 - {% unless flag?(:release) %} - @log.trace { "Send BodyFrame (pos #{pos}, length #{length})" } - {% end %} - body = case msg - in BytesMessage - AMQP::Frame::BytesBody.new(frame.channel, length, msg.body[pos, length]) - in Message - AMQP::Frame::Body.new(frame.channel, length, msg.body_io) - end - socket.write_bytes body, ::IO::ByteFormat::NetworkEndian - socket.flush if websocket - @send_oct_count += 8_u64 + body.bytesize - pos += length - end - socket.flush unless websocket # Websockets need to send one frame per WS frame - @last_sent_frame = RoughTime.monotonic - end - true - rescue ex : IO::Error | OpenSSL::SSL::Error - @log.debug { "Lost connection, while sending (#{ex.inspect})" } - close_socket - Fiber.yield - false - rescue ex : AMQ::Protocol::Error::FrameEncode - @log.warn { "Error encoding frame (#{ex.inspect})" } - close_socket - false - rescue ex : IO::TimeoutError - @log.info { "Timeout while sending (#{ex.inspect})" } - close_socket - false - rescue ex - @log.error { "Delivery exception: #{ex.inspect_with_backtrace}" } - raise ex - end - - def state - !@running ? "closed" : (@vhost.flow? ? "running" : "flow") - end - - private def with_channel(frame, &) - if ch = @channels[frame.channel]? - if ch.running? - yield ch - else - case frame - when AMQP::Frame::Basic::Publish, AMQP::Frame::Header - @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } - when AMQP::Frame::Body - @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } - frame.body.skip(frame.body_size) - else - @log.trace { "Discarding #{frame.inspect}, waiting for Close(Ok)" } - end - end - else - case frame - when AMQP::Frame::Basic::Publish, AMQP::Frame::Header - @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } - when AMQP::Frame::Body - @log.trace { "Discarding #{frame.class.name}, waiting for Close(Ok)" } - frame.body.skip(frame.body_size) - else - @log.error { "Channel #{frame.channel} not open while processing #{frame.class.name}" } - close_connection(frame, 504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open") - end - end - end - - private def open_channel(frame) - if @channels.has_key? 
frame.channel - close_connection(frame, 504_u16, "CHANNEL_ERROR - second 'channel.open' seen") - else - @channels[frame.channel] = Client::Channel.new(self, frame.channel) - @vhost.event_tick(EventType::ChannelCreated) - send AMQP::Frame::Channel::OpenOk.new(frame.channel) - end - end - - # ameba:disable Metrics/CyclomaticComplexity - private def process_frame(frame) : Nil - @last_recv_frame = RoughTime.monotonic - @recv_oct_count += 8_u64 + frame.bytesize - case frame - when AMQP::Frame::Channel::Open - open_channel(frame) - when AMQP::Frame::Channel::Close - @channels.delete(frame.channel).try &.close - send AMQP::Frame::Channel::CloseOk.new(frame.channel), true - when AMQP::Frame::Channel::CloseOk - @channels.delete(frame.channel).try &.close - when AMQP::Frame::Channel::Flow - with_channel frame, &.flow(frame.active) - when AMQP::Frame::Channel::FlowOk - # noop - when AMQP::Frame::Confirm::Select - with_channel frame, &.confirm_select(frame) - when AMQP::Frame::Exchange::Declare - declare_exchange(frame) - when AMQP::Frame::Exchange::Delete - delete_exchange(frame) - when AMQP::Frame::Exchange::Bind - bind_exchange(frame) - when AMQP::Frame::Exchange::Unbind - unbind_exchange(frame) - when AMQP::Frame::Queue::Declare - declare_queue(frame) - when AMQP::Frame::Queue::Bind - bind_queue(frame) - when AMQP::Frame::Queue::Unbind - unbind_queue(frame) - when AMQP::Frame::Queue::Delete - delete_queue(frame) - when AMQP::Frame::Queue::Purge - purge_queue(frame) - when AMQP::Frame::Basic::Publish - start_publish(frame) - when AMQP::Frame::Header - with_channel frame, &.next_msg_headers(frame) - when AMQP::Frame::Body - with_channel frame, &.add_content(frame) - when AMQP::Frame::Basic::Consume - consume(frame) - when AMQP::Frame::Basic::Get - basic_get(frame) - when AMQP::Frame::Basic::Ack - with_channel frame, &.basic_ack(frame) - when AMQP::Frame::Basic::Reject - with_channel frame, &.basic_reject(frame) - when AMQP::Frame::Basic::Nack - with_channel frame, &.basic_nack(frame) - when AMQP::Frame::Basic::Cancel - with_channel frame, &.cancel_consumer(frame) - when AMQP::Frame::Basic::Qos - with_channel frame, &.basic_qos(frame) - when AMQP::Frame::Basic::Recover - with_channel frame, &.basic_recover(frame) - when AMQP::Frame::Tx::Select - with_channel frame, &.tx_select(frame) - when AMQP::Frame::Tx::Commit - with_channel frame, &.tx_commit(frame) - when AMQP::Frame::Tx::Rollback - with_channel frame, &.tx_rollback(frame) - when AMQP::Frame::Heartbeat - nil - else - send_not_implemented(frame) - end - if heartbeat_interval_ms = @heartbeat_interval_ms - if @last_sent_frame + heartbeat_interval_ms.milliseconds < RoughTime.monotonic - send AMQP::Frame::Heartbeat.new - end - end - rescue ex : Error::UnexpectedFrame - @log.error { ex.inspect } - close_channel(ex.frame, 505_u16, "UNEXPECTED_FRAME - #{ex.frame.class.name}") - end - - private def cleanup - @running = false - @channels.each_value &.close - @channels.clear - @exclusive_queues.each(&.close) - @exclusive_queues.clear - @vhost.rm_connection(self) - end - - private def close_socket - @running = false - @socket.close - @log.debug { "Socket closed" } - rescue ex - @log.debug { "#{ex.inspect} when closing socket" } - end - - def close(reason = nil) - reason ||= "Connection closed" - @log.info { "Closing, #{reason}" } - send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16) - @running = false - end - - def force_close - close_socket - end - - def closed? 
- !@running - end - - def close_channel(frame : AMQ::Protocol::Frame, code, text) - return close_connection(frame, code, text) if frame.channel.zero? - case frame - when AMQ::Protocol::Frame::Method - send AMQP::Frame::Channel::Close.new(frame.channel, code, text, frame.class_id, frame.method_id) - else - send AMQP::Frame::Channel::Close.new(frame.channel, code, text, 0, 0) - end - @channels.delete(frame.channel).try &.close - end - - def close_connection(frame : AMQ::Protocol::Frame?, code, text) - @log.info { "Closing, #{text}" } - case frame - when AMQ::Protocol::Frame::Method - send AMQP::Frame::Connection::Close.new(code, text, frame.class_id, frame.method_id) - else - send AMQP::Frame::Connection::Close.new(code, text, 0_u16, 0_u16) - end - @log.info { "Connection=#{@name} disconnected" } - ensure - @running = false - end - - def send_access_refused(frame, text) - @log.warn { "Access refused channel=#{frame.channel} reason=\"#{text}\"" } - close_channel(frame, 403_u16, "ACCESS_REFUSED - #{text}") - end - - def send_not_found(frame, text = "") - @log.warn { "Not found channel=#{frame.channel} reason=\"#{text}\"" } - close_channel(frame, 404_u16, "NOT_FOUND - #{text}") - end - - def send_resource_locked(frame, text) - @log.warn { "Resource locked channel=#{frame.channel} reason=\"#{text}\"" } - close_channel(frame, 405_u16, "RESOURCE_LOCKED - #{text}") - end - - def send_precondition_failed(frame, text) - @log.warn { "Precondition failed channel=#{frame.channel} reason=\"#{text}\"" } - close_channel(frame, 406_u16, "PRECONDITION_FAILED - #{text}") - end - - def send_not_implemented(frame, text = nil) - @log.error { "#{frame.inspect}, not implemented reason=\"#{text}\"" } - close_channel(frame, 540_u16, "NOT_IMPLEMENTED - #{text}") - end - - def send_not_implemented(ex : AMQ::Protocol::Error::NotImplemented) - text = "NOT_IMPLEMENTED" - if ex.channel.zero? - send AMQP::Frame::Connection::Close.new(540, text, ex.class_id, ex.method_id) - @running = false - else - send AMQP::Frame::Channel::Close.new(ex.channel, 540, text, ex.class_id, ex.method_id) - @channels.delete(ex.channel).try &.close - end - end - - def send_internal_error(message) - close_connection(nil, 541_u16, "INTERNAL_ERROR - Unexpected error, please report") - end - - def send_frame_error(message = nil) - close_connection(nil, 501_u16, "FRAME_ERROR - #{message}") - end - - private def declare_exchange(frame) - if !valid_entity_name(frame.exchange_name) - send_precondition_failed(frame, "Exchange name isn't valid") - elsif frame.exchange_name.empty? - send_access_refused(frame, "Not allowed to declare the default exchange") - elsif e = @vhost.exchanges.fetch(frame.exchange_name, nil) - redeclare_exchange(e, frame) - elsif frame.passive - send_not_found(frame, "Exchange '#{frame.exchange_name}' doesn't exists") - elsif frame.exchange_name.starts_with? "amq." - send_access_refused(frame, "Not allowed to use the amq. prefix") - else - ae = frame.arguments["x-alternate-exchange"]?.try &.as?(String) - ae_ok = ae.nil? 
|| (@user.can_write?(@vhost.name, ae) && @user.can_read?(@vhost.name, frame.exchange_name)) - unless @user.can_config?(@vhost.name, frame.exchange_name) && ae_ok - send_access_refused(frame, "User doesn't have permissions to declare exchange '#{frame.exchange_name}'") - return - end - begin - @vhost.apply(frame) - rescue e : Error::ExchangeTypeError - send_precondition_failed(frame, e.message) - end - send AMQP::Frame::Exchange::DeclareOk.new(frame.channel) unless frame.no_wait - end - end - - private def redeclare_exchange(e, frame) - if frame.passive || e.match?(frame) - unless frame.no_wait - send AMQP::Frame::Exchange::DeclareOk.new(frame.channel) - end - else - send_precondition_failed(frame, "Existing exchange '#{frame.exchange_name}' declared with other arguments") - end - end - - private def delete_exchange(frame) - if !valid_entity_name(frame.exchange_name) - send_precondition_failed(frame, "Exchange name isn't valid") - elsif frame.exchange_name.empty? - send_access_refused(frame, "Not allowed to delete the default exchange") - elsif frame.exchange_name.starts_with? "amq." - send_access_refused(frame, "Not allowed to use the amq. prefix") - elsif !@vhost.exchanges.has_key? frame.exchange_name - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Exchange::DeleteOk.new(frame.channel) unless frame.no_wait - elsif !@user.can_config?(@vhost.name, frame.exchange_name) - send_access_refused(frame, "User doesn't have permissions to delete exchange '#{frame.exchange_name}'") - elsif frame.if_unused && @vhost.exchanges[frame.exchange_name].in_use? - send_precondition_failed(frame, "Exchange '#{frame.exchange_name}' in use") - else - @vhost.apply(frame) - send AMQP::Frame::Exchange::DeleteOk.new(frame.channel) unless frame.no_wait - end - end - - # ameba:disable Metrics/CyclomaticComplexity - private def delete_queue(frame) - if frame.queue_name.empty? && @last_queue_name - frame.queue_name = @last_queue_name.not_nil! - end - if !valid_entity_name(frame.queue_name) - send_precondition_failed(frame, "Queue name isn't valid") - return - end - q = @vhost.queues.fetch(frame.queue_name, nil) - if q.nil? - send AMQP::Frame::Queue::DeleteOk.new(frame.channel, 0_u32) unless frame.no_wait - elsif queue_exclusive_to_other_client?(q) - send_resource_locked(frame, "Queue '#{q.name}' is exclusive") - elsif frame.if_unused && !q.consumer_count.zero? - send_precondition_failed(frame, "Queue '#{q.name}' in use") - elsif frame.if_empty && !q.message_count.zero? - send_precondition_failed(frame, "Queue '#{q.name}' is not empty") - elsif !@user.can_config?(@vhost.name, frame.queue_name) - send_access_refused(frame, "User doesn't have permissions to delete queue '#{q.name}'") - else - size = q.message_count - @vhost.apply(frame) - @exclusive_queues.delete(q) if q.exclusive? - send AMQP::Frame::Queue::DeleteOk.new(frame.channel, size) unless frame.no_wait - end - end - - private def valid_entity_name(name) : Bool - return true if name.empty? - name.matches?(/\A[ -~]*\z/) - end - - def queue_exclusive_to_other_client?(q) - q.exclusive? && !@exclusive_queues.includes?(q) - end - - private def declare_queue(frame) - if !frame.queue_name.empty? && !valid_entity_name(frame.queue_name) - send_precondition_failed(frame, "Queue name isn't valid") - elsif q = @vhost.queues.fetch(frame.queue_name, nil) - redeclare_queue(frame, q) - elsif {"amq.rabbitmq.reply-to", "amq.direct.reply-to"}.includes? 
frame.queue_name - unless frame.no_wait - send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 0_u32) - end - elsif frame.queue_name.starts_with?("amq.direct.reply-to.") - consumer_tag = frame.queue_name[20..] - if @vhost.direct_reply_consumers.has_key? consumer_tag - send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 1_u32) - else - send_not_found(frame, "Queue '#{frame.queue_name}' doesn't exists") - end - elsif frame.passive - send_not_found(frame, "Queue '#{frame.queue_name}' doesn't exists") - elsif frame.queue_name.starts_with? "amq." - send_access_refused(frame, "Not allowed to use the amq. prefix") - elsif @vhost.max_queues.try { |max| @vhost.queues.size >= max } - send_access_refused(frame, "queue limit in vhost '#{@vhost.name}' (#{@vhost.max_queues}) is reached") - else - declare_new_queue(frame) - end - end - - private def redeclare_queue(frame, q) - if queue_exclusive_to_other_client?(q) || invalid_exclusive_redclare?(frame, q) - send_resource_locked(frame, "Exclusive queue") - elsif frame.passive || q.match?(frame) - q.redeclare - unless frame.no_wait - send AMQP::Frame::Queue::DeclareOk.new(frame.channel, q.name, - q.message_count, q.consumer_count) - end - @last_queue_name = frame.queue_name - elsif frame.exclusive && !q.exclusive? - send_resource_locked(frame, "Not an exclusive queue") - else - send_precondition_failed(frame, "Existing queue '#{q.name}' declared with other arguments") - end - end - - private def invalid_exclusive_redclare?(frame, q) - q.exclusive? && !frame.passive && !frame.exclusive - end - - @last_queue_name : String? - - private def declare_new_queue(frame) - unless @vhost.flow? - send_precondition_failed(frame, "Server low on disk space, can not create queue") - end - if frame.queue_name.empty? - frame.queue_name = Queue.generate_name - end - dlx = frame.arguments["x-dead-letter-exchange"]?.try &.as?(String) - dlx_ok = dlx.nil? || (@user.can_write?(@vhost.name, dlx) && @user.can_read?(@vhost.name, name)) - unless @user.can_config?(@vhost.name, frame.queue_name) && dlx_ok - send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue_name}'") - return - end - @vhost.apply(frame) - @last_queue_name = frame.queue_name - if frame.exclusive - @exclusive_queues << @vhost.queues[frame.queue_name] - end - unless frame.no_wait - send AMQP::Frame::Queue::DeclareOk.new(frame.channel, frame.queue_name, 0_u32, 0_u32) - end - end - - private def bind_queue(frame) - if frame.queue_name.empty? && @last_queue_name - frame.queue_name = @last_queue_name.not_nil! - # according to spec if both queue name and routing key is empty, - # then substitute them with the name of the last declared queue - if frame.routing_key.empty? - frame.routing_key = @last_queue_name.not_nil! - end - end - return unless valid_q_bind_unbind?(frame) - - q = @vhost.queues.fetch(frame.queue_name, nil) - if q.nil? - send_not_found frame, "Queue '#{frame.queue_name}' not found" - elsif !@vhost.exchanges.has_key? 
frame.exchange_name - send_not_found frame, "Exchange '#{frame.exchange_name}' not found" - elsif !@user.can_read?(@vhost.name, frame.exchange_name) - send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") - elsif !@user.can_write?(@vhost.name, frame.queue_name) - send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") - elsif queue_exclusive_to_other_client?(q) - send_resource_locked(frame, "Exclusive queue") - else - @vhost.apply(frame) - send AMQP::Frame::Queue::BindOk.new(frame.channel) unless frame.no_wait - end - end - - private def unbind_queue(frame) - if frame.queue_name.empty? && @last_queue_name - frame.queue_name = @last_queue_name.not_nil! - end - return unless valid_q_bind_unbind?(frame) - - q = @vhost.queues.fetch(frame.queue_name, nil) - if q.nil? - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Queue::UnbindOk.new(frame.channel) - elsif !@vhost.exchanges.has_key? frame.exchange_name - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Queue::UnbindOk.new(frame.channel) - elsif !@user.can_read?(@vhost.name, frame.exchange_name) - send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.exchange_name}'") - elsif !@user.can_write?(@vhost.name, frame.queue_name) - send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") - elsif queue_exclusive_to_other_client?(q) - send_resource_locked(frame, "Exclusive queue") - else - @vhost.apply(frame) - send AMQP::Frame::Queue::UnbindOk.new(frame.channel) - end - end - - private def valid_q_bind_unbind?(frame) : Bool - if !valid_entity_name(frame.queue_name) - send_precondition_failed(frame, "Queue name isn't valid") - return false - elsif !valid_entity_name(frame.exchange_name) - send_precondition_failed(frame, "Exchange name isn't valid") - return false - elsif frame.exchange_name.empty? || frame.exchange_name == DEFAULT_EX - target = frame.is_a?(AMQP::Frame::Queue::Bind) ? "bind to" : "unbind from" - send_access_refused(frame, "Not allowed to #{target} the default exchange") - return false - end - true - end - - private def bind_exchange(frame) - source = @vhost.exchanges.fetch(frame.source, nil) - destination = @vhost.exchanges.fetch(frame.destination, nil) - if destination.nil? - send_not_found frame, "Exchange '#{frame.destination}' doesn't exists" - elsif source.nil? - send_not_found frame, "Exchange '#{frame.source}' doesn't exists" - elsif !@user.can_read?(@vhost.name, frame.source) - send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") - elsif !@user.can_write?(@vhost.name, frame.destination) - send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") - elsif frame.source.empty? || frame.destination.empty? - send_access_refused(frame, "Not allowed to bind to the default exchange") - else - @vhost.apply(frame) - send AMQP::Frame::Exchange::BindOk.new(frame.channel) unless frame.no_wait - end - end - - private def unbind_exchange(frame) - source = @vhost.exchanges.fetch(frame.source, nil) - destination = @vhost.exchanges.fetch(frame.destination, nil) - if destination.nil? - # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) - elsif source.nil? 
- # should return not_found according to spec but we make it idempotent - send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) - elsif !@user.can_read?(@vhost.name, frame.source) - send_access_refused(frame, "User doesn't have read permissions to exchange '#{frame.source}'") - elsif !@user.can_write?(@vhost.name, frame.destination) - send_access_refused(frame, "User doesn't have write permissions to exchange '#{frame.destination}'") - elsif frame.source.empty? || frame.destination.empty? || frame.source == DEFAULT_EX || frame.destination == DEFAULT_EX - send_access_refused(frame, "Not allowed to unbind from the default exchange") - else - @vhost.apply(frame) - send AMQP::Frame::Exchange::UnbindOk.new(frame.channel) unless frame.no_wait - end - end - - private def purge_queue(frame) - if frame.queue_name.empty? && @last_queue_name - frame.queue_name = @last_queue_name.not_nil! - end - unless @user.can_read?(@vhost.name, frame.queue_name) - send_access_refused(frame, "User doesn't have write permissions to queue '#{frame.queue_name}'") - return - end - if !valid_entity_name(frame.queue_name) - send_precondition_failed(frame, "Queue name isn't valid") - elsif q = @vhost.queues.fetch(frame.queue_name, nil) - if queue_exclusive_to_other_client?(q) - send_resource_locked(frame, "Queue '#{q.name}' is exclusive") - else - messages_purged = q.purge - send AMQP::Frame::Queue::PurgeOk.new(frame.channel, messages_purged) unless frame.no_wait - end - else - send_not_found(frame, "Queue '#{frame.queue_name}' not found") - end - end - - private def start_publish(frame) - unless @user.can_write?(@vhost.name, frame.exchange) - send_access_refused(frame, "User not allowed to publish to exchange '#{frame.exchange}'") - return - end - with_channel frame, &.start_publish(frame) - end - - private def consume(frame) - if frame.queue.empty? && @last_queue_name - frame.queue = @last_queue_name.not_nil! - end - if !valid_entity_name(frame.queue) - send_precondition_failed(frame, "Queue name isn't valid") - return - end - unless @user.can_read?(@vhost.name, frame.queue) - send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue}'") - return - end - with_channel frame, &.consume(frame) - end - - private def basic_get(frame) - if frame.queue.empty? && @last_queue_name - frame.queue = @last_queue_name.not_nil! 
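# A self-contained sketch of the entity-name rule that purge_queue, consume
# and basic_get above rely on (mirroring the patch's `valid_entity_name`):
# empty names pass, otherwise every character must be printable ASCII,
# i.e. space through '~'.
def valid_entity_name?(name : String) : Bool
  return true if name.empty?
  name.matches?(/\A[ -~]*\z/)
end

puts valid_entity_name?("orders.v1") # => true
puts valid_entity_name?("queue\n")   # => false (control character)
puts valid_entity_name?("kö")        # => false (non-ASCII)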
- end - if !valid_entity_name(frame.queue) - send_precondition_failed(frame, "Queue name isn't valid") - return - end - unless @user.can_read?(@vhost.name, frame.queue) - send_access_refused(frame, "User doesn't have permissions to queue '#{frame.queue}'") - return - end - # yield so that msg expiration, consumer delivery etc gets priority - Fiber.yield - with_channel frame, &.basic_get(frame) - end end end diff --git a/src/lavinmq/client/connection_factory.cr b/src/lavinmq/client/connection_factory.cr new file mode 100644 index 000000000..05c9199c5 --- /dev/null +++ b/src/lavinmq/client/connection_factory.cr @@ -0,0 +1,6 @@ +require "./client" + +module LavinMQ + abstract class ConnectionFactory + end +end diff --git a/src/lavinmq/queue/stream_queue.cr b/src/lavinmq/queue/stream_queue.cr index 215e57fce..7fb74e20f 100644 --- a/src/lavinmq/queue/stream_queue.cr +++ b/src/lavinmq/queue/stream_queue.cr @@ -1,5 +1,5 @@ require "./durable_queue" -require "../client/channel/stream_consumer" +require "../amqp/stream_consumer" require "./stream_queue_message_store" module LavinMQ @@ -67,7 +67,7 @@ module LavinMQ false end - def consume_get(consumer : Client::Channel::StreamConsumer, & : Envelope -> Nil) : Bool + def consume_get(consumer : AMQP::StreamConsumer, & : Envelope -> Nil) : Bool get(consumer) do |env| yield env env.redelivered ? (@redeliver_count += 1) : (@deliver_count += 1) @@ -77,7 +77,7 @@ module LavinMQ # yield the next message in the ready queue # returns true if a message was deliviered, false otherwise # if we encouncer an unrecoverable ReadError, close queue - private def get(consumer : Client::Channel::StreamConsumer, & : Envelope -> Nil) : Bool + private def get(consumer : AMQP::StreamConsumer, & : Envelope -> Nil) : Bool raise ClosedError.new if @closed env = @msg_store_lock.synchronize { @msg_store.shift?(consumer) } || return false yield env # deliver the message @@ -157,7 +157,7 @@ module LavinMQ used_segments = Set(UInt32).new @consumers_lock.synchronize do @consumers.each do |consumer| - used_segments << consumer.as(Client::Channel::StreamConsumer).segment + used_segments << consumer.as(AMQP::StreamConsumer).segment end end @msg_store_lock.synchronize do diff --git a/src/lavinmq/queue/stream_queue_message_store.cr b/src/lavinmq/queue/stream_queue_message_store.cr index 2754cc643..8ce4f1a12 100644 --- a/src/lavinmq/queue/stream_queue_message_store.cr +++ b/src/lavinmq/queue/stream_queue_message_store.cr @@ -88,7 +88,7 @@ module LavinMQ {msg_offset, segment, pos} end - def shift?(consumer : Client::Channel::StreamConsumer) : Envelope? + def shift?(consumer : AMQP::StreamConsumer) : Envelope? 
raise ClosedError.new if @closed if env = shift_requeued(consumer.requeued) diff --git a/src/lavinmq/reporter.cr b/src/lavinmq/reporter.cr index 076026303..432ea69fa 100644 --- a/src/lavinmq/reporter.cr +++ b/src/lavinmq/reporter.cr @@ -28,13 +28,16 @@ module LavinMQ puts_size_capacity vh.@connections vh.connections.each do |c| puts " #{c.name}" - puts_size_capacity c.@channels, 4 + puts_size_capacity c.channels, 4 c.channels.each_value do |ch| puts " #{ch.id} global_prefetch=#{ch.global_prefetch_count} prefetch=#{ch.prefetch_count}" - puts_size_capacity ch.@unacked, 6 - puts_size_capacity ch.@consumers, 6 - puts_size_capacity ch.@visited, 6 - puts_size_capacity ch.@found_queues, 6 + puts_size_capacity ch.consumers, 8 + case ch + when AMQP::Channel + puts_size_capacity ch.@unacked, 8 + puts_size_capacity ch.@visited, 8 + puts_size_capacity ch.@found_queues, 8 + end end end end diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 63ee39a8e..ebffc78d3 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -13,6 +13,8 @@ require "./config" require "./connection_info" require "./proxy_protocol" require "./client/client" +require "./client/connection_factory" +require "./amqp/connection_factory" require "./stats" module LavinMQ @@ -34,6 +36,7 @@ module LavinMQ @users = UserStore.new(@data_dir, @replicator) @vhosts = VHostStore.new(@data_dir, @users, @replicator) @parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator) + @amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new apply_parameter spawn stats_loop, name: "Server#stats_loop" end @@ -239,7 +242,7 @@ module LavinMQ end def handle_connection(socket, connection_info) - client = AMQPConnection.start(socket, connection_info, @vhosts, @users) + client = @amqp_connection_factory.start(socket, connection_info, @vhosts, @users) ensure socket.close if client.nil? end
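
The server change above replaces the hard-coded `AMQPConnection.start` call with a `LavinMQ::AMQP::ConnectionFactory` built on the new abstract `LavinMQ::ConnectionFactory`. The sketch below only illustrates the shape of that pattern; it is a simplified stand-in with an invented echo handshake and a trimmed parameter list, not the factory the patch adds.

    # Simplified stand-in for the connection-factory pattern: a factory either
    # returns a client object or nil, and the caller closes the socket on nil,
    # mirroring Server#handle_connection.
    abstract class ConnectionFactory
      abstract def start(socket : IO, connection_info : String) : String?
    end

    class EchoConnectionFactory < ConnectionFactory
      def start(socket : IO, connection_info : String) : String?
        greeting = socket.gets
        return nil unless greeting    # handshake failed; caller closes the socket
        socket.puts "hello #{greeting} via #{connection_info}"
        greeting                      # stands in for the client a real factory returns
      end
    end

    # Usage mirroring handle_connection: keep the connection only if start succeeded.
    socket = IO::Memory.new
    socket << "amqp-client\n"
    socket.rewind
    client = EchoConnectionFactory.new.start(socket, "127.0.0.1:5672")
    puts client                       # => "amqp-client"
    socket.close if client.nil?

Swapping in another protocol front end then only requires another `ConnectionFactory` subclass, which appears to be what the split into `client/connection_factory.cr` and `amqp/connection_factory.cr` is preparing for.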