Skip to content

Commit

Permalink
Reconnect clients that gets disconnected in lavinmqperf throughput (#690
Browse files Browse the repository at this point in the history
)

Automatically reconnect clients if they get disconnected while running `lavinmqperf throughput`
  • Loading branch information
carlhoerberg authored and viktorerlingsson committed Sep 19, 2024
1 parent 7423a06 commit c4e1545
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
2 changes: 1 addition & 1 deletion shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ shards:

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.2.3
version: 1.2.4

lz4:
git: https://github.com/84codes/lz4.cr.git
Expand Down
25 changes: 15 additions & 10 deletions src/lavinmqperf.cr
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ class Throughput < Perf
done = Channel(Nil).new
@consumers.times do
if @poll
spawn poll_consume(done)
spawn { reconnect_on_disconnect(done) { poll_consume } }
else
spawn consume(done)
spawn { reconnect_on_disconnect(done) { consume } }
end
end

@publishers.times do
spawn pub(done)
spawn { reconnect_on_disconnect(done) { pub } }
end

if @timeout != Time::Span.zero
Expand Down Expand Up @@ -221,7 +221,7 @@ class Throughput < Perf
end
end

private def pub(done) # ameba:disable Metrics/CyclomaticComplexity
private def pub # ameba:disable Metrics/CyclomaticComplexity
data = Bytes.new(@size) { |i| ((i % 27 + 64)).to_u8 }
props = @properties
props.delivery_mode = 2u8 if @persistent
Expand Down Expand Up @@ -259,11 +259,9 @@ class Throughput < Perf
end
end
end
ensure
done.send nil
end

private def consume(done) # ameba:disable Metrics/CyclomaticComplexity
private def consume # ameba:disable Metrics/CyclomaticComplexity
data = Bytes.new(@size) { |i| ((i % 27 + 64)).to_u8 }
AMQP::Client.start(@uri) do |a|
ch = a.channel
Expand Down Expand Up @@ -302,11 +300,9 @@ class Throughput < Perf
end
end
end
ensure
done.send nil
end

private def poll_consume(done)
private def poll_consume
AMQP::Client.start(@uri) do |a|
ch = a.channel
q = begin
Expand Down Expand Up @@ -339,6 +335,15 @@ class Throughput < Perf
end
end
end
end

private def reconnect_on_disconnect(done, &)
loop do
break yield
rescue ex : AMQP::Client::Error | IO::Error
puts ex.message
sleep 1
end
ensure
done.send nil
end
Expand Down

0 comments on commit c4e1545

Please sign in to comment.