Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve broker initiated client disconnects #816

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/lavinmq/amqp/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ module LavinMQ
end
true
rescue ex : IO::Error | OpenSSL::SSL::Error
@log.debug { "Lost connection, while sending (#{ex.inspect})" }
@log.debug { "Lost connection, while sending message (#{ex.inspect})" } unless closed?
close_socket
Fiber.yield
false
Expand Down Expand Up @@ -413,14 +413,20 @@ module LavinMQ
private def close_socket
@running = false
@socket.close
@log.debug { "Socket closed" }
rescue ex
@log.debug { "#{ex.inspect} when closing socket" }
end

def close(reason = nil)
def close(reason = nil, timeout : Time::Span = 5.seconds)
reason ||= "Connection closed"
@log.info { "Closing, #{reason}" }
@log.info { "Closing: #{reason}" }

socket = @socket
if socket.responds_to?(:"write_timeout=")
socket.write_timeout = timeout
socket.read_timeout = timeout
end

send AMQP::Frame::Connection::Close.new(320_u16, "CONNECTION_FORCED - #{reason}", 0_u16, 0_u16)
@running = false
end
Expand Down
50 changes: 43 additions & 7 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -369,18 +369,54 @@ module LavinMQ
upstreams.stop_all
end

private def close_connections(reason)
WaitGroup.wait do |wg|
to_close = Channel(Client).new
fiber_count = 0
@connections.each do |client|
loop do
select
when to_close.send client
break
else
fiber_id = fiber_count &+= 1
@log.trace { "spawning close conn fiber #{fiber_id} " }
wg.spawn do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make a PR to the stdlib so that we can name fibers spawned by WaitGroup#spawn.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR sent.

while client_to_close = to_close.receive?
client_to_close.close(reason)
end
@log.trace { "exiting close conn fiber #{fiber_id} " }
end
Fiber.yield
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Required? Doesn't Channel#send on a unbuffered channel automatically yield the current fiber?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why, but it did spawn one extra fiber. Not a big deal, but...

end
end
end
to_close.close
end
end

def close(reason = "Broker shutdown")
@closed = true
stop_shovels
stop_upstream_links
Fiber.yield
@log.debug { "Closing connections" }
@connections.each &.close(reason)
# wait up to 10s for clients to gracefully close
100.times do
break if @connections.empty?
sleep 0.1.seconds

@log.info { "Closing connections" }
close_done = Channel(Nil).new

spawn do
close_connections reason
@log.debug { "Close sent to all connections" }
close_done.close
end

select
when close_done.receive?
@log.info { "All connections closed gracefully" }
when timeout 5.seconds
@log.warn { "Timeout waiting for connections to close. #{@connections.size} left." }
end
spuun marked this conversation as resolved.
Show resolved Hide resolved
close_done.close

# then force close the remaining (close tcp socket)
@connections.each &.force_close
Fiber.yield # yield so that Client read_loops can shutdown
Expand Down
Loading