From c88ba9336ad66bfbe186297bd155600cde05f160 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Thu, 5 Dec 2024 16:40:09 +0100 Subject: [PATCH 01/15] add filtering functionality for streams --- .../amqp/queue/stream_queue_message_store.cr | 20 +++++++++++++++++++ src/lavinmq/amqp/stream_consumer.cr | 7 +++++++ 2 files changed, 27 insertions(+) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 44a1adc8c7..174fe1c0a0 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -126,6 +126,18 @@ module LavinMQ::AMQP sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32) consumer.pos += sp.bytesize consumer.offset += 1 + if consumer_filter = consumer.filter # can be array of strings + matched = false + if (msg_filters = filter_values_from_headers(msg.properties.headers)) + # can msg_filters be array of strings? + consumer_filter.split(",").each do |filter| + matched = true if msg_filters == filter + end + #else + # matched = true if x-stream-match-unfiltered + end + return unless matched + end Envelope.new(sp, msg, redelivered: false) rescue ex raise Error.new(rfile, cause: ex) @@ -236,6 +248,14 @@ module LavinMQ::AMQP headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64) end + private def filter_values_from_headers(headers) : String? + if filters = headers.not_nil!("Message lacks headers")["x-stream-filter-value"]? + filters.to_s + else + nil + end + end + private def build_segment_indexes @segments.each do |seg_id, mfile| msg = BytesMessage.from_bytes(mfile.to_slice + 4u32) diff --git a/src/lavinmq/amqp/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr index 152ce98f2d..3ff28e6af2 100644 --- a/src/lavinmq/amqp/stream_consumer.cr +++ b/src/lavinmq/amqp/stream_consumer.cr @@ -9,6 +9,7 @@ module LavinMQ property segment : UInt32 property pos : UInt32 getter requeued = Deque(SegmentPosition).new + getter filter : String? def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) validate_preconditions(frame) @@ -37,6 +38,12 @@ module LavinMQ when Nil, Int, Time, "first", "next", "last" else raise LavinMQ::Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'") end + case frame.arguments["x-stream-filter-value"]? + when String + @filter = frame.arguments["x-stream-filter-value"].to_s + when Nil + else raise LavinMQ::Error::PreconditionFailed.new("x-stream-filter-value must be a string") + end end private def deliver_loop From 895fcbf2d4c199271ce4291bc30341b5f5180619 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Wed, 18 Dec 2024 16:05:38 +0100 Subject: [PATCH 02/15] handle x-stream-match-unfiltered --- src/lavinmq/amqp/stream_consumer.cr | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/lavinmq/amqp/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr index 3ff28e6af2..beb59d2cc1 100644 --- a/src/lavinmq/amqp/stream_consumer.cr +++ b/src/lavinmq/amqp/stream_consumer.cr @@ -10,6 +10,7 @@ module LavinMQ property pos : UInt32 getter requeued = Deque(SegmentPosition).new getter filter : String? + getter match_unfiltered : Bool = false def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) validate_preconditions(frame) @@ -44,6 +45,12 @@ module LavinMQ when Nil else raise LavinMQ::Error::PreconditionFailed.new("x-stream-filter-value must be a string") end + case frame.arguments["x-stream-match-unfiltered"]? + when Bool + @match_unfiltered = frame.arguments["x-stream-match-unfiltered"].as(Bool) + when Nil + else raise LavinMQ::Error::PreconditionFailed.new("x-stream-match-unfiltered must be a boolean") + end end private def deliver_loop From 81ac8eb6953a7cfcf48952c2921a1de742afa8e6 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Wed, 18 Dec 2024 16:05:44 +0100 Subject: [PATCH 03/15] refactor --- .../amqp/queue/stream_queue_message_store.cr | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 174fe1c0a0..3ede29651d 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -126,17 +126,8 @@ module LavinMQ::AMQP sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32) consumer.pos += sp.bytesize consumer.offset += 1 - if consumer_filter = consumer.filter # can be array of strings - matched = false - if (msg_filters = filter_values_from_headers(msg.properties.headers)) - # can msg_filters be array of strings? - consumer_filter.split(",").each do |filter| - matched = true if msg_filters == filter - end - #else - # matched = true if x-stream-match-unfiltered - end - return unless matched + if consumer_filter = consumer.filter # can be a string or an array of comma-separated strings + return unless matching?(msg.properties.headers, consumer_filter, consumer.match_unfiltered) end Envelope.new(sp, msg, redelivered: false) rescue ex @@ -144,6 +135,17 @@ module LavinMQ::AMQP end end + private def matching?(msg_headers, consumer_filter, match_unfiltered) : Bool + if (msg_filters = filter_values_from_headers(msg_headers)) + consumer_filter.split(",").each do |filter| + return true if msg_filters == filter + end + else + return true if match_unfiltered + end + false + end + private def shift_requeued(requeued) : Envelope? while sp = requeued.shift? if segment = @segments[sp.segment]? # segment might have expired since requeued From 92929c2e32ad3e51e3110ce2cb6727a0350027af Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Wed, 18 Dec 2024 16:05:51 +0100 Subject: [PATCH 04/15] add specs --- spec/stream_queue_spec.cr | 105 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index a3f9618567..105322b19e 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -203,4 +203,109 @@ describe LavinMQ::AMQP::StreamQueue do end end end + + describe "Filters" do + it "should only get message matching filter" do + with_amqp_server do |s| + with_channel(s) do |ch| + ch.prefetch 1 + q = ch.queue("stream_filter_1", args: stream_queue_args) + q.publish("msg without filter") + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"}) + q.publish("msg with filter", props: AMQP::Client::Properties.new(headers: hdrs)) + q.publish("msg without filter") + + msgs = Channel(AMQP::Client::DeliverMessage).new + q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-filter-value": "foo"})) do |msg| + msgs.send msg + msg.ack + end + msg = msgs.receive + msg.body_io.to_s.should eq "msg with filter" + end + end + end + + it "should ignore messages with non-matching filters" do + with_amqp_server do |s| + with_channel(s) do |ch| + ch.prefetch 1 + q = ch.queue("stream_filter_2", args: stream_queue_args) + q.publish("msg without filter") + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"}) + q.publish("msg with filter", props: AMQP::Client::Properties.new(headers: hdrs)) + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"}) + q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs)) + q.publish("msg without filter") + + msgs = Channel(AMQP::Client::DeliverMessage).new + q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-filter-value": "bar"})) do |msg| + msgs.send msg + msg.ack + end + msg = msgs.receive + msg.body_io.to_s.should eq "msg with filter: bar" + end + end + end + + it "should support multiple filters" do + with_amqp_server do |s| + with_channel(s) do |ch| + ch.prefetch 1 + q = ch.queue("stream_filter_3", args: stream_queue_args) + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"}) + q.publish("msg without filter") + q.publish("msg with filter: foo", props: AMQP::Client::Properties.new(headers: hdrs)) + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "xyz"}) + q.publish("msg with filter: xyz", props: AMQP::Client::Properties.new(headers: hdrs)) + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"}) + q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs)) + q.publish("msg without filter") + + msgs = Channel(AMQP::Client::DeliverMessage).new + filters = "foo,bar" + q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new( + {"x-stream-filter-value": filters} + )) do |msg| + msgs.send msg + msg.ack + end + msg = msgs.receive + msg.body_io.to_s.should eq "msg with filter: foo" + msg = msgs.receive + msg.body_io.to_s.should eq "msg with filter: bar" + end + end + end + + it "should get messages without filter when x-stream-match-unfiltered set" do + with_amqp_server do |s| + with_channel(s) do |ch| + ch.prefetch 1 + q = ch.queue("stream_filter_4", args: stream_queue_args) + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"}) + q.publish("msg with filter: foo", props: AMQP::Client::Properties.new(headers: hdrs)) + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"}) + q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs)) + q.publish("msg without filter") + + msgs = Channel(AMQP::Client::DeliverMessage).new + q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new( + { + "x-stream-filter-value": "foo", + "x-stream-match-unfiltered": true, + } + )) do |msg| + msgs.send msg + msg.ack + end + msg = msgs.receive + msg.body_io.to_s.should eq "msg with filter: foo" + msg = msgs.receive + msg.body_io.to_s.should eq "msg without filter" + end + end + end + end end From d6c262154965cf6823658496899342e54207d0b9 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Wed, 18 Dec 2024 16:20:13 +0100 Subject: [PATCH 05/15] lint --- src/lavinmq/amqp/queue/stream_queue_message_store.cr | 6 +++--- src/lavinmq/amqp/stream_consumer.cr | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 3ede29651d..4819f9ff78 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -108,7 +108,7 @@ module LavinMQ::AMQP seg end - def shift?(consumer : AMQP::StreamConsumer) : Envelope? + def shift?(consumer : AMQP::StreamConsumer) : Envelope? # ameba:disable Metrics/CyclomaticComplexity raise ClosedError.new if @closed if env = shift_requeued(consumer.requeued) @@ -127,7 +127,7 @@ module LavinMQ::AMQP consumer.pos += sp.bytesize consumer.offset += 1 if consumer_filter = consumer.filter # can be a string or an array of comma-separated strings - return unless matching?(msg.properties.headers, consumer_filter, consumer.match_unfiltered) + return unless matching?(msg.properties.headers, consumer_filter, consumer.match_unfiltered?) end Envelope.new(sp, msg, redelivered: false) rescue ex @@ -136,7 +136,7 @@ module LavinMQ::AMQP end private def matching?(msg_headers, consumer_filter, match_unfiltered) : Bool - if (msg_filters = filter_values_from_headers(msg_headers)) + if msg_filters = filter_values_from_headers(msg_headers) consumer_filter.split(",").each do |filter| return true if msg_filters == filter end diff --git a/src/lavinmq/amqp/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr index beb59d2cc1..0058b743c3 100644 --- a/src/lavinmq/amqp/stream_consumer.cr +++ b/src/lavinmq/amqp/stream_consumer.cr @@ -10,7 +10,7 @@ module LavinMQ property pos : UInt32 getter requeued = Deque(SegmentPosition).new getter filter : String? - getter match_unfiltered : Bool = false + getter? match_unfiltered : Bool = false def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) validate_preconditions(frame) From d66eb10d99d1bd78e8d03915b1e6cc105763ce1a Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Tue, 7 Jan 2025 15:08:58 +0100 Subject: [PATCH 06/15] rename msg_filters->msg_filter --- src/lavinmq/amqp/queue/stream_queue_message_store.cr | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 4819f9ff78..4f7c2800f4 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -136,12 +136,12 @@ module LavinMQ::AMQP end private def matching?(msg_headers, consumer_filter, match_unfiltered) : Bool - if msg_filters = filter_values_from_headers(msg_headers) + if msg_filter = filter_value_from_headers(msg_headers) consumer_filter.split(",").each do |filter| - return true if msg_filters == filter + return true if msg_filter == filter end else - return true if match_unfiltered + return match_unfiltered end false end @@ -250,9 +250,9 @@ module LavinMQ::AMQP headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64) end - private def filter_values_from_headers(headers) : String? - if filters = headers.not_nil!("Message lacks headers")["x-stream-filter-value"]? - filters.to_s + private def filter_value_from_headers(headers) : String? + if filter = headers.not_nil!("Message lacks headers")["x-stream-filter-value"]? + filter.to_s else nil end From 6c99d213164fe777a9613ad63b5a5d81e8ee4150 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Tue, 7 Jan 2025 15:17:15 +0100 Subject: [PATCH 07/15] add spec for offset and filters --- spec/stream_queue_spec.cr | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index 105322b19e..04a7376bd9 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -307,5 +307,28 @@ describe LavinMQ::AMQP::StreamQueue do end end end + + it "should respect offset values while filtering" do + with_amqp_server do |s| + with_channel(s) do |ch| + ch.prefetch 1 + q = ch.queue("stream_filter_5", args: stream_queue_args) + hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"}) + q.publish("msg with filter 1", props: AMQP::Client::Properties.new(headers: hdrs)) + q.publish("msg with filter 2", props: AMQP::Client::Properties.new(headers: hdrs)) + q.publish("msg without filter") + + msgs = Channel(AMQP::Client::DeliverMessage).new + args = AMQP::Client::Arguments.new({"x-stream-filter-value": "foo", "x-stream-offset": 2}) + q.subscribe(no_ack: false, args: args) do |msg| + msgs.send msg + msg.ack + end + msg = msgs.receive + msg.body_io.to_s.should eq "msg with filter 2" + end + end + end + end end From f2d7f8ed24664e76e81a2b554471c6058b7717e1 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Tue, 7 Jan 2025 15:18:20 +0100 Subject: [PATCH 08/15] fixup! add spec for offset and filters --- spec/stream_queue_spec.cr | 1 - 1 file changed, 1 deletion(-) diff --git a/spec/stream_queue_spec.cr b/spec/stream_queue_spec.cr index 04a7376bd9..2b6066e83d 100644 --- a/spec/stream_queue_spec.cr +++ b/spec/stream_queue_spec.cr @@ -329,6 +329,5 @@ describe LavinMQ::AMQP::StreamQueue do end end end - end end From 97153241a429031e048d89edf6e3adc941a791b2 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Thu, 9 Jan 2025 17:05:20 +0100 Subject: [PATCH 09/15] split string to array directly --- src/lavinmq/amqp/queue/stream_queue_message_store.cr | 2 +- src/lavinmq/amqp/stream_consumer.cr | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 4f7c2800f4..3d76e6e8b5 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -137,7 +137,7 @@ module LavinMQ::AMQP private def matching?(msg_headers, consumer_filter, match_unfiltered) : Bool if msg_filter = filter_value_from_headers(msg_headers) - consumer_filter.split(",").each do |filter| + consumer_filter.each do |filter| return true if msg_filter == filter end else diff --git a/src/lavinmq/amqp/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr index 0058b743c3..e17df1a706 100644 --- a/src/lavinmq/amqp/stream_consumer.cr +++ b/src/lavinmq/amqp/stream_consumer.cr @@ -9,7 +9,7 @@ module LavinMQ property segment : UInt32 property pos : UInt32 getter requeued = Deque(SegmentPosition).new - getter filter : String? + getter filter : Array(String)? = nil getter? match_unfiltered : Bool = false def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume) @@ -41,7 +41,7 @@ module LavinMQ end case frame.arguments["x-stream-filter-value"]? when String - @filter = frame.arguments["x-stream-filter-value"].to_s + @filter = frame.arguments["x-stream-filter-value"].to_s.split(",") when Nil else raise LavinMQ::Error::PreconditionFailed.new("x-stream-filter-value must be a string") end From 671fe1c57d6f9b2f33a4867186b84f822c541316 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Thu, 9 Jan 2025 17:05:54 +0100 Subject: [PATCH 10/15] refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jon Börjesson --- src/lavinmq/amqp/queue/stream_queue_message_store.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 3d76e6e8b5..ad640f1100 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -251,7 +251,7 @@ module LavinMQ::AMQP end private def filter_value_from_headers(headers) : String? - if filter = headers.not_nil!("Message lacks headers")["x-stream-filter-value"]? + if filter = headers.try &.["x-stream-filter-value"]? filter.to_s else nil From 27ee311307bb03ccb6fc2dafa81307fd80bf636d Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Fri, 10 Jan 2025 09:56:38 +0100 Subject: [PATCH 11/15] refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jon Börjesson --- src/lavinmq/amqp/queue/stream_queue_message_store.cr | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index ad640f1100..8f7d9d3ade 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -137,9 +137,7 @@ module LavinMQ::AMQP private def matching?(msg_headers, consumer_filter, match_unfiltered) : Bool if msg_filter = filter_value_from_headers(msg_headers) - consumer_filter.each do |filter| - return true if msg_filter == filter - end + return consumer_filter.includes? msg_filter else return match_unfiltered end From 7b0fff32ff3ebbe8c5669d43f9da75de53325603 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Fri, 10 Jan 2025 09:57:13 +0100 Subject: [PATCH 12/15] remove comment MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jon Börjesson --- src/lavinmq/amqp/queue/stream_queue_message_store.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 8f7d9d3ade..f78b625a5a 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -126,7 +126,7 @@ module LavinMQ::AMQP sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32) consumer.pos += sp.bytesize consumer.offset += 1 - if consumer_filter = consumer.filter # can be a string or an array of comma-separated strings + if consumer_filter = consumer.filter return unless matching?(msg.properties.headers, consumer_filter, consumer.match_unfiltered?) end Envelope.new(sp, msg, redelivered: false) From 08d41bc8a2d8a64fe82532161c0dfa027a1bd019 Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Fri, 10 Jan 2025 09:57:59 +0100 Subject: [PATCH 13/15] remove redundant return MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jon Börjesson --- src/lavinmq/amqp/queue/stream_queue_message_store.cr | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index f78b625a5a..51c3a1fce6 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -141,7 +141,6 @@ module LavinMQ::AMQP else return match_unfiltered end - false end private def shift_requeued(requeued) : Envelope? From 0e7e2cf0d82e62d1662bc15a5ccc22de9ac5224e Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Fri, 10 Jan 2025 09:59:58 +0100 Subject: [PATCH 14/15] add comments --- src/lavinmq/amqp/stream_consumer.cr | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lavinmq/amqp/stream_consumer.cr b/src/lavinmq/amqp/stream_consumer.cr index e17df1a706..1501e85d6c 100644 --- a/src/lavinmq/amqp/stream_consumer.cr +++ b/src/lavinmq/amqp/stream_consumer.cr @@ -43,12 +43,14 @@ module LavinMQ when String @filter = frame.arguments["x-stream-filter-value"].to_s.split(",") when Nil + # noop else raise LavinMQ::Error::PreconditionFailed.new("x-stream-filter-value must be a string") end case frame.arguments["x-stream-match-unfiltered"]? when Bool @match_unfiltered = frame.arguments["x-stream-match-unfiltered"].as(Bool) when Nil + # noop else raise LavinMQ::Error::PreconditionFailed.new("x-stream-match-unfiltered must be a boolean") end end From 150c2f786c3ac80e23129cdcab97b7aaa49ba95a Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Mon, 13 Jan 2025 10:10:42 +0100 Subject: [PATCH 15/15] format --- src/lavinmq/amqp/queue/stream_queue_message_store.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/amqp/queue/stream_queue_message_store.cr b/src/lavinmq/amqp/queue/stream_queue_message_store.cr index 51c3a1fce6..1361d19272 100644 --- a/src/lavinmq/amqp/queue/stream_queue_message_store.cr +++ b/src/lavinmq/amqp/queue/stream_queue_message_store.cr @@ -137,7 +137,7 @@ module LavinMQ::AMQP private def matching?(msg_headers, consumer_filter, match_unfiltered) : Bool if msg_filter = filter_value_from_headers(msg_headers) - return consumer_filter.includes? msg_filter + return consumer_filter.includes? msg_filter else return match_unfiltered end