use a waitgroup to wait for reserve holders of MFiles before unmapping #876

Open
carlhoerberg wants to merge 7 commits into main
Conversation

carlhoerberg (Member)

WHAT is this pull request doing?

Please fill me in.

HOW can this pull request be tested?

Specs? Manual steps? Please fill me in.

if truncate_to_size && !@readonly && !@deleted && @fd > 0
  code = LibC.ftruncate(@fd, @size)
  raise File::Error.from_errno("Error truncating file", file: @path) if code < 0
end

# unmap if no one has reserved the file, race condition prone?
if @wg.@counter.get(:acquire).zero?
Member

Since we're in #close I guess we can be sure here that the counter won't be increased, only decreased? So there's no risk that the counter is zero in this check but non-zero when unmap is called?

What about always spawning instead of this if-statement? It would be nice to not have to access an instance variable. Or maybe monkey patch WaitGroup with a done? method.

Member Author

"risk" that it is zero? the optimization here was for that fact that it will always be zero for single node clusters and for transient queues etc, and then we don't need to spawn a fiber.

could monkey patch though.
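
For reference, a sketch of that monkey patch is below. It relies on WaitGroup keeping its count in a private @counter ivar of type Atomic(Int32) (true in current Crystal stdlib, but an internal detail that could change between releases), so treat it as an assumption rather than a supported API.

require "wait_group"

class WaitGroup
  # True when no add is outstanding, i.e. nothing holds a reservation.
  # NOTE: reaches into the private @counter ivar of the stdlib WaitGroup.
  def done? : Bool
    @counter.get(:acquire).zero?
  end
end

wg = WaitGroup.new
wg.done? # => true
wg.add
wg.done? # => false
wg.done
wg.done? # => true

The check in MFile#close could then read unmap if @wg.done? (assuming the WaitGroup ivar is named @wg), keeping the no-spawn fast path without poking at WaitGroup internals at the call site.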

Member

I mean the risk that it's zero when we do the check but has changed by the time we call unmap, so we're unmapping even though it's reserved. However, I think we can assume that the counter will never be increased at this point: since we're in #close, the MFile shouldn't be used anymore?

@@ -14,6 +14,7 @@ module LavinMQ

  abstract def lag_size : Int64
  abstract def send(socket : IO, log = Log) : Int64
  abstract def abort
Member

What about naming it done and calling it from the action loop when an Action has been sent, instead of assuming that send will be called only once per Action and calling unreserve from inside send?

Member Author

makes sense, thanks!

Member Author

Put done in an ensure clause in the Action#send method instead.
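
For illustration, the ensure variant could look roughly like the sketch below. SegmentAction, FakeMFile, reserve and unreserve are made-up names for this example; the real Action subclasses and MFile API in the PR differ.

class FakeMFile
  getter reservations = 0

  def reserve
    @reservations += 1
  end

  def unreserve
    @reservations -= 1
  end
end

struct SegmentAction
  def initialize(@mfile : FakeMFile)
    @mfile.reserve # reservation taken when the action is created
  end

  def send(socket : IO, log = nil) : Int64
    socket << "segment data\n"
    13_i64
  ensure
    done # runs exactly once per send, even if writing raises
  end

  def done
    @mfile.unreserve
  end
end

mfile = FakeMFile.new
SegmentAction.new(mfile).send(STDOUT)
puts mfile.reservations # => 0

The alternative discussed in this thread is to call done from the action loop instead, which drops the assumption that send is called exactly once but moves the begin/ensure into the loop itself.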

Member Author

yes, the assumption is that Action#send is only called once

Member

Instead of assuming that, which would be an internal implementation detail the caller may need to know about, I think it would make sense to do

action.send(lz4, Log)
action.done

in the action loop.

Member Author

It gets ugly because you have to have a begin/rescue block, and then you have two loops, so that's two blocks on top of that.

Member

True

@spuun (Member) commented Dec 13, 2024

Two things that we probably must remove:

  • unmap from Queue#rm_consumer
  • unmap from USR2 handler

probably won't do much against race conditions, but at least in theory it could be better
@carlhoerberg (Member Author)

Two things that we probably must remove
* unmap from Queue#rm_consumer
* unmap from USR2 handler

I don't know, both call MessageStore#unmap_segments, which doesn't unmap the last segment that is still being written to.

def unmap_segments(except : Enumerable(UInt32) = StaticArray(UInt32, 0).new(0u32))
  @segments.each do |seg_id, mfile|
    next if mfile == @wfile # never unmap the segment currently being written to
    next if except.includes? seg_id
    mfile.unmap
  end
end

@spuun (Member) commented Dec 16, 2024

I don't know, both call MessageStore#unmap_segments, which doesn't unmap the last segment that is still being written to.

Isn't it possible that a segment that isn't yet fully replicated is unmapped? If it's a large message I think the unmap may occur in the middle of the lz4 compression.

Will these unmaps be necessary if we handle unmapping properly, which this PR hopefully solves?

@carlhoerberg (Member Author)

I don't know, both call MessageStore#unmap_segments, which doesn't unmap the last segment that is still being written to.

Isn't it possible that a segment that isn't yet fully replicated is unmapped? If it's a large message I think the unmap may occur in the middle of the lz4 compression.

Will these unmaps be necessary if we handle unmapping properly, which this PR hopefully solves?

True, there's a chance of that if the unmap is triggered just as we're rolling over to a new segment.

And yes, they shouldn't be needed anymore.

@carlhoerberg (Member Author)

I also started to tinker with a ref counter as you did, but yes, it quickly gets messy: we return a BytesMessage from MessageStore, but it points into the mmapping, so we don't know when the client has sent it.

class MFile
  @counter = 0
  @counter_lock = Mutex.new(:unchecked)

  def borrow : self
    @counter_lock.synchronize do
      counter = @counter
      if counter.zero?
        mmap # first borrower maps the file
      end
      @counter = counter + 1
    end
    self
  end

  def unborrow : Nil
    @counter_lock.synchronize do
      counter = @counter -= 1
      if counter.zero?
        unmap # last borrower unmaps
      end
    end
  end
end
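
As a usage sketch (hypothetical: it leans on the real MFile's mmap/unmap and a to_slice accessor, and mfile/socket stand in for whatever holds them), every borrow has to be paired with an unborrow once nothing references the mapped bytes anymore:

mfile.borrow                  # first borrower maps the file
begin
  socket.write mfile.to_slice # the slice is only valid while borrowed
ensure
  mfile.unborrow              # last unborrow unmaps again
end

Which is exactly where it gets messy: a BytesMessage handed out by MessageStore keeps pointing into the mapping, so the unborrow can't happen until the client delivery is actually done.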

@carlhoerberg (Member Author)

I'm starting to long for the io_uring implementation; then we could skip mmappings altogether.

@spuun (Member) commented Dec 16, 2024

I'm starting to long for the io_uring implementation; then we could skip mmappings altogether.

Yes! But I think this PR is working pretty well? I've done some runs without crashes.

• Can cause seg faults if a follower is still trying to replicate it.
• So that the errors can be caught in specs.
• In lavinmq.cr is the exit done
• Add a random delay to the stream queue GC loops, so that not all loops are executing at the same time.
carlhoerberg marked this pull request as ready for review December 23, 2024 23:08
carlhoerberg requested a review from a team as a code owner December 23, 2024 23:08