Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use a waitgroup to wait for reserve holders of MFiles before unmapping #876

Merged
merged 7 commits into from
Jan 9, 2025
24 changes: 14 additions & 10 deletions spec/schema_version_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,30 @@ describe LavinMQ::SchemaVersion do
it "Empty file should raise IO::EOFError" do
with_datadir do |data_dir|
path = File.join(data_dir, "test_schema_version")
file = MFile.new(path, 12)
expect_raises(IO::EOFError) do
LavinMQ::SchemaVersion.verify(file, :message)
MFile.open(path, 12) do |file|
expect_raises(IO::EOFError) do
LavinMQ::SchemaVersion.verify(file, :message)
end
end
end
end

it "Should verify schema version" do
with_datadir do |data_dir|
path = File.join(data_dir, "test_schema_version")
file = MFile.new(path, 12)
file.write_bytes LavinMQ::Schema::VERSION
LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message]
MFile.open(path, 12) do |file|
file.write_bytes LavinMQ::Schema::VERSION
LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message]
end
end
end

it "Deletes empty file and creates a new when it is the first file" do
with_datadir do |data_dir|
path = File.join(data_dir, "msgs.0000000001")
file = MFile.new(path, LavinMQ::Config.instance.segment_size)
file.resize(LavinMQ::Config.instance.segment_size)
MFile.open(path, LavinMQ::Config.instance.segment_size) do |file|
file.resize(LavinMQ::Config.instance.segment_size)
end
# init new message store
msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil)
msg_store.@segments.first_value.size.should eq 4
Expand All @@ -39,8 +42,9 @@ describe LavinMQ::SchemaVersion do
v.declare_queue("q", true, false)
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@queue_data_dir
path = File.join(data_dir, "msgs.0000000002")
file = MFile.new(path, LavinMQ::Config.instance.segment_size)
file.resize(LavinMQ::Config.instance.segment_size)
MFile.open(path, LavinMQ::Config.instance.segment_size) do |file|
file.resize(LavinMQ::Config.instance.segment_size)
end
# init new message store
msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil)
msg_store.@segments.size.should eq 1
Expand Down
8 changes: 2 additions & 6 deletions src/lavinmq/amqp/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ module LavinMQ
bytesize = BytesMessage.skip(mfile)
count += 1
next if deleted?(seg, pos)
update_stats_per_msg(seg, ts, bytesize)
@bytesize += bytesize
@size += 1
rescue ex : IO::EOFError
break
rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode
Expand All @@ -402,11 +403,6 @@ module LavinMQ
@log.info { "Loaded #{counter} segments, #{@size} messages" }
end

# Accounts for one loaded message: adds `bytesize` to @bytesize and
# increments @size by one.
# NOTE(review): `seg` and `ts` are accepted but never read in this body.
private def update_stats_per_msg(seg, ts, bytesize)
@bytesize += bytesize
@size += 1
end

private def delete_unused_segments : Nil
current_seg = @segments.last_key
@segments.reject! do |seg, mfile|
Expand Down
3 changes: 0 additions & 3 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,6 @@ module LavinMQ::AMQP
delete
else
notify_consumers_empty(true)
@msg_store_lock.synchronize do
@msg_store.unmap_segments
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions src/lavinmq/amqp/queue/stream_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ module LavinMQ::AMQP
end

private def unmap_and_remove_segments_loop
sleep rand(60).seconds
until closed?
sleep 60.seconds
unmap_and_remove_segments
Expand Down
24 changes: 24 additions & 0 deletions src/lavinmq/clustering/actions.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module LavinMQ

abstract def lag_size : Int64
abstract def send(socket : IO, log = Log) : Int64
abstract def done

getter filename

Expand All @@ -25,6 +26,7 @@ module LavinMQ

struct AddAction < Action
# Builds an add action for `filename` under `data_dir`.
# When an MFile is supplied it is reserved right away; the matching
# `unreserve` happens in `done`.
def initialize(@data_dir : String, @filename : String, @mfile : MFile? = nil)
# `try` skips the call when no MFile was given (@mfile is nil)
@mfile.try &.reserve
end

def lag_size : Int64
Expand Down Expand Up @@ -53,11 +55,22 @@ module LavinMQ
size
end
end
ensure
done
end

# Gives back the reservation held on the backing MFile.
# Does nothing when the action was created without an MFile.
def done
  @mfile.try &.unreserve
end
end

struct AppendAction < Action
# Builds an append action for `filename` under `data_dir`.
# If the payload is a FileRange, the MFile it points into is reserved;
# other payload kinds (Bytes, UInt32, Int32) need no reservation.
def initialize(@data_dir : String, @filename : String, @obj : Bytes | FileRange | UInt32 | Int32)
  payload = @obj
  payload.mfile.reserve if payload.is_a?(FileRange)
end

def lag_size : Int64
Expand Down Expand Up @@ -92,6 +105,14 @@ module LavinMQ
end
log.debug { "Append #{len} bytes to #{@filename}" }
len
ensure
done
end

# Gives back the reservation on the MFile behind a FileRange payload.
# Does nothing for any other payload kind.
def done
  payload = @obj
  payload.mfile.unreserve if payload.is_a?(FileRange)
end
end

Expand All @@ -109,6 +130,9 @@ module LavinMQ
socket.write_bytes 0i64
0i64
end

# Implements the abstract `done` with an intentionally empty body:
# this action type performs no work when it is finished or aborted.
def done
end
end
end
end
38 changes: 17 additions & 21 deletions src/lavinmq/clustering/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ require "./actions"
require "./file_index"
require "../config"
require "socket"
require "wait_group"

module LavinMQ
module Clustering
Expand All @@ -11,7 +12,7 @@ module LavinMQ
@acked_bytes = 0_i64
@sent_bytes = 0_i64
@actions = Channel(Action).new(Config.instance.clustering_max_unsynced_actions)
@closed = false
@running = WaitGroup.new
getter id = -1
getter remote_address

Expand Down Expand Up @@ -46,6 +47,7 @@ module LavinMQ
end

def action_loop(lz4 = @lz4)
@running.add
while action = @actions.receive?
action.send(lz4, Log)
sent_bytes = action.lag_size.to_i64
Expand All @@ -57,12 +59,7 @@ module LavinMQ
sync(sent_bytes)
end
ensure
begin
@lz4.close
@socket.close
rescue IO::Error
# ignore connection errors while closing
end
@running.done
end

private def sync(bytes, socket = @socket) : Nil
Expand All @@ -74,9 +71,6 @@ module LavinMQ
# Reads one little-endian Int64 acknowledgement from `socket`, adds it to
# @acked_bytes and returns it. If the follower is flagged as closed and
# no lag remains, @closed_and_in_sync is closed.
private def read_ack(socket = @socket) : Int64
  acked = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
  @acked_bytes += acked
  caught_up = @closed && lag_in_bytes.zero?
  @closed_and_in_sync.close if caught_up
  acked
end

Expand Down Expand Up @@ -171,18 +165,20 @@ module LavinMQ
lag_size
end

@closed_and_in_sync = Channel(Nil).new

def close(timeout : Time::Span = 30.seconds)
@closed = true
def close
@actions.close
if lag_in_bytes > 0
Log.info { "Waiting for follower to be in sync" }
select
when @closed_and_in_sync.receive?
when timeout(timeout)
Log.warn { "Timeout waiting for follower to be in sync" }
end
@running.wait # let action_loop finish

# abort remaining actions (unmap pending files)
while action = @actions.receive?
action.done
end

begin
@lz4.close
@socket.close
rescue IO::Error
# ignore connection errors while closing
end
end

Expand Down
10 changes: 2 additions & 8 deletions src/lavinmq/clustering/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,8 @@ module LavinMQ
def files_with_hash(& : Tuple(String, Bytes) -> Nil)
sha1 = Digest::SHA1.new
hash = Bytes.new(sha1.digest_size)
@files.each do |path, mfile|
if mfile
was_unmapped = mfile.unmapped?
sha1.update mfile.to_slice
mfile.unmap if was_unmapped
else
sha1.file path
end
@files.each_key do |path|
sha1.file path
sha1.final hash
sha1.reset
yield({path, hash})
Expand Down
11 changes: 0 additions & 11 deletions src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,6 @@ module LavinMQ
end

private def run_gc
STDOUT.puts "Unmapping all segments"
STDOUT.flush
@amqp_server.vhosts.each_value do |vhost|
vhost.queues.each_value do |q|
if q = q.as(LavinMQ::AMQP::Queue)
msg_store = q.@msg_store
msg_store.@segments.each_value &.unmap
msg_store.@acks.each_value &.unmap
end
end
end
STDOUT.puts "Garbage collecting"
STDOUT.flush
GC.collect
Expand Down
40 changes: 30 additions & 10 deletions src/lavinmq/mfile.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "wait_group"

lib LibC
MS_ASYNC = 1
MREMAP_MAYMOVE = 1
Expand Down Expand Up @@ -31,6 +33,7 @@ class MFile < IO
getter fd : Int32
@buffer = Pointer(UInt8).null
@deleted = false
@wg = WaitGroup.new

# Map a file, if no capacity is given the file must exists and
# the file will be mapped as readonly
Expand All @@ -47,9 +50,8 @@ class MFile < IO
end
end

# Opens an existing file in readonly mode
def self.open(path, & : self -> _)
mfile = self.new(path)
def self.open(path, capacity : Int? = nil, writeonly = false, & : self -> _)
mfile = self.new(path, capacity, writeonly)
begin
yield mfile
ensure
Expand Down Expand Up @@ -100,13 +102,22 @@ class MFile < IO
self
end

# Registers one holder of this file by adding to the wait group (@wg).
# `unmap` consults the same wait group before releasing the mapping.
def reserve
@wg.add
end

# Releases one reservation by signalling `done` on the wait group (@wg).
# NOTE(review): presumably every call must be paired with an earlier
# `reserve` — confirm against WaitGroup semantics.
def unreserve
@wg.done
end

# The file will be truncated to the current position unless readonly or deleted
def close(truncate_to_size = true)
# unmap occurs on finalize
if truncate_to_size && !@readonly && !@deleted && @fd > 0
code = LibC.ftruncate(@fd, @size)
raise File::Error.from_errno("Error truncating file", file: @path) if code < 0
end

unmap
ensure
unless @fd == -1
code = LibC.close(@fd)
Expand All @@ -130,8 +141,22 @@ class MFile < IO

# unload the memory mapping, will be remapped on demand
def unmap : Nil
munmap
# unmap if no one has reserved the file, race condition prone?
if @wg.@counter.get(:acquire).zero?
unsafe_unmap
else
spawn(name: "munmap #{@path}") do
@wg.wait
unsafe_unmap
end
end
end

# Releases the memory mapping without looking at the wait group;
# callers are expected to have checked or waited for reservations first.
private def unsafe_unmap : Nil
# snapshot pointer and length before the field is cleared
b = @buffer
c = @capacity
# reset the field first, then unmap using the saved values
# (presumably so the object reads as unmapped while munmap runs — TODO confirm)
@buffer = Pointer(UInt8).null
munmap(b, c)
end

def unmapped? : Bool
Expand Down Expand Up @@ -170,11 +195,6 @@ class MFile < IO
raise RuntimeError.from_errno("msync") if code < 0
end

# Finalizer: closes the file descriptor when it is still valid (>= 0)
# and unmaps the buffer when a mapping is still present.
def finalize
  LibC.close(@fd) unless @fd < 0
  LibC.munmap(@buffer, @capacity) if !@buffer.null?
end

def write(slice : Bytes) : Nil
size = @size
new_size = size + slice.size
Expand Down
Loading