From 6db1d4ee233824a7f50a50815d9067cc2cd0806d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 11 Dec 2024 16:22:58 +0100 Subject: [PATCH 1/7] Use a WaitGroup to wait for Followers before unmapping MFiles --- spec/schema_version_spec.cr | 24 +++++++++++-------- src/lavinmq/clustering/actions.cr | 24 +++++++++++++++++++ src/lavinmq/clustering/follower.cr | 38 +++++++++++++----------------- src/lavinmq/mfile.cr | 31 ++++++++++++++++++------ 4 files changed, 79 insertions(+), 38 deletions(-) diff --git a/spec/schema_version_spec.cr b/spec/schema_version_spec.cr index d05462b052..3b09dc675b 100644 --- a/spec/schema_version_spec.cr +++ b/spec/schema_version_spec.cr @@ -6,9 +6,10 @@ describe LavinMQ::SchemaVersion do it "Empty file should raise IO::EOFError" do with_datadir do |data_dir| path = File.join(data_dir, "test_schema_version") - file = MFile.new(path, 12) - expect_raises(IO::EOFError) do - LavinMQ::SchemaVersion.verify(file, :message) + MFile.open(path, 12) do |file| + expect_raises(IO::EOFError) do + LavinMQ::SchemaVersion.verify(file, :message) + end end end end @@ -16,17 +17,19 @@ describe LavinMQ::SchemaVersion do it "Should verify schema version" do with_datadir do |data_dir| path = File.join(data_dir, "test_schema_version") - file = MFile.new(path, 12) - file.write_bytes LavinMQ::Schema::VERSION - LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message] + MFile.open(path, 12) do |file| + file.write_bytes LavinMQ::Schema::VERSION + LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message] + end end end it "Deletes empty file and creates a new when it is the first file" do with_datadir do |data_dir| path = File.join(data_dir, "msgs.0000000001") - file = MFile.new(path, LavinMQ::Config.instance.segment_size) - file.resize(LavinMQ::Config.instance.segment_size) + MFile.open(path, LavinMQ::Config.instance.segment_size) do |file| + file.resize(LavinMQ::Config.instance.segment_size) + end # init new message store msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil) msg_store.@segments.first_value.size.should eq 4 @@ -39,8 +42,9 @@ describe LavinMQ::SchemaVersion do v.declare_queue("q", true, false) data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@queue_data_dir path = File.join(data_dir, "msgs.0000000002") - file = MFile.new(path, LavinMQ::Config.instance.segment_size) - file.resize(LavinMQ::Config.instance.segment_size) + MFile.open(path, LavinMQ::Config.instance.segment_size) do |file| + file.resize(LavinMQ::Config.instance.segment_size) + end # init new message store msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil) msg_store.@segments.size.should eq 1 diff --git a/src/lavinmq/clustering/actions.cr b/src/lavinmq/clustering/actions.cr index 2062573343..a65a87baac 100644 --- a/src/lavinmq/clustering/actions.cr +++ b/src/lavinmq/clustering/actions.cr @@ -14,6 +14,7 @@ module LavinMQ abstract def lag_size : Int64 abstract def send(socket : IO, log = Log) : Int64 + abstract def done getter filename @@ -25,6 +26,7 @@ module LavinMQ struct AddAction < Action def initialize(@data_dir : String, @filename : String, @mfile : MFile? = nil) + @mfile.try &.reserve end def lag_size : Int64 @@ -53,11 +55,22 @@ module LavinMQ size end end + ensure + done + end + + def done + if mfile = @mfile + mfile.unreserve + end end end struct AppendAction < Action def initialize(@data_dir : String, @filename : String, @obj : Bytes | FileRange | UInt32 | Int32) + if range = @obj.as?(FileRange) + range.mfile.reserve + end end def lag_size : Int64 @@ -92,6 +105,14 @@ module LavinMQ end log.debug { "Append #{len} bytes to #{@filename}" } len + ensure + done + end + + def done + if fr = @obj.as?(FileRange) + fr.mfile.unreserve + end end end @@ -109,6 +130,9 @@ module LavinMQ socket.write_bytes 0i64 0i64 end + + def done + end end end end diff --git a/src/lavinmq/clustering/follower.cr b/src/lavinmq/clustering/follower.cr index 1b47a4568d..e06a831fdb 100644 --- a/src/lavinmq/clustering/follower.cr +++ b/src/lavinmq/clustering/follower.cr @@ -2,6 +2,7 @@ require "./actions" require "./file_index" require "../config" require "socket" +require "wait_group" module LavinMQ module Clustering @@ -11,7 +12,7 @@ module LavinMQ @acked_bytes = 0_i64 @sent_bytes = 0_i64 @actions = Channel(Action).new(Config.instance.clustering_max_unsynced_actions) - @closed = false + @running = WaitGroup.new getter id = -1 getter remote_address @@ -46,6 +47,7 @@ module LavinMQ end def action_loop(lz4 = @lz4) + @running.add while action = @actions.receive? action.send(lz4, Log) sent_bytes = action.lag_size.to_i64 @@ -57,12 +59,7 @@ module LavinMQ sync(sent_bytes) end ensure - begin - @lz4.close - @socket.close - rescue IO::Error - # ignore connection errors while closing - end + @running.done end private def sync(bytes, socket = @socket) : Nil @@ -74,9 +71,6 @@ module LavinMQ private def read_ack(socket = @socket) : Int64 len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) @acked_bytes += len - if @closed && lag_in_bytes.zero? - @closed_and_in_sync.close - end len end @@ -171,18 +165,20 @@ module LavinMQ lag_size end - @closed_and_in_sync = Channel(Nil).new - - def close(timeout : Time::Span = 30.seconds) - @closed = true + def close @actions.close - if lag_in_bytes > 0 - Log.info { "Waiting for follower to be in sync" } - select - when @closed_and_in_sync.receive? - when timeout(timeout) - Log.warn { "Timeout waiting for follower to be in sync" } - end + @running.wait # let action_loop finish + + # abort remaining actions (unmap pending files) + while action = @actions.receive? + action.done + end + + begin + @lz4.close + @socket.close + rescue IO::Error + # ignore connection errors while closing end end diff --git a/src/lavinmq/mfile.cr b/src/lavinmq/mfile.cr index bc2ae90af3..e1dfc55001 100644 --- a/src/lavinmq/mfile.cr +++ b/src/lavinmq/mfile.cr @@ -1,3 +1,5 @@ +require "wait_group" + lib LibC MS_ASYNC = 1 MREMAP_MAYMOVE = 1 @@ -31,6 +33,7 @@ class MFile < IO getter fd : Int32 @buffer = Pointer(UInt8).null @deleted = false + @wg = WaitGroup.new # Map a file, if no capacity is given the file must exists and # the file will be mapped as readonly @@ -100,13 +103,30 @@ class MFile < IO self end + def reserve + @wg.add + end + + def unreserve + @wg.done + end + # The file will be truncated to the current position unless readonly or deleted def close(truncate_to_size = true) - # unmap occurs on finalize if truncate_to_size && !@readonly && !@deleted && @fd > 0 code = LibC.ftruncate(@fd, @size) raise File::Error.from_errno("Error truncating file", file: @path) if code < 0 end + + # unmap if non has reserved the file, race condition prone? + if @wg.@counter.get(:acquire).zero? + unmap + else + spawn(name: "munmap #{@path}") do + @wg.wait + unmap + end + end ensure unless @fd == -1 code = LibC.close(@fd) @@ -130,8 +150,10 @@ class MFile < IO # unload the memory mapping, will be remapped on demand def unmap : Nil - munmap + b = @buffer + c = @capacity @buffer = Pointer(UInt8).null + munmap(b, c) end def unmapped? : Bool @@ -170,11 +192,6 @@ class MFile < IO raise RuntimeError.from_errno("msync") if code < 0 end - def finalize - LibC.close(@fd) if @fd > -1 - LibC.munmap(@buffer, @capacity) unless @buffer.null? - end - def write(slice : Bytes) : Nil size = @size new_size = size + slice.size From 8ba9bafa04bd50fa9146f3b16c08b4077160f512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Mon, 16 Dec 2024 23:37:49 +0100 Subject: [PATCH 2/7] Don't unmap on USR2 or last consumer disconnected Can cause seg faults if a follower is still trying to replicate it. --- src/lavinmq/amqp/queue/queue.cr | 3 --- src/lavinmq/launcher.cr | 11 ----------- 2 files changed, 14 deletions(-) diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr index 06e8bae7c7..eeee78c8b0 100644 --- a/src/lavinmq/amqp/queue/queue.cr +++ b/src/lavinmq/amqp/queue/queue.cr @@ -854,9 +854,6 @@ module LavinMQ::AMQP delete else notify_consumers_empty(true) - @msg_store_lock.synchronize do - @msg_store.unmap_segments - end end end end diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 51e4671524..d4e1094c76 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -167,17 +167,6 @@ module LavinMQ end private def run_gc - STDOUT.puts "Unmapping all segments" - STDOUT.flush - @amqp_server.vhosts.each_value do |vhost| - vhost.queues.each_value do |q| - if q = q.as(LavinMQ::AMQP::Queue) - msg_store = q.@msg_store - msg_store.@segments.each_value &.unmap - msg_store.@acks.each_value &.unmap - end - end - end STDOUT.puts "Garbage collecting" STDOUT.flush GC.collect From e13e7af6a46ef75646184e4f3128ad562d562412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 17 Dec 2024 01:30:36 +0100 Subject: [PATCH 3/7] Spread out stream queue's GC-loop over time Add a random delay to the stream queue GC loops, so that not all loops are executing at the same time. --- src/lavinmq/amqp/queue/stream_queue.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lavinmq/amqp/queue/stream_queue.cr b/src/lavinmq/amqp/queue/stream_queue.cr index 165f809822..5b803debe9 100644 --- a/src/lavinmq/amqp/queue/stream_queue.cr +++ b/src/lavinmq/amqp/queue/stream_queue.cr @@ -152,6 +152,7 @@ module LavinMQ::AMQP end private def unmap_and_remove_segments_loop + sleep rand(60).seconds until closed? sleep 60.seconds unmap_and_remove_segments From 763f72ec7a92f3bd849c7c74173f79b74cbf7335 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 7 Jan 2025 23:36:17 +0100 Subject: [PATCH 4/7] always open file when hashing it for replication comparision To mmap a file and then calcululating the hash of it has a negligible performance benefit. --- src/lavinmq/clustering/server.cr | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/lavinmq/clustering/server.cr b/src/lavinmq/clustering/server.cr index 1719b3698e..7834b28cfe 100644 --- a/src/lavinmq/clustering/server.cr +++ b/src/lavinmq/clustering/server.cr @@ -74,14 +74,8 @@ module LavinMQ def files_with_hash(& : Tuple(String, Bytes) -> Nil) sha1 = Digest::SHA1.new hash = Bytes.new(sha1.digest_size) - @files.each do |path, mfile| - if mfile - was_unmapped = mfile.unmapped? - sha1.update mfile.to_slice - mfile.unmap if was_unmapped - else - sha1.file path - end + @files.each_key do |path| + sha1.file path sha1.final hash sha1.reset yield({path, hash}) From cfdda49635a52651a5e8e2b833a89232629cbdb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 7 Jan 2025 23:37:57 +0100 Subject: [PATCH 5/7] Remove left over method --- src/lavinmq/amqp/queue/message_store.cr | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/lavinmq/amqp/queue/message_store.cr b/src/lavinmq/amqp/queue/message_store.cr index 809b240713..801db8d211 100644 --- a/src/lavinmq/amqp/queue/message_store.cr +++ b/src/lavinmq/amqp/queue/message_store.cr @@ -382,7 +382,8 @@ module LavinMQ bytesize = BytesMessage.skip(mfile) count += 1 next if deleted?(seg, pos) - update_stats_per_msg(seg, ts, bytesize) + @bytesize += bytesize + @size += 1 rescue ex : IO::EOFError break rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode @@ -402,11 +403,6 @@ module LavinMQ @log.info { "Loaded #{counter} segments, #{@size} messages" } end - private def update_stats_per_msg(seg, ts, bytesize) - @bytesize += bytesize - @size += 1 - end - private def delete_unused_segments : Nil current_seg = @segments.last_key @segments.reject! do |seg, mfile| From 9a689c7f853be4e17f6f0356cea2386340078201 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 7 Jan 2025 23:38:51 +0100 Subject: [PATCH 6/7] Introduce an unsafe_unmap, make default unmap method safe --- src/lavinmq/mfile.cr | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/lavinmq/mfile.cr b/src/lavinmq/mfile.cr index e1dfc55001..73b4988780 100644 --- a/src/lavinmq/mfile.cr +++ b/src/lavinmq/mfile.cr @@ -118,15 +118,7 @@ class MFile < IO raise File::Error.from_errno("Error truncating file", file: @path) if code < 0 end - # unmap if non has reserved the file, race condition prone? - if @wg.@counter.get(:acquire).zero? - unmap - else - spawn(name: "munmap #{@path}") do - @wg.wait - unmap - end - end + unmap ensure unless @fd == -1 code = LibC.close(@fd) @@ -150,6 +142,18 @@ class MFile < IO # unload the memory mapping, will be remapped on demand def unmap : Nil + # unmap if non has reserved the file, race condition prone? + if @wg.@counter.get(:acquire).zero? + unsafe_unmap + else + spawn(name: "munmap #{@path}") do + @wg.wait + unsafe_unmap + end + end + end + + private def unsafe_unmap : Nil b = @buffer c = @capacity @buffer = Pointer(UInt8).null From 6d8b4680e3c6808a41284342064680c286f1fb1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 8 Jan 2025 00:02:13 +0100 Subject: [PATCH 7/7] MFile.open takes the same arguments as MFile#new --- src/lavinmq/mfile.cr | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/lavinmq/mfile.cr b/src/lavinmq/mfile.cr index 73b4988780..8101b7d5c3 100644 --- a/src/lavinmq/mfile.cr +++ b/src/lavinmq/mfile.cr @@ -50,9 +50,8 @@ class MFile < IO end end - # Opens an existing file in readonly mode - def self.open(path, & : self -> _) - mfile = self.new(path) + def self.open(path, capacity : Int? = nil, writeonly = false, & : self -> _) + mfile = self.new(path, capacity, writeonly) begin yield mfile ensure