diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr
index cb4ad7eb0..5c627f36a 100644
--- a/spec/stream_queue_spec.cr
+++ b/spec/stream_queue_spec.cr
@@ -1,6 +1,41 @@
 require "./spec_helper"
 require "./../src/lavinmq/queue"
 
+module StreamQueueSpecHelpers
+  # Publishes `nr_of_messages` messages ("m0", "m1", ...) to the stream queue `queue_name`
+  def self.publish(s, queue_name, nr_of_messages)
+    args = {"x-queue-type": "stream"}
+    with_channel(s) do |ch|
+      q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
+      nr_of_messages.times { |i| q.publish "m#{i}" }
+    end
+  end
+
+  # Subscribes with tag `c_tag` (prefetch 1), acks deliveries and returns the first delivered message
+  def self.consume_one(s, queue_name, c_tag, c_args = AMQP::Client::Arguments.new)
+    args = {"x-queue-type": "stream"}
+    with_channel(s) do |ch|
+      ch.prefetch 1
+      q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
+      msgs = Channel(AMQP::Client::DeliverMessage).new
+      q.subscribe(no_ack: false, tag: c_tag, args: c_args) do |msg|
+        msgs.send msg
+        msg.ack
+      end
+      msgs.receive
+    end
+  end
+
+  # Returns the "x-stream-offset" header as Int64; fails the spec when there are no headers
+  def self.offset_from_headers(headers)
+    if headers
+      headers["x-stream-offset"].as(Int64)
+    else
+      fail("No headers found")
+    end
+  end
+end
+
 describe LavinMQ::StreamQueue do
   stream_queue_args = LavinMQ::AMQP::Table.new({"x-queue-type": "stream"})
 
@@ -203,4 +238,273 @@ describe LavinMQ::StreamQueue do
       end
     end
   end
+
+  describe "Automatic consumer offset tracking" do
+    it "resumes from last offset on reconnect" do
+      queue_name = Random::Secure.hex
+      consumer_tag = Random::Secure.hex
+      offset = 3
+
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, offset + 1)
+        offset.times { StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) }
+        sleep 0.1
+
+        # consume again, should start from last offset automatically
+        msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
+        StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq offset + 1
+      end
+    end
+
+    it "reads offsets from file on init" do
+      queue_name = Random::Secure.hex
+      offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
+      tag_prefix = "ctag-"
+
+      with_amqp_server do |s|
+        vhost = s.vhosts["/"]
+        StreamQueueSpecHelpers.publish(s, queue_name, 1)
+
+        data_dir = File.join(vhost.data_dir, Digest::SHA1.hexdigest queue_name)
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        offsets.each_with_index do |offset, i|
+          msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
+        end
+        msg_store.close
+        sleep 0.1
+
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        offsets.each_with_index do |offset, i|
+          msg_store.last_offset_by_consumer_tag(tag_prefix + i.to_s).should eq offset
+        end
+        msg_store.close
+      end
+    end
+
+    it "appends consumer tag file" do
+      queue_name = Random::Secure.hex
+      offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
+      consumer_tag = "ctag-1"
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 1)
+
+        data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        offsets.each do |offset|
+          msg_store.store_consumer_offset(consumer_tag, offset)
+        end
+        bytesize = consumer_tag.bytesize + 1 + 8
+        msg_store.@consumer_offsets.size.should eq bytesize*5
+        msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
+        msg_store.close
+      end
+    end
+
+    it "compacts consumer tag file on restart" do
+      queue_name = Random::Secure.hex
+      offsets = [84_i64, 24_i64, 1_i64, 100_i64, 42_i64]
+      consumer_tag = "ctag-1"
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 1)
+
+        data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        offsets.each do |offset|
+          msg_store.store_consumer_offset(consumer_tag, offset)
+        end
+        msg_store.close
+
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets.last
+        bytesize = consumer_tag.bytesize + 1 + 8
+        msg_store.@consumer_offsets.size.should eq bytesize
+        msg_store.close
+      end
+    end
+
+    it "compacts consumer tag file when full" do
+      queue_name = Random::Secure.hex
+      consumer_tag = Random::Secure.hex(32)
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 1)
+        data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        bytesize = consumer_tag.bytesize + 1 + 8
+
+        offsets = (LavinMQ::Config.instance.segment_size / bytesize).to_i32 + 1
+        offsets.times do |i|
+          msg_store.store_consumer_offset(consumer_tag, i)
+        end
+        msg_store.last_offset_by_consumer_tag(consumer_tag).should eq offsets - 1
+        msg_store.@consumer_offsets.size.should eq bytesize*2
+        msg_store.close
+      end
+    end
+
+    it "does not track offset if x-stream-offset is set" do
+      queue_name = Random::Secure.hex
+      consumer_tag = Random::Secure.hex
+      c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})
+
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 2)
+        msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
+        StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
+        sleep 0.1
+
+        # should consume the same message again since tracking was not saved from last consume
+        msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
+        StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
+      end
+    end
+
+    it "should not use saved offset if x-stream-offset is set" do
+      queue_name = Random::Secure.hex
+      consumer_tag = Random::Secure.hex
+      c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0})
+
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 2)
+
+        # get message without x-stream-offset, tracks offset
+        msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
+        StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
+        sleep 0.1
+
+        # consume with x-stream-offset set, should consume the same message again
+        msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
+        StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
+      end
+    end
+
+    it "should use saved offset if x-stream-offset & x-stream-automatic-offset-tracking is set" do
+      queue_name = Random::Secure.hex
+      consumer_tag = Random::Secure.hex
+      c_args = AMQP::Client::Arguments.new({"x-stream-offset": 0, "x-stream-automatic-offset-tracking": true})
+
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 2)
+
+        # x-stream-automatic-offset-tracking is set, so the offset is tracked even though x-stream-offset is given
+        msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
+        StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
+        sleep 0.1
+
+        # the saved offset takes precedence over x-stream-offset, so the next message is delivered
+        msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
+        StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 2
+      end
+    end
+
+    it "cleanup_consumer_offsets removes outdated offset" do
+      queue_name = Random::Secure.hex
+      offsets = [84_i64, -10_i64]
+      tag_prefix = "ctag-"
+
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 1)
+
+        data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        offsets.each_with_index do |offset, i|
+          msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
+        end
+        sleep 0.1
+        msg_store.cleanup_consumer_offsets
+        msg_store.close
+        sleep 0.1
+
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
+        msg_store.last_offset_by_consumer_tag(tag_prefix + 0.to_s).should eq offsets[0]
+        msg_store.close
+      end
+    end
+
+    it "runs cleanup when removing segment" do
+      consumer_tag = "ctag-1"
+      queue_name = Random::Secure.hex
+      args = {"x-queue-type": "stream", "x-max-length": 1}
+      msg_body = Bytes.new(LavinMQ::Config.instance.segment_size)
+
+      with_amqp_server do |s|
+        data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
+
+        with_channel(s) do |ch|
+          q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
+          q.publish_confirm msg_body
+        end
+
+        with_channel(s) do |ch|
+          ch.prefetch 1
+          q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
+          msgs = Channel(AMQP::Client::DeliverMessage).new
+          q.subscribe(no_ack: false, tag: consumer_tag) do |msg|
+            msgs.send msg
+            msg.ack
+          end
+          msgs.receive
+        end
+
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        msg_store.last_offset_by_consumer_tag(consumer_tag).should eq 2
+
+        with_channel(s) do |ch|
+          q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
+          2.times { q.publish_confirm msg_body }
+        end
+
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        msg_store.last_offset_by_consumer_tag(consumer_tag).should eq nil
+      end
+    end
+
+    it "does not track offset if c-tag is auto-generated" do
+      queue_name = Random::Secure.hex
+
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 1)
+        args = {"x-queue-type": "stream"}
+        c_tag = ""
+        with_channel(s) do |ch|
+          ch.prefetch 1
+          q = ch.queue(queue_name, args: AMQP::Client::Arguments.new(args))
+          msgs = Channel(AMQP::Client::DeliverMessage).new
+          c_tag = q.subscribe(no_ack: false) do |msg|
+            msgs.send msg
+            msg.ack
+          end
+          msgs.receive
+        end
+
+        sleep 0.1
+        data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
+      end
+    end
+
+    it "expands consumer offset file when needed" do
+      queue_name = Random::Secure.hex
+      consumer_tag_prefix = Random::Secure.hex(32)
+      with_amqp_server do |s|
+        StreamQueueSpecHelpers.publish(s, queue_name, 1)
+        data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
+        msg_store = LavinMQ::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
+        one_offset_bytesize = "#{consumer_tag_prefix}#{1000}".bytesize + 1 + 8
+        offsets = (LavinMQ::Config.instance.segment_size / one_offset_bytesize).to_i32 + 1
+        bytesize = 0
+        offsets.times do |i|
+          consumer_tag = "#{consumer_tag_prefix}#{i + 1000}"
+          msg_store.store_consumer_offset(consumer_tag, i + 1000)
+          bytesize += consumer_tag.bytesize + 1 + 8
+        end
+        msg_store.@consumer_offsets.size.should eq bytesize
+        msg_store.@consumer_offsets.size.should be > LavinMQ::Config.instance.segment_size
+        offsets.times do |i|
+          msg_store.last_offset_by_consumer_tag("#{consumer_tag_prefix}#{i + 1000}").should eq i + 1000
+        end
+      end
+    end
+  end
 end
diff --git a/src/lavinmq/client/channel/stream_consumer.cr b/src/lavinmq/client/channel/stream_consumer.cr
index 33a7a04ad..2dfc4a5ed 100644
--- a/src/lavinmq/client/channel/stream_consumer.cr
+++ b/src/lavinmq/client/channel/stream_consumer.cr
@@ -9,11 +9,13 @@ module LavinMQ
       property segment : UInt32
       property pos : UInt32
       getter requeued = Deque(SegmentPosition).new
+      @track_offset = false # when true, every ack stores this consumer's offset under its tag
 
       def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume)
+        @tag = frame.consumer_tag
         validate_preconditions(frame)
         offset = frame.arguments["x-stream-offset"]?
-        @offset, @segment, @pos = stream_queue.find_offset(offset)
+        @offset, @segment, @pos = stream_queue.find_offset(offset, @tag, @track_offset)
         super
       end
 
@@ -34,7 +36,12 @@ module LavinMQ
           raise Error::PreconditionFailed.new("x-priority not supported on stream queues")
         end
         case frame.arguments["x-stream-offset"]?
-        when Nil, Int, Time, "first", "next", "last"
+        when Nil
+          # no explicit offset: track automatically unless the tag has the auto-generated "amq.ctag-" prefix
+          @track_offset = true unless @tag.starts_with?("amq.ctag-")
+        when Int, Time, "first", "next", "last"
+          # explicit offset: track only when the consumer opts in
+          @track_offset = true if frame.arguments["x-stream-automatic-offset-tracking"]?
         else raise Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'")
         end
       end
@@ -82,6 +89,13 @@ module LavinMQ
         @queue.as(StreamQueue)
       end
 
+      # Stores the consumer's current offset under its tag when offset tracking
+      # is enabled, then hands the ack over to the parent implementation
+      def ack(sp)
+        stream_queue.store_consumer_offset(@tag, @offset) if @track_offset
+        super
+      end
+
       def reject(sp, requeue : Bool)
         super
         if requeue
diff --git a/src/lavinmq/mfile.cr b/src/lavinmq/mfile.cr
index 394bd5c1e..b103d654d 100644
--- a/src/lavinmq/mfile.cr
+++ b/src/lavinmq/mfile.cr
@@ -229,9 +229,11 @@ class MFile < IO
     Bytes.new(buffer, @size, read_only: true)
   end
 
-  def to_slice(pos, size)
+  # Returns `size` bytes of the buffer starting at `pos`, read-only unless `read_only` is false.
+  # Raises IO::EOFError if the requested range ends beyond the current size
+  def to_slice(pos, size, read_only = true)
     raise IO::EOFError.new if pos + size > @size
-    Bytes.new(buffer + pos, size, read_only: true)
+    Bytes.new(buffer + pos, size, read_only: read_only)
   end
 
   def advise(advice : Advice, addr = buffer, offset = 0, length = @capacity) : Nil
@@ -261,4 +263,10 @@ class MFile < IO
     raise IO::Error.from_errno("pread") if cnt == -1
     cnt
   end
+
+  # Renames the file on disk to `new_path` and remembers the new path
+  def rename(new_path : String) : Nil
+    File.rename @path, new_path
+    @path = new_path
+  end
 end
diff --git a/src/lavinmq/queue/stream_queue.cr b/src/lavinmq/queue/stream_queue.cr
index 766087aa2..972441008 100644
--- a/src/lavinmq/queue/stream_queue.cr
+++ b/src/lavinmq/queue/stream_queue.cr
@@ -74,6 +74,11 @@ module LavinMQ
       end
     end
 
+    # Delegates to the stream message store to record `offset` for `consumer_tag`
+    def store_consumer_offset(consumer_tag : String, offset : Int64) : Nil
+      stream_queue_msg_store.store_consumer_offset(consumer_tag, offset)
+    end
+
     # yield the next message in the ready queue
     # returns true if a message was deliviered, false otherwise
     # if we encouncer an unrecoverable ReadError, close queue
diff --git a/src/lavinmq/queue/stream_queue_message_store.cr b/src/lavinmq/queue/stream_queue_message_store.cr
index 2754cc643..2f75f5c51 100644
--- a/src/lavinmq/queue/stream_queue_message_store.cr
+++ b/src/lavinmq/queue/stream_queue_message_store.cr
@@ -8,11 +8,16 @@ module LavinMQ
       property max_length_bytes : Int64?
       property max_age : Time::Span | Time::MonthSpan | Nil
       getter last_offset : Int64
-      @segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
+      @segment_last_ts = Hash(UInt32, Int64).new(0i64)     # used for max-age
+      @consumer_offset_positions = Hash(String, Int64).new # used for consumer offsets
+      @consumer_offsets : MFile
 
       def initialize(@queue_data_dir : String, @replicator : Clustering::Replicator?)
         super
         @last_offset = get_last_offset
+        @consumer_offsets = MFile.new(File.join(@queue_data_dir, "consumer_offsets"), Config.instance.segment_size)
+        @replicator.try &.register_file @consumer_offsets
+        @consumer_offset_positions = restore_consumer_offset_positions
         drop_overflow
       end
 
@@ -32,13 +37,24 @@ module LavinMQ
       # Used once when a consumer is started
       # Populates `segment` and `position` by iterating through segments
       # until `offset` is found
-      def find_offset(offset) : Tuple(Int64, UInt32, UInt32)
+      # When `track_offset` is true and an offset has been stored for `tag`,
+      # the consumer starts from that stored offset instead of `offset`
+      # ameba:disable Metrics/CyclomaticComplexity
+      def find_offset(offset, tag = nil, track_offset = false) : Tuple(Int64, UInt32, UInt32)
         raise ClosedError.new if @closed
+        if track_offset && tag
+          consumer_last_offset = last_offset_by_consumer_tag(tag)
+          return find_offset_in_segments(consumer_last_offset) if consumer_last_offset
+        end
+
         case offset
-        when "first", nil then offset_at(@segments.first_key, 4u32)
-        when "last"       then offset_at(@segments.last_key, 4u32)
-        when "next"       then last_offset_seg_pos
-        when Time         then find_offset_in_segments(offset)
+        when "first" then offset_at(@segments.first_key, 4u32)
+        when "last"  then offset_at(@segments.last_key, 4u32)
+        when "next"  then last_offset_seg_pos
+        when Time    then find_offset_in_segments(offset)
+        when nil
+          consumer_last_offset = tag.try { |t| last_offset_by_consumer_tag(t) } || 0
+          find_offset_in_segments(consumer_last_offset)
         when Int
           if offset > @last_offset
             last_offset_seg_pos
@@ -49,12 +65,15 @@ module LavinMQ
         end
       end
 
-      private def offset_at(seg, pos) : Tuple(Int64, UInt32, UInt32)
+      private def offset_at(seg, pos, retried = false) : Tuple(Int64, UInt32, UInt32)
         return {@last_offset, seg, pos} if @size.zero?
         mfile = @segments[seg]
         msg = BytesMessage.from_bytes(mfile.to_slice + pos)
         offset = offset_from_headers(msg.properties.headers)
         {offset, seg, pos}
+      rescue ex : IndexError # first segment can be empty if message size >= segment size
+        return offset_at(seg + 1, 4_u32, true) unless retried
+        raise ex
       end
 
       private def last_offset_seg_pos
@@ -88,6 +107,87 @@ module LavinMQ
         {msg_offset, segment, pos}
       end
 
+      # Returns the offset last stored for `consumer_tag`, or nil if none is known
+      def last_offset_by_consumer_tag(consumer_tag : String)
+        if pos = @consumer_offset_positions[consumer_tag]?
+          tx = @consumer_offsets.to_slice(pos, 8)
+          return IO::ByteFormat::LittleEndian.decode(Int64, tx)
+        end
+      end
+
+      # Scans the consumer offsets file and maps each consumer tag to the position
+      # of its most recently written offset. Trailing zero padding or an incomplete
+      # record at the end of the file is cut off by resizing to the last complete record
+      private def restore_consumer_offset_positions : Hash(String, Int64)
+        positions = Hash(String, Int64).new
+        return positions if @consumer_offsets.size.zero?
+
+        valid_size = 0_i64 # end of the last complete record
+        loop do
+          ctag = AMQ::Protocol::ShortString.from_io(@consumer_offsets)
+          break if ctag.empty?
+          offset_pos = @consumer_offsets.pos
+          @consumer_offsets.skip(8)
+          # only register the tag once its 8 offset bytes are known to be present
+          positions[ctag] = offset_pos
+          valid_size = @consumer_offsets.pos
+        rescue IO::EOFError
+          break
+        end
+        @consumer_offsets.pos = valid_size
+        @consumer_offsets.resize(valid_size)
+        positions
+      end
+
+      # Appends a `consumer_tag`/`new_offset` record to the consumer offsets file,
+      # compacting the file first if the record would not fit
+      def store_consumer_offset(consumer_tag : String, new_offset : Int64)
+        cleanup_consumer_offsets if consumer_offset_file_full?(consumer_tag)
+        @consumer_offsets.write_bytes AMQ::Protocol::ShortString.new(consumer_tag)
+        @consumer_offset_positions[consumer_tag] = @consumer_offsets.size
+        # little endian to match the decoding in last_offset_by_consumer_tag
+        @consumer_offsets.write_bytes new_offset, IO::ByteFormat::LittleEndian
+        @replicator.try &.append(@consumer_offsets.path, (@consumer_offsets.size - consumer_tag.bytesize - 1 - 8).to_i32)
+      end
+
+      # True when appending a record for `consumer_tag` would reach the file's capacity
+      def consumer_offset_file_full?(consumer_tag)
+        (@consumer_offsets.size + 1 + consumer_tag.bytesize + 8) >= @consumer_offsets.capacity
+      end
+
+      # Rewrites the consumer offsets file keeping only the latest offset per consumer tag,
+      # and drops tags whose offset is lower than the first offset still in the stream
+      def cleanup_consumer_offsets
+        return if @consumer_offsets.size.zero?
+
+        offsets_to_save = Hash(String, Int64).new
+        lowest_offset_in_stream, _seg, _pos = offset_at(@segments.first_key, 4u32)
+        capacity = 0
+        @consumer_offset_positions.each_key do |ctag|
+          if offset = last_offset_by_consumer_tag(ctag)
+            offsets_to_save[ctag] = offset if offset >= lowest_offset_in_stream
+            capacity += ctag.bytesize + 1 + 8
+          end
+        end
+        @consumer_offset_positions = Hash(String, Int64).new
+        replace_offsets_file(capacity * 1000) do
+          offsets_to_save.each do |ctag, offset|
+            store_consumer_offset(ctag, offset)
+          end
+        end
+      end
+
+      # Swaps in a new offsets file of `capacity` bytes: the block fills the new file,
+      # which is then renamed over the old path before the old file is closed
+      def replace_offsets_file(capacity : Int, &)
+        old_consumer_offsets = @consumer_offsets
+        @consumer_offsets = MFile.new("#{old_consumer_offsets.path}.tmp", capacity)
+        yield # fill the new file with correct data in this block
+        @consumer_offsets.rename(old_consumer_offsets.path)
+        old_consumer_offsets.close(truncate_to_size: false)
+        @replicator.try &.replace_file @consumer_offsets.path
+      end
+
       def shift?(consumer : Client::Channel::StreamConsumer) : Envelope?
         raise ClosedError.new if @closed
 
@@ -168,6 +268,7 @@ module LavinMQ
             Time.unix_ms(last_ts) < min_ts
           end
         end
+        cleanup_consumer_offsets
       end
 
       private def drop_segments_while(& : UInt32 -> Bool)