Skip to content

Commit

Permalink
Add timeout to Client#close to force close socket
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun committed Oct 25, 2024
1 parent ea5c3f7 commit f8a0fca
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
52 changes: 43 additions & 9 deletions src/lavinmq/amqp/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ module LavinMQ
end
true
rescue ex : IO::Error | OpenSSL::SSL::Error
@log.debug { "Lost connection, while sending (#{ex.inspect})" }
@log.debug { "Lost connection, while sending message (#{ex.inspect})" } unless closed?
close_socket
Fiber.yield
false
Expand Down Expand Up @@ -410,23 +410,57 @@ module LavinMQ
@vhost.rm_connection(self)
end

private def close_socket
private def close_socket(force : Bool = false)
@running = false
@socket.close
@log.debug { "Socket closed" }
if force && (socket = @socket.as?(Socket))
# Socket#close_write will close on "kernel level" which will ensure
# we abort any blocking writes. Socket#close tries to flush the socket
# etc, meaning that we may block for a long time (or forever for broken
# clients)
socket.close_write
socket.close
else
@socket.close
end
rescue ex
@log.debug { "#{ex.inspect} when closing socket" }
end

def close(reason = nil)
def close(reason = nil, timeout force_close_timeout : Time::Span = 10.seconds)
reason ||= "Connection closed"
@log.info { "Closing, #{reason}" }
send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16)
@running = false
@log.info { "Closing: #{reason}" }

# We spawn fibers to make #close non-blocking. Required for e.g. http API calls.
# It's probably good for VHost#close too which will close all it's connections.
# If we run into problems with too many fibers we can probably make one timeout fiber
# handling all fibers we're about to close.

ch = ::Channel(Nil).new
# send may be block by @write_lock, therefore we spawn to send
spawn(name: "Client send Close") do
send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16)
@running = false
ch.send nil rescue ::Channel::ClosedError
rescue IO::Error
end

# The @write_lock may be locked "forever" (consumer not consuming and buffers filled up),
# so we may never be able to send Close. We therefore spawn this timeout fiber
spawn(name: "Client send Close tiemout") do
select
when ch.receive
@log.info { "Close frame sent, now force closing socket" }
when timeout force_close_timeout
@log.info { "Timeout sending close, force closing socket" }
end
ch.close
# We always force close... Maybe wrong?
force_close
end
end

def force_close
close_socket
close_socket(force: true)
end

def closed?
Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/http/controller/connections.cr
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module LavinMQ
delete "/api/connections/:name" do |context, params|
with_connection(context, params) do |c|
reason = context.request.headers["X-Reason"]? || "Closed via management plugin"
c.close(reason)
c.close(reason, timeout: 3.seconds)
context.response.status_code = 204
end
end
Expand All @@ -58,7 +58,7 @@ module LavinMQ
connections = get_connections_by_username(context, params["username"])
reason = context.request.headers["X-Reason"]? || "Closed via management plugin"
connections.each do |c|
c.close(reason)
c.close(reason, timeout: 3.seconds)
end
context.response.status_code = 204
context
Expand Down

0 comments on commit f8a0fca

Please sign in to comment.