From 08b0e9ca3f6faf24ba27e3f219b4795f8141296d Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Fri, 20 Sep 2024 11:22:07 +0200 Subject: [PATCH 1/4] add a timeout to consumer_available.receive to let federation link reconnect --- src/lavinmq/federation/link.cr | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/lavinmq/federation/link.cr b/src/lavinmq/federation/link.cr index 684de00eb0..a12b8b759e 100644 --- a/src/lavinmq/federation/link.cr +++ b/src/lavinmq/federation/link.cr @@ -232,7 +232,11 @@ module LavinMQ state(State::Running) unless @federated_q.immediate_delivery? @log.debug { "Waiting for consumers" } - @consumer_available.receive? + select + when @consumer_available.receive? + when timeout(1.second) + return if @upstream_connection.try &.closed? + end end q_name = q[:queue_name] upstream_channel.basic_consume(q_name, no_ack: no_ack, tag: @upstream.consumer_tag, block: true) do |msg| From a726357fe06cd693469804ff098784aff84c6c8e Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Fri, 20 Sep 2024 11:22:23 +0200 Subject: [PATCH 2/4] add spec --- spec/upstream_spec.cr | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index b86a7fc53c..b83bc660c2 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -224,6 +224,37 @@ describe LavinMQ::Federation::Upstream do end end + it "should reconnect queue link after upstream disconnect" do + with_amqp_server do |s| + upstream, upstream_vhost, downstream_vhost = UpstreamSpecHelpers.setup_federation(s, "ef test upstream restart", nil, "upstream_q") + UpstreamSpecHelpers.start_link(upstream, "downstream_q", "queues") + s.users.add_permission("guest", "upstream", /.*/, /.*/, /.*/) + s.users.add_permission("guest", "downstream", /.*/, /.*/, /.*/) + + upstream_vhost.declare_exchange("upstream_ex", "topic", true, false) + upstream_vhost.declare_queue("upstream_q", true, false) + downstream_vhost.declare_exchange("downstream_ex", "topic", true, false) + downstream_vhost.declare_queue("downstream_q", true, false) + + wait_for { downstream_vhost.queues["downstream_q"].policy.try(&.name) == "FE" } + wait_for { upstream.links.first?.try &.state.running? } + + sleep 1.seconds + + # Disconnect the federation link + upstream_vhost.connections.each do |conn| + next unless conn.client_name.starts_with?("Federation link") + conn.close + end + + sleep 1.seconds + + # wait for federation link to be reconnected + wait_for { upstream.links.first?.try &.state.running? } + wait_for { upstream.links.first?.try { |l| l.@upstream_connection.try &.closed? == false } } + end + end + it "should continue after upstream restart" do with_amqp_server do |s| upstream, upstream_vhost, downstream_vhost = UpstreamSpecHelpers.setup_federation(s, "ef test upstream restart", "upstream_ex") From 317e0b96fa216c8324fa9d0eb08a1ac0e5071ac6 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Fri, 20 Sep 2024 11:27:33 +0200 Subject: [PATCH 3/4] shorter sleeps in spec --- spec/upstream_spec.cr | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index b83bc660c2..876c3d3362 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -238,16 +238,14 @@ describe LavinMQ::Federation::Upstream do wait_for { downstream_vhost.queues["downstream_q"].policy.try(&.name) == "FE" } wait_for { upstream.links.first?.try &.state.running? } - - sleep 1.seconds + sleep 0.1.seconds # Disconnect the federation link upstream_vhost.connections.each do |conn| next unless conn.client_name.starts_with?("Federation link") conn.close end - - sleep 1.seconds + sleep 0.1.seconds # wait for federation link to be reconnected wait_for { upstream.links.first?.try &.state.running? } From 615964eabcdb01d1af89acd1a841ab0e3ad3aee5 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Fri, 20 Sep 2024 16:45:09 +0200 Subject: [PATCH 4/4] loop around select --- src/lavinmq/federation/link.cr | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/lavinmq/federation/link.cr b/src/lavinmq/federation/link.cr index a12b8b759e..942e55a1aa 100644 --- a/src/lavinmq/federation/link.cr +++ b/src/lavinmq/federation/link.cr @@ -232,10 +232,13 @@ module LavinMQ state(State::Running) unless @federated_q.immediate_delivery? @log.debug { "Waiting for consumers" } - select - when @consumer_available.receive? - when timeout(1.second) - return if @upstream_connection.try &.closed? + loop do + select + when @consumer_available.receive? + break + when timeout(1.second) + return if @upstream_connection.try &.closed? + end end end q_name = q[:queue_name]