Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve broker initiated client disconnects #816

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/lavinmq/amqp/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ module LavinMQ
end
true
rescue ex : IO::Error | OpenSSL::SSL::Error
@log.debug { "Lost connection, while sending (#{ex.inspect})" }
@log.debug { "Lost connection, while sending message (#{ex.inspect})" } unless closed?
close_socket
Fiber.yield
false
Expand Down Expand Up @@ -413,14 +413,20 @@ module LavinMQ
private def close_socket
@running = false
@socket.close
@log.debug { "Socket closed" }
rescue ex
@log.debug { "#{ex.inspect} when closing socket" }
end

def close(reason = nil)
def close(reason = nil, timeout : Time::Span = 5.seconds)
reason ||= "Connection closed"
@log.info { "Closing, #{reason}" }
@log.info { "Closing: #{reason}" }

socket = @socket
if socket.responds_to?(:"write_timeout=")
socket.write_timeout = timeout
socket.read_timeout = timeout
end

send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16)
@running = false
end
Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/http/controller/connections.cr
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ module LavinMQ
delete "/api/connections/:name" do |context, params|
with_connection(context, params) do |c|
reason = context.request.headers["X-Reason"]? || "Closed via management plugin"
c.close(reason)
c.close(reason, timeout: 3.seconds)
spuun marked this conversation as resolved.
Show resolved Hide resolved
context.response.status_code = 204
end
end
Expand All @@ -58,7 +58,7 @@ module LavinMQ
connections = get_connections_by_username(context, params["username"])
reason = context.request.headers["X-Reason"]? || "Closed via management plugin"
connections.each do |c|
c.close(reason)
c.close(reason, timeout: 3.seconds)
end
context.response.status_code = 204
context
Expand Down
64 changes: 57 additions & 7 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -369,18 +369,68 @@ module LavinMQ
upstreams.stop_all
end

private def close_connections(reason, close_done : Channel(Nil))
ch = Channel(Client).new
carlhoerberg marked this conversation as resolved.
Show resolved Hide resolved
wg = WaitGroup.new

fiber_count = 0
spawn_close_fiber = -> do
fiber_id = (fiber_count &+= 1)
@log.trace { "Spawn close fiber #{fiber_id}" }
wg.add
spawn(name: "vhost close connection #{fiber_id}") do
loop do
select
when client = ch.receive
client.close(reason)
when timeout 50.milliseconds
break
end
rescue
spuun marked this conversation as resolved.
Show resolved Hide resolved
end
spuun marked this conversation as resolved.
Show resolved Hide resolved
wg.done
@log.trace { "Exit close fiber #{fiber_id}" }
end
end

{@connections.size, 100}.min.times do
spawn_close_fiber.call
end

@connections.each do |client|
spuun marked this conversation as resolved.
Show resolved Hide resolved
loop do
select
when ch.send client
break
when timeout 200.milliseconds
# Spawn more fibers if all current are too busy closing "zombies"
spawn_close_fiber.call
spuun marked this conversation as resolved.
Show resolved Hide resolved
end
end
end

wg.wait
ch.close
spuun marked this conversation as resolved.
Show resolved Hide resolved
close_done.try &.close
end

def close(reason = "Broker shutdown")
@closed = true
stop_shovels
stop_upstream_links
Fiber.yield
@log.debug { "Closing connections" }
@connections.each &.close(reason)
# wait up to 10s for clients to gracefully close
100.times do
break if @connections.empty?
sleep 0.1.seconds

@log.info { "Closing connections" }
close_done = Channel(Nil).new
close_connections reason, close_done
spuun marked this conversation as resolved.
Show resolved Hide resolved
@log.debug { "Close sent to all connections" }

select
when close_done.receive?
@log.info { "All connections closed gracefully" }
when timeout 10.seconds
spuun marked this conversation as resolved.
Show resolved Hide resolved
@log.warn { "Timeout waiting for connections to close. #{@connections.size} left." }
end

# then force close the remaining (close tcp socket)
@connections.each &.force_close
Fiber.yield # yield so that Client read_loops can shutdown
Expand Down
Loading