From cbbd05559c1529105b239c056f6119208888e25d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sat, 26 Oct 2024 21:54:19 +0200 Subject: [PATCH 1/2] Be more reslient when identifying PROXY protocol connections PROXY protocol connections are sniffed in clustering mode, so that followers can forward traffic to the leader. A IndexError was raised when a client opened a connections but didn't send a full 5 or 8 bytes (eg. opening and closing the connection for healtcheck). This patch fixes that bug, and also only check for PROXY protocol connections if clustering is enabled at all. --- src/lavinmq/server.cr | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index ebffc78d32..a82a8a43d5 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -103,11 +103,14 @@ module LavinMQ when 1 then ProxyProtocol::V1.parse(client) when 2 then ProxyProtocol::V2.parse(client) else - if client.peek[0, 5] == "PROXY".to_slice && + # Allow proxy connection from followers + if Config.instance.clustering? && + client.peek[0, 5]? == "PROXY".to_slice && followers.any? { |f| f.remote_address.address == remote_address.address } # Expect PROXY protocol header if remote address is a follower ProxyProtocol::V1.parse(client) - elsif client.peek[0, 8] == ProxyProtocol::V2::Signature.to_slice[0, 8] && + elsif Config.instance.clustering? && + client.peek[0, 8]? == ProxyProtocol::V2::Signature.to_slice[0, 8] && followers.any? { |f| f.remote_address.address == remote_address.address } # Expect PROXY protocol header if remote address is a follower ProxyProtocol::V2.parse(client) From 336238a153df1d4b2f09d96179c96b0b4353caa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sat, 26 Oct 2024 22:18:13 +0200 Subject: [PATCH 2/2] spec --- spec/server_spec.cr | 11 +++++++++++ spec/spec_helper.cr | 7 +++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/spec/server_spec.cr b/spec/server_spec.cr index e3fd268292..94f0111c2f 100644 --- a/spec/server_spec.cr +++ b/spec/server_spec.cr @@ -1288,4 +1288,15 @@ describe LavinMQ::Server do end end end + + it "can handle non valid protocol starts" do + LavinMQ::Config.instance.clustering = true + with_amqp_server do |s| + t = TCPSocket.new("localhost", amqp_port(s)) + t.print "HTTP" + buf = Bytes.new 8 + t.read(buf) + buf.should eq Bytes['A'.ord, 'M'.ord, 'Q'.ord, 'P'.ord, 0, 0, 9, 1] + end + end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 31cdd8e76e..f4bf0652e1 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -29,8 +29,7 @@ end def with_channel(s : LavinMQ::Server, file = __FILE__, line = __LINE__, **args, &) name = "lavinmq-spec-#{file}:#{line}" - port = s.@listeners.keys.select(TCPServer).first.local_address.port - args = {port: port, name: name}.merge(args) + args = {port: amqp_port(s), name: name}.merge(args) conn = AMQP::Client.new(**args).connect ch = conn.channel yield ch @@ -38,6 +37,10 @@ ensure conn.try &.close(no_wait: false) end +def amqp_port(s) + s.@listeners.keys.select(TCPServer).first.local_address.port +end + def should_eventually(expectation, timeout = 5.seconds, file = __FILE__, line = __LINE__, &) sec = Time.monotonic loop do