Skip to content

Commit

Permalink
Stop source on errors, runner will restart it
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun committed Aug 20, 2024
1 parent 81ac475 commit a24a97c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
32 changes: 32 additions & 0 deletions spec/shovel_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,38 @@ end

describe LavinMQ::Shovel do
describe "AMQP" do
describe "Source" do
it "will stop and raise on unexpected disconnect" do
with_amqp_server do |s|
source = LavinMQ::Shovel::AMQPSource.new(
"spec",
[URI.parse(s.amqp_url)],
"source",
direct_user: s.users.direct_user
)

wg = WaitGroup.new(1)
exception : Exception? = nil
spawn do
source.start
wg.done
source.each do
end
rescue ex
exception = ex
ensure
wg.done
end
wg.wait # wait for source to start
wg.add 1
s.vhosts["/"].connections.each &.close("spec")
wg.wait # wait for exception to be rescued
source.started?.should be_false
exception.should be_a AMQP::Client::Connection::ClosedException
end
end
end

it "will wait to ack all msgs before deleting it self" do
with_amqp_server do |s|
source = LavinMQ::Shovel::AMQPSource.new(
Expand Down
7 changes: 4 additions & 3 deletions src/lavinmq/shovel/shovel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,11 @@ module LavinMQ
end
rescue e : FailedDeliveryError
msg.reject
rescue e
stop
raise e
end
rescue e
Log.warn { "name=#{@name} #{e.message}" }
stop
raise e
end
end

Expand Down

0 comments on commit a24a97c

Please sign in to comment.