From f8a0fca63996b8cdabe4ea2326a866a245aed6e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 25 Oct 2024 13:34:15 +0200 Subject: [PATCH] Add timeout to Client#close to force close socket --- src/lavinmq/amqp/client.cr | 52 ++++++++++++++++++---- src/lavinmq/http/controller/connections.cr | 4 +- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/lavinmq/amqp/client.cr b/src/lavinmq/amqp/client.cr index 093361a13e..491befc042 100644 --- a/src/lavinmq/amqp/client.cr +++ b/src/lavinmq/amqp/client.cr @@ -262,7 +262,7 @@ module LavinMQ end true rescue ex : IO::Error | OpenSSL::SSL::Error - @log.debug { "Lost connection, while sending (#{ex.inspect})" } + @log.debug { "Lost connection, while sending message (#{ex.inspect})" } unless closed? close_socket Fiber.yield false @@ -410,23 +410,57 @@ module LavinMQ @vhost.rm_connection(self) end - private def close_socket + private def close_socket(force : Bool = false) @running = false - @socket.close - @log.debug { "Socket closed" } + if force && (socket = @socket.as?(Socket)) + # Socket#close_write will close on "kernel level" which will ensure + # we abort any blocking writes. Socket#close tries to flush the socket + # etc, meaning that we may block for a long time (or forever for broken + # clients) + socket.close_write + socket.close + else + @socket.close + end rescue ex @log.debug { "#{ex.inspect} when closing socket" } end - def close(reason = nil) + def close(reason = nil, timeout force_close_timeout : Time::Span = 10.seconds) reason ||= "Connection closed" - @log.info { "Closing, #{reason}" } - send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16) - @running = false + @log.info { "Closing: #{reason}" } + + # We spawn fibers to make #close non-blocking. Required for e.g. http API calls. + # It's probably good for VHost#close too which will close all it's connections. + # If we run into problems with too many fibers we can probably make one timeout fiber + # handling all fibers we're about to close. + + ch = ::Channel(Nil).new + # send may be block by @write_lock, therefore we spawn to send + spawn(name: "Client send Close") do + send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16) + @running = false + ch.send nil rescue ::Channel::ClosedError + rescue IO::Error + end + + # The @write_lock may be locked "forever" (consumer not consuming and buffers filled up), + # so we may never be able to send Close. We therefore spawn this timeout fiber + spawn(name: "Client send Close tiemout") do + select + when ch.receive + @log.info { "Close frame sent, now force closing socket" } + when timeout force_close_timeout + @log.info { "Timeout sending close, force closing socket" } + end + ch.close + # We always force close... Maybe wrong? + force_close + end end def force_close - close_socket + close_socket(force: true) end def closed? diff --git a/src/lavinmq/http/controller/connections.cr b/src/lavinmq/http/controller/connections.cr index de78a4a386..190e474179 100644 --- a/src/lavinmq/http/controller/connections.cr +++ b/src/lavinmq/http/controller/connections.cr @@ -38,7 +38,7 @@ module LavinMQ delete "/api/connections/:name" do |context, params| with_connection(context, params) do |c| reason = context.request.headers["X-Reason"]? || "Closed via management plugin" - c.close(reason) + c.close(reason, timeout: 3.seconds) context.response.status_code = 204 end end @@ -58,7 +58,7 @@ module LavinMQ connections = get_connections_by_username(context, params["username"]) reason = context.request.headers["X-Reason"]? || "Closed via management plugin" connections.each do |c| - c.close(reason) + c.close(reason, timeout: 3.seconds) end context.response.status_code = 204 context