diff --git a/spec/api/connections_spec.cr b/spec/api/connections_spec.cr index c4cfbfbf7a..506e3ab99c 100644 --- a/spec/api/connections_spec.cr +++ b/spec/api/connections_spec.cr @@ -139,7 +139,7 @@ describe LavinMQ::HTTP::ConnectionsController do with_channel(s, user: "arnold", password: "pw") do response = http.delete("/api/connections/username/arnold", headers: hdrs) response.status_code.should eq 204 - sleep 0.1 + sleep 0.1.seconds response = http.get("/api/connections/username/arnold", headers: hdrs) body = JSON.parse(response.body) body.as_a.empty?.should be_true diff --git a/spec/api/consumers_spec.cr b/spec/api/consumers_spec.cr index 059849d92d..1ca9c65269 100644 --- a/spec/api/consumers_spec.cr +++ b/spec/api/consumers_spec.cr @@ -34,7 +34,7 @@ describe LavinMQ::HTTP::ConsumersController do with_channel(s) do |ch| q = ch.queue("") q.subscribe { } - sleep 0.01 + sleep 10.milliseconds response = http.get("/api/consumers/%2f") response.status_code.should eq 200 body = JSON.parse(response.body) @@ -59,7 +59,7 @@ describe LavinMQ::HTTP::ConsumersController do with_channel(s) do |ch| q = ch.queue("") consumer = q.subscribe { } - sleep 0.01 + sleep 10.milliseconds conn = s.connections.to_a.last.name response = http.delete("/api/consumers/%2f/#{URI.encode_path(conn)}/#{ch.id}/#{consumer}") response.status_code.should eq 204 @@ -74,7 +74,7 @@ describe LavinMQ::HTTP::ConsumersController do with_channel(s) do |ch| q = ch.queue("") consumer = q.subscribe { } - sleep 0.01 + sleep 10.milliseconds response = http.delete("/api/consumers/%2f/#{URI.encode_path("abc")}/#{ch.id}/#{consumer}") response.status_code.should eq 404 end @@ -87,7 +87,7 @@ describe LavinMQ::HTTP::ConsumersController do conn = s.connections.first.name q = ch.queue("") consumer = q.subscribe { } - sleep 0.01 + sleep 10.milliseconds response = http.delete("/api/consumers/%2f/#{URI.encode_path(conn)}/123/#{consumer}") response.status_code.should eq 404 end @@ -100,7 +100,7 @@ describe LavinMQ::HTTP::ConsumersController do conn = s.connections.first.name q = ch.queue("") q.subscribe { } - sleep 0.01 + sleep 10.milliseconds response = http.delete("/api/consumers/%2f/#{URI.encode_path(conn)}/#{ch.id}/test") response.status_code.should eq 404 end diff --git a/spec/api/definitions_spec.cr b/spec/api/definitions_spec.cr index 29fab99079..079e8ed5bd 100644 --- a/spec/api/definitions_spec.cr +++ b/spec/api/definitions_spec.cr @@ -173,7 +173,7 @@ describe LavinMQ::HTTP::Server do response.status_code.should eq 200 # Because we run shovels in a new Fiber we have to make sure the shovel is not started # after this spec has finished - sleep 0.1 # Start the shovel + sleep 0.1.seconds # Start the shovel wait_for do shovels = s.vhosts["/"].shovels.not_nil! shovels.each_value.all? &.running? diff --git a/spec/api/queues_spec.cr b/spec/api/queues_spec.cr index 460a42e319..900eb42575 100644 --- a/spec/api/queues_spec.cr +++ b/spec/api/queues_spec.cr @@ -318,7 +318,7 @@ describe LavinMQ::HTTP::QueuesController do with_channel(s) do |ch| q = ch.queue("q3", auto_delete: false, durable: true, exclusive: false) q.publish "m1" - sleep 0.05 + sleep 0.05.milliseconds body = %({ "count": 1, "ack_mode": "reject_requeue_true", @@ -380,7 +380,7 @@ describe LavinMQ::HTTP::QueuesController do with_channel(s) do |ch| q = ch.queue("q5", auto_delete: false, durable: true, exclusive: false) q.publish "m1" - sleep 0.05 + sleep 0.05.milliseconds body = %({ "count": 2, "ack_mode": "get", @@ -414,7 +414,7 @@ describe LavinMQ::HTTP::QueuesController do with_channel(s) do |ch| q = ch.queue("q7") q.publish "m1" - sleep 0.05 + sleep 0.05.milliseconds body = %({ "count": 1, "ack_mode": "get", @@ -433,7 +433,7 @@ describe LavinMQ::HTTP::QueuesController do with_channel(s) do |ch| q = ch.queue("q8") q.publish "m1" - sleep 0.05 + sleep 0.05.milliseconds body = %({ "count": 1, "ack_mode": "get", diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index d6f91b8881..3b48493314 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -18,7 +18,7 @@ describe LavinMQ::Clustering::Client do client = HTTP::Client.new("127.0.0.1", 2379) i = 0 loop do - sleep 0.02 + sleep 0.02.seconds response = client.get("/version") if response.status.ok? next if response.body.includes? "not_decided" @@ -99,25 +99,25 @@ describe LavinMQ::Clustering::Client do rescue LavinMQ::Etcd::Error # expect this when etcd nodes are terminated end - sleep 0.5 + sleep 0.5.seconds spawn(name: "failover1") do controller1.run end spawn(name: "failover2") do controller2.run end - sleep 0.1 + sleep 0.1.seconds leader = listen.receive case leader when /1$/ controller1.stop listen.receive.should match /2$/ - sleep 0.1 + sleep 0.1.seconds controller2.stop when /2$/ controller2.stop listen.receive.should match /1$/ - sleep 0.1 + sleep 0.1.seconds controller1.stop else fail("no leader elected") end diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index eaecae93a6..3d611a6bee 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -29,7 +29,7 @@ describe LavinMQ::Etcd do # expect this when etcd nodes are terminated end w.receive # sync - sleep 0.05 + sleep 50.milliseconds etcd.put "foo", "bar" w.receive.should eq "bar" etcd.put "foo", "rab" @@ -162,7 +162,7 @@ class EtcdCluster i = 0 client = HTTP::Client.new("127.0.0.1", 23000 + port) loop do - sleep 0.02 + sleep 0.02.seconds response = client.get("/version") if response.status.ok? next if response.body.includes? "not_decided" diff --git a/spec/flow_spec.cr b/spec/flow_spec.cr index ecd9fdc1c5..856208121f 100644 --- a/spec/flow_spec.cr +++ b/spec/flow_spec.cr @@ -16,7 +16,7 @@ describe "Flow" do ch.flow(false) msgs.pop.ack q.publish "msg" - sleep 0.05 # wait little so a new message could be delivered + sleep 50.milliseconds # wait little so a new message could be delivered msgs.size.should eq 0 ch.flow(true) wait_for { msgs.size == 1 } diff --git a/spec/policies_spec.cr b/spec/policies_spec.cr index c9117a10c9..2f883b5ef7 100644 --- a/spec/policies_spec.cr +++ b/spec/policies_spec.cr @@ -28,10 +28,10 @@ describe LavinMQ::VHost do PoliciesSpec.with_vhost do |vhost| vhost.queues["test1"] = LavinMQ::Queue.new(vhost, "test") vhost.add_policy("test", "^.*$", "all", definitions, -10_i8) - sleep 0.01 + sleep 10.milliseconds vhost.queues["test1"].policy.try(&.name).should eq "test" vhost.delete_policy("test") - sleep 0.01 + sleep 10.milliseconds vhost.queues["test1"].policy.should be_nil end end @@ -59,7 +59,7 @@ describe LavinMQ::VHost do defs = {"max-length" => JSON::Any.new(1_i64)} of String => JSON::Any vhost.queues["test"] = LavinMQ::Queue.new(vhost, "test") vhost.add_policy("ml", "^.*$", "queues", defs, 11_i8) - sleep 0.01 + sleep 10.milliseconds vhost.queues["test"].policy.not_nil!.name.should eq "ml" end end @@ -70,7 +70,7 @@ describe LavinMQ::VHost do vhost.queues["test2"] = LavinMQ::Queue.new(vhost, "test") vhost.add_policy("ml2", "^.*$", "queues", defs, 1_i8) vhost.add_policy("ml1", "^.*$", "queues", defs, 0_i8) - sleep 0.01 + sleep 10.milliseconds vhost.queues["test2"].policy.not_nil!.name.should eq "ml2" end end @@ -102,7 +102,7 @@ describe LavinMQ::VHost do end ch.queue_declare("policy-ttl", passive: true)[:message_count].should eq 10 s.vhosts["/"].add_policy("ttl", "^.*$", "all", defs, 12_i8) - sleep 0.01 + sleep 10.milliseconds ch.queue_declare("policy-ttl", passive: true)[:message_count].should eq 0 s.vhosts["/"].delete_policy("ttl") end @@ -116,7 +116,7 @@ describe LavinMQ::VHost do q = ch.queue("qttl") q.publish_confirm "" s.vhosts["/"].add_policy("qttl", "^.*$", "all", defs, 12_i8) - sleep 0.01 + sleep 10.milliseconds expect_raises(AMQP::Client::Channel::ClosedException) do ch.queue_declare("qttl", passive: true) end @@ -131,9 +131,9 @@ describe LavinMQ::VHost do ch.queue("qttl") queue = s.vhosts["/"].queues["qttl"] first = queue.last_get_time - sleep 0.1 + sleep 0.1.seconds s.vhosts["/"].add_policy("qttl", "^.*$", "all", defs, 12_i8) - sleep 0.1 + sleep 0.1.seconds last = queue.last_get_time last.should be > first end @@ -149,9 +149,9 @@ describe LavinMQ::VHost do q.publish_confirm "short2" q.publish_confirm "long" ch.queue_declare("max-length-bytes", passive: true)[:message_count].should eq 3 - sleep 0.02 + sleep 0.02.seconds s.vhosts["/"].add_policy("max-length-bytes", "^.*$", "all", defs, 12_i8) - sleep 0.01 + sleep 10.milliseconds ch.queue_declare("max-length-bytes", passive: true)[:message_count].should eq 2 q.get(no_ack: true).try(&.body_io.to_s).should eq("short2") q.get(no_ack: true).try(&.body_io.to_s).should eq("long") @@ -166,7 +166,7 @@ describe LavinMQ::VHost do with_channel(s) do |ch| q = ch.queue("max-length-bytes", exclusive: true) s.vhosts["/"].add_policy("max-length-bytes", "^.*$", "all", defs, 12_i8) - sleep 0.01 + sleep 10.milliseconds q.publish_confirm "short1" q.publish_confirm "short2" q.publish_confirm "long" @@ -185,7 +185,7 @@ describe LavinMQ::VHost do with_channel(s) do |ch| q = ch.queue("max-length-bytes", exclusive: true) s.vhosts["/"].add_policy("max-length-bytes", "^.*$", "all", defs, 12_i8) - sleep 0.01 + sleep 10.milliseconds q.publish_confirm "short1" q.publish_confirm "short2" q.publish_confirm "long" @@ -256,7 +256,7 @@ describe LavinMQ::VHost do "delayed-message"} vhost.queues["test"] = LavinMQ::Queue.new(vhost, "test") vhost.add_policy("test", "^.*$", "all", definitions, -10_i8) - sleep 0.01 + sleep 10.milliseconds vhost.queues["test"].details_tuple[:effective_policy_definition].as(Hash(String, JSON::Any)).each_key do |k| supported_policies.includes?(k).should be_true end @@ -272,11 +272,11 @@ describe LavinMQ::VHost do vhost.exchanges["x-with-ae"] = LavinMQ::DirectExchange.new(vhost, "x-with-ae", arguments: AMQ::Protocol::Table.new({"x-alternate-exchange": "ae2"})) vhost.add_policy("test", ".*", "all", definitions, 100_i8) - sleep 0.01 + sleep 10.milliseconds vhost.exchanges["no-ae"].@alternate_exchange.should eq "dead-letters" vhost.exchanges["x-with-ae"].@alternate_exchange.should eq "ae2" vhost.delete_policy("test") - sleep 0.01 + sleep 10.milliseconds vhost.exchanges["no-ae"].@alternate_exchange.should be_nil vhost.exchanges["x-with-ae"].@alternate_exchange.should eq "ae2" end @@ -287,11 +287,11 @@ describe LavinMQ::VHost do vhost.queues["test1"] = LavinMQ::Queue.new(vhost, "test1", arguments: LavinMQ::AMQP::Table.new({"x-max-length" => 1_i64})) vhost.queues["test2"] = LavinMQ::Queue.new(vhost, "test2", arguments: LavinMQ::AMQP::Table.new({"x-max-length" => 11_i64})) vhost.add_policy("test", ".*", "all", definitions, 100_i8) - sleep 0.01 + sleep 10.milliseconds vhost.queues["test1"].@max_length.should eq 1 vhost.queues["test2"].@max_length.should eq 10 vhost.delete_policy("test") - sleep 0.01 + sleep 10.milliseconds vhost.queues["test1"].@max_length.should eq 1 vhost.queues["test2"].@max_length.should eq 11 end diff --git a/spec/queue_spec.cr b/spec/queue_spec.cr index 013fcd421f..cf8f9399d6 100644 --- a/spec/queue_spec.cr +++ b/spec/queue_spec.cr @@ -276,7 +276,7 @@ describe LavinMQ::Queue do sq = s.vhosts["/"].queues[q.name] sq.unacked_count.should eq 1 msg.not_nil!.ack - sleep 0.01 + sleep 10.milliseconds sq.unacked_count.should eq 0 end end @@ -296,7 +296,7 @@ describe LavinMQ::Queue do sq = s.vhosts["/"].queues[q.name] sq.unacked_count.should eq 1 msg.ack - sleep 0.01 + sleep 10.milliseconds sq.unacked_count.should eq 0 end end @@ -342,7 +342,7 @@ describe LavinMQ::Queue do sub = q.subscribe(no_ack: true) { |_| } Dir.exists?(data_dir).should be_true q.unsubscribe(sub) - sleep 0.1 + sleep 0.1.seconds Dir.exists?(data_dir).should be_false end end diff --git a/spec/server_spec.cr b/spec/server_spec.cr index 0823be8cc1..7139f9146c 100644 --- a/spec/server_spec.cr +++ b/spec/server_spec.cr @@ -140,7 +140,7 @@ describe LavinMQ::Server do q.unbind(x.name, "") x.publish("m1", q.name) code.receive.should eq 404 - sleep 0.01 + sleep 10.milliseconds ch.closed?.should be_true end end @@ -206,9 +206,9 @@ describe LavinMQ::Server do with_channel(s) do |ch| q = ch.queue q.publish_confirm "expired", props: AMQP::Client::Properties.new(expiration: "1") - sleep 0.2 + sleep 0.2.seconds q.publish_confirm "expired", props: AMQP::Client::Properties.new(expiration: "1") - sleep 0.2 + sleep 0.2.seconds msg = q.get(no_ack: true) msg.should be_nil end @@ -363,7 +363,7 @@ describe LavinMQ::Server do ch.prefetch 1 q.subscribe(no_ack: false) do |msg| mch.send msg - sleep 0.2 + sleep 0.2.seconds msg.ack end 10.times do |i| @@ -422,7 +422,7 @@ describe LavinMQ::Server do msgs = [] of AMQP::Client::DeliverMessage tag = q.subscribe { |msg| msgs << msg } q.unsubscribe(tag) - sleep 0.01 + sleep 10.milliseconds ch.has_subscriber?(tag).should eq false end end @@ -462,7 +462,7 @@ describe LavinMQ::Server do x.publish "m2", q.name, props: AMQP::Client::Properties.new(headers: hdrs) msgs = Channel(AMQP::Client::DeliverMessage).new(2) q.subscribe { |msg| msgs.send msg } - spawn { sleep 5; msgs.close } + spawn { sleep 5.seconds; msgs.close } 2.times { msgs.receive?.should_not be_nil } end end @@ -569,7 +569,7 @@ describe LavinMQ::Server do with_amqp_server do |s| with_channel(s) do |ch| ch.basic_ack(0, multiple: true) - sleep 0.01 + sleep 10.milliseconds ch.basic_ack(0, multiple: true) end end @@ -639,7 +639,7 @@ describe LavinMQ::Server do q.publish_confirm("m2").should be_true definitions = {"max-length" => JSON::Any.new(1_i64)} of String => JSON::Any s.vhosts["/"].add_policy("test", "^mlq$", "queues", definitions, 10_i8) - sleep 0.01 + sleep 10.milliseconds s.vhosts["/"].queues["mlq"].message_count.should eq 1 end end @@ -840,7 +840,7 @@ describe LavinMQ::Server do q1.bind(x1.name, "rk") q2.bind(x1.name, "rk") x1.publish("m1", "rk") - sleep 0.05 + sleep 0.05.milliseconds msg_q1 = q1.get(no_ack: true) msg_q2 = q2.get(no_ack: true) msg_q1.not_nil!.body_io.to_s.should eq("m1") @@ -885,11 +885,11 @@ describe LavinMQ::Server do 5.times { q.publish "" } delivered = 0 tag = q.subscribe(no_ack: false) { |_m| delivered += 1 } - sleep 0.05 + sleep 0.05.milliseconds q.unsubscribe(tag) - sleep 0.05 + sleep 0.05.milliseconds ch.basic_recover(requeue: true) - sleep 0.05 + sleep 0.05.milliseconds q.delete[:message_count].should eq 5 end end @@ -1042,7 +1042,7 @@ describe LavinMQ::Server do with_amqp_server do |s| with_channel(s) do |ch| ch.basic_ack(1) - sleep 0.1 + sleep 0.1.seconds expect_raises(AMQP::Client::Channel::ClosedException, /PRECONDITION_FAILED - unknown delivery tag 1/) do ch.basic_ack(1) end @@ -1062,7 +1062,7 @@ describe LavinMQ::Server do _msg3 = ch.basic_get(q.name, no_ack: false) msg2.not_nil!.ack msg2.not_nil!.ack # this will trigger the error - sleep 0.1 + sleep 0.1.seconds expect_raises(AMQP::Client::Channel::ClosedException, /PRECONDITION_FAILED - unknown delivery tag 2/) do msg1.not_nil!.ack end @@ -1082,7 +1082,7 @@ describe LavinMQ::Server do msg3 = ch.basic_get(q.name, no_ack: false) msg2.not_nil!.ack msg2.not_nil!.ack(multiple: true) # this should tigger a precondition fail - sleep 0.1 + sleep 0.1.seconds expect_raises(AMQP::Client::Channel::ClosedException, /PRECONDITION_FAILED - unknown delivery tag 2/) do msg3.not_nil!.ack end @@ -1102,7 +1102,7 @@ describe LavinMQ::Server do end q.publish "1" ch.@connection.@io.as(TCPSocket).close - sleep 0.05 + sleep 0.05.milliseconds count.should eq 0 Fiber.yield diff --git a/spec/shovel_spec.cr b/spec/shovel_spec.cr index a7dd230eeb..e5e1302838 100644 --- a/spec/shovel_spec.cr +++ b/spec/shovel_spec.cr @@ -258,7 +258,7 @@ describe LavinMQ::Shovel do x.publish_confirm "shovel me", "ap_q1" spawn shovel.run wait_for { shovel.running? } - sleep 0.1 # Give time for message to be shoveled + sleep 0.1.seconds # Give time for message to be shoveled s.vhosts["/"].queues["ap_q1"].message_count.should eq 0 q2.get(no_ack: false).try(&.body_io.to_s).should eq "shovel me" end @@ -644,7 +644,7 @@ describe LavinMQ::Shovel do props = AMQP::Client::Properties.new("text/plain", nil, headers) x.publish_confirm "shovel me", "ql_q1", props: props shovel.run - sleep 0.01 + sleep 10.milliseconds # Check that we have sent one message successfully path.should eq "/pp" diff --git a/spec/storage_spec.cr b/spec/storage_spec.cr index 561adc1be2..4f70d17f72 100644 --- a/spec/storage_spec.cr +++ b/spec/storage_spec.cr @@ -33,7 +33,7 @@ describe LavinMQ::DurableQueue do queue = vhost.queues["corrupt_q"].as(LavinMQ::DurableQueue) q.publish_confirm "test message" - sleep 0.01 + sleep 10.milliseconds bytes = "111111111aaaaauaoeuaoeu".to_slice queue.@msg_store.@segments.each_value do |mfile| File.open(mfile.path, "w+") do |f| diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index cb4ad7eb02..35aec89de1 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -112,7 +112,7 @@ describe LavinMQ::StreamQueue do q = ch.queue("stream-max-age", args: AMQP::Client::Arguments.new(args)) data = Bytes.new(LavinMQ::Config.instance.segment_size) 2.times { q.publish_confirm data } - sleep 1.1 + sleep 1.1.seconds q.publish_confirm data q.message_count.should eq 1 end @@ -127,7 +127,7 @@ describe LavinMQ::StreamQueue do q = ch.queue("stream-max-age-policy", args: AMQP::Client::Arguments.new(args)) data = Bytes.new(LavinMQ::Config.instance.segment_size) 2.times { q.publish_confirm data } - sleep 1.1 + sleep 1.1.seconds q.publish_confirm data q.message_count.should eq 1 end @@ -142,7 +142,7 @@ describe LavinMQ::StreamQueue do data = Bytes.new(LavinMQ::Config.instance.segment_size) 2.times { q.publish_confirm data } q.message_count.should eq 2 - sleep 1.1 + sleep 1.1.seconds s.vhosts["/"].add_policy("max", "stream-max-age-policy", "queues", {"max-age" => JSON::Any.new("1s")}, 0i8) q.message_count.should eq 1 end diff --git a/spec/unix_server_spec.cr b/spec/unix_server_spec.cr index 20f41ff03d..cf278aa8f4 100644 --- a/spec/unix_server_spec.cr +++ b/spec/unix_server_spec.cr @@ -4,7 +4,7 @@ describe LavinMQ::Server do describe "UNIX Sockets" do pending "can accept UNIX socket connections" do spawn { s.listen_unix("/tmp/lavinmq-spec/lavinmq.sock") } - sleep 0.01 + sleep 10.milliseconds with_channel(host: "/tmp/lavinmq-spec/lavinmq.sock") do |ch| ch.should_not be_nil end diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index 95ca48108f..b86a7fc53c 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -246,7 +246,7 @@ describe LavinMQ::Federation::Upstream do end upstream_ex.publish_confirm "msg1", "rk1" msgs.receive.should eq "msg1" - sleep 0.01 # allow the downstream federation to ack the msg + sleep 10.milliseconds # allow the downstream federation to ack the msg upstream_vhost.connections.each do |conn| next unless conn.client_name.starts_with?("Federation link") conn.close @@ -410,10 +410,10 @@ describe LavinMQ::Federation::Upstream do downstream_q.bind("downstream_ex", "after.link.#{i}") queues << downstream_q end - sleep 0.1 + sleep 0.1.seconds upstream_q.bindings.size.should eq queues.size queues.each &.delete - sleep 0.01 + sleep 10.milliseconds upstream_q.bindings.size.should eq 0 end end diff --git a/src/lavinmq/clustering/client.cr b/src/lavinmq/clustering/client.cr index 2bb4a2a4d9..e088bbd9b7 100644 --- a/src/lavinmq/clustering/client.cr +++ b/src/lavinmq/clustering/client.cr @@ -81,7 +81,7 @@ module LavinMQ socket.try &.close break if @closed Log.info { "Disconnected from server #{host}:#{port} (#{ex}), retrying..." } - sleep 1 + sleep 1.seconds end end diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index eb837dd084..3c3609b895 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -224,11 +224,11 @@ module LavinMQ rescue ex : Error Log.warn { "Service Unavailable at #{address}, #{ex.message}, retrying" } socket.close rescue nil - sleep 0.1 + sleep 0.1.seconds rescue IO::Error Log.warn { "Lost connection to #{address}, retrying" } socket.close rescue nil - sleep 0.1 + sleep 0.1.seconds ensure @connections.push({socket, address}) unless socket.closed? end diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 4e170e6859..9ffb8e27d6 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -51,7 +51,7 @@ module LavinMQ GC.collect end else - sleep 30 + sleep 30.seconds @data_dir_lock.try &.poll GC.collect end diff --git a/src/lavinmq/queue/stream_queue.cr b/src/lavinmq/queue/stream_queue.cr index 7fb74e20fe..54a8876a60 100644 --- a/src/lavinmq/queue/stream_queue.cr +++ b/src/lavinmq/queue/stream_queue.cr @@ -148,7 +148,7 @@ module LavinMQ private def unmap_and_remove_segments_loop until closed? - sleep 60 + sleep 60.seconds unmap_and_remove_segments end end diff --git a/src/lavinmq/rough_time.cr b/src/lavinmq/rough_time.cr index 8bc00c54b7..f940464598 100644 --- a/src/lavinmq/rough_time.cr +++ b/src/lavinmq/rough_time.cr @@ -5,7 +5,7 @@ module RoughTime spawn(name: "RoughTime") do loop do - sleep 0.1 + sleep 0.1.seconds @@utc = Time.utc @@unix_ms = @@utc.to_unix_ms // 100 * 100 @@monotonic = Time.monotonic diff --git a/src/lavinmq/shovel/shovel.cr b/src/lavinmq/shovel/shovel.cr index 1fd6e4e592..e478cceb1e 100644 --- a/src/lavinmq/shovel/shovel.cr +++ b/src/lavinmq/shovel/shovel.cr @@ -9,7 +9,7 @@ module LavinMQ DEFAULT_ACK_MODE = AckMode::OnConfirm DEFAULT_DELETE_AFTER = DeleteAfter::Never DEFAULT_PREFETCH = 1000_u16 - DEFAULT_RECONNECT_DELAY = 5 + DEFAULT_RECONNECT_DELAY = 5.seconds DEFAULT_BATCH_ACK_TIMEOUT = 3.seconds enum State @@ -373,7 +373,7 @@ module LavinMQ getter name, vhost def initialize(@source : AMQPSource, @destination : Destination, - @name : String, @vhost : VHost, @reconnect_delay = DEFAULT_RECONNECT_DELAY) + @name : String, @vhost : VHost, @reconnect_delay : Time::Span = DEFAULT_RECONNECT_DELAY) end def state @@ -427,7 +427,7 @@ module LavinMQ def exponential_reconnect_delay @retries += 1 if @retries > RETRY_THRESHOLD - sleep Math.min(MAX_DELAY, @reconnect_delay ** (@retries - RETRY_THRESHOLD)) + sleep Math.min(MAX_DELAY, @reconnect_delay.seconds ** (@retries - RETRY_THRESHOLD)).seconds else sleep @reconnect_delay end diff --git a/src/lavinmq/shovel/shovel_store.cr b/src/lavinmq/shovel/shovel_store.cr index 42bdb2c36f..f69f40bbb5 100644 --- a/src/lavinmq/shovel/shovel_store.cr +++ b/src/lavinmq/shovel/shovel_store.cr @@ -21,7 +21,7 @@ module LavinMQ delete_after = Shovel::DeleteAfter.parse?(delete_after_str) || Shovel::DEFAULT_DELETE_AFTER ack_mode_str = config["ack-mode"]?.try(&.as_s.delete("-")).to_s ack_mode = Shovel::AckMode.parse?(ack_mode_str) || Shovel::DEFAULT_ACK_MODE - reconnect_delay = config["reconnect-delay"]?.try &.as_i || Shovel::DEFAULT_RECONNECT_DELAY + reconnect_delay = config["reconnect-delay"]?.try &.as_i.seconds || Shovel::DEFAULT_RECONNECT_DELAY prefetch = config["src-prefetch-count"]?.try(&.as_i.to_u16) || Shovel::DEFAULT_PREFETCH src = Shovel::AMQPSource.new(name, parse_uris(config["src-uri"]), config["src-queue"]?.try &.as_s?, diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index f89d5d1831..9db3c96dfc 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -63,7 +63,7 @@ module LavinMQ private def check_consumer_timeouts_loop loop do - sleep Config.instance.consumer_timeout_loop_interval + sleep Config.instance.consumer_timeout_loop_interval.seconds return if @closed @connections.each do |c| c.channels.each_value do |ch| @@ -457,7 +457,7 @@ module LavinMQ # wait up to 10s for clients to gracefully close 100.times do break if @connections.empty? - sleep 0.1 + sleep 0.1.seconds end # then force close the remaining (close tcp socket) @connections.each &.force_close @@ -508,7 +508,7 @@ module LavinMQ private def load! load_definitions! spawn(name: "Load parameters") do - sleep 0.01 + sleep 10.milliseconds next if @closed apply_parameters apply_policies diff --git a/src/lavinmqperf.cr b/src/lavinmqperf.cr index 77f3821791..0200417d99 100644 --- a/src/lavinmqperf.cr +++ b/src/lavinmqperf.cr @@ -183,7 +183,7 @@ class Throughput < Perf break if @stopped pubs_last = @pubs consumes_last = @consumes - sleep 1 + sleep 1.seconds unless @quiet puts "Publish rate: #{@pubs - pubs_last} msgs/s Consume rate: #{@consumes - consumes_last} msgs/s" end @@ -334,7 +334,7 @@ class Throughput < Perf break yield rescue ex : AMQP::Client::Error | IO::Error puts ex.message - sleep 1 + sleep 1.seconds end ensure done.done