From a24a97c158644a020ff402f30ae513c4cfd054aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Fri, 16 Aug 2024 10:17:30 +0200 Subject: [PATCH 1/2] Stop source on errors, runner will restart it --- spec/shovel_spec.cr | 32 ++++++++++++++++++++++++++++++++ src/lavinmq/shovel/shovel.cr | 7 ++++--- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/spec/shovel_spec.cr b/spec/shovel_spec.cr index 40397a887..fc6e061ea 100644 --- a/spec/shovel_spec.cr +++ b/spec/shovel_spec.cr @@ -14,6 +14,38 @@ end describe LavinMQ::Shovel do describe "AMQP" do + describe "Source" do + it "will stop and raise on unexpected disconnect" do + with_amqp_server do |s| + source = LavinMQ::Shovel::AMQPSource.new( + "spec", + [URI.parse(s.amqp_url)], + "source", + direct_user: s.users.direct_user + ) + + wg = WaitGroup.new(1) + exception : Exception? = nil + spawn do + source.start + wg.done + source.each do + end + rescue ex + exception = ex + ensure + wg.done + end + wg.wait # wait for source to start + wg.add 1 + s.vhosts["/"].connections.each &.close("spec") + wg.wait # wait for exception to be rescued + source.started?.should be_false + exception.should be_a AMQP::Client::Connection::ClosedException + end + end + end + it "will wait to ack all msgs before deleting it self" do with_amqp_server do |s| source = LavinMQ::Shovel::AMQPSource.new( diff --git a/src/lavinmq/shovel/shovel.cr b/src/lavinmq/shovel/shovel.cr index 97fbc73f7..94755311c 100644 --- a/src/lavinmq/shovel/shovel.cr +++ b/src/lavinmq/shovel/shovel.cr @@ -155,10 +155,11 @@ module LavinMQ end rescue e : FailedDeliveryError msg.reject - rescue e - stop - raise e end + rescue e + Log.warn { "name=#{@name} #{e.message}" } + stop + raise e end end From 1778fbeacab0c0e87371a02f0643d3554aa3a05b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20B=C3=B6rjesson?= Date: Tue, 20 Aug 2024 08:56:02 +0200 Subject: [PATCH 2/2] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7961e9eaa..889396076 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Prevet a queue that's overflowing to consume too much resources - Dead-lettering loop when publishing to a delayed exchange's internal queue [#748](https://github.com/cloudamqp/lavinmq/pull/748) - Exchange federation tried to bind to upstream's default exchange +- Shovel AMQP source didn't reconnect on network failures ### Changed