Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect clients that gets disconnected in lavinmqperf throughput #690

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ shards:

amqp-client:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.2.3
version: 1.2.4

lz4:
git: https://github.com/84codes/lz4.cr.git
Expand Down
25 changes: 15 additions & 10 deletions src/lavinmqperf.cr
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ class Throughput < Perf
done = Channel(Nil).new
@consumers.times do
if @poll
spawn poll_consume(done)
spawn { reconnect_on_disconnect(done) { poll_consume } }
else
spawn consume(done)
spawn { reconnect_on_disconnect(done) { consume } }
end
end

@publishers.times do
spawn pub(done)
spawn { reconnect_on_disconnect(done) { pub } }
end

if @timeout != Time::Span.zero
Expand Down Expand Up @@ -221,7 +221,7 @@ class Throughput < Perf
end
end

private def pub(done) # ameba:disable Metrics/CyclomaticComplexity
private def pub # ameba:disable Metrics/CyclomaticComplexity
data = Bytes.new(@size) { |i| ((i % 27 + 64)).to_u8 }
props = @properties
props.delivery_mode = 2u8 if @persistent
Expand Down Expand Up @@ -259,11 +259,9 @@ class Throughput < Perf
end
end
end
ensure
done.send nil
end

private def consume(done) # ameba:disable Metrics/CyclomaticComplexity
private def consume # ameba:disable Metrics/CyclomaticComplexity
data = Bytes.new(@size) { |i| ((i % 27 + 64)).to_u8 }
AMQP::Client.start(@uri) do |a|
ch = a.channel
Expand Down Expand Up @@ -302,11 +300,9 @@ class Throughput < Perf
end
end
end
ensure
done.send nil
end

private def poll_consume(done)
private def poll_consume
AMQP::Client.start(@uri) do |a|
ch = a.channel
q = begin
Expand Down Expand Up @@ -339,6 +335,15 @@ class Throughput < Perf
end
end
end
end

private def reconnect_on_disconnect(done, &)
loop do
break yield
rescue ex : AMQP::Client::Error | IO::Error
puts ex.message
sleep 1
end
ensure
done.send nil
end
Expand Down
Loading