Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams filters #893

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,109 @@ describe LavinMQ::AMQP::StreamQueue do
end
end
end

describe "Filters" do
it "should only get message matching filter" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_1", args: stream_queue_args)
q.publish("msg without filter")
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-filter-value": "foo"})) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter"
end
end
end

it "should ignore messages with non-matching filters" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_2", args: stream_queue_args)
q.publish("msg without filter")
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-filter-value": "bar"})) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: bar"
end
end
end

it "should support multiple filters" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_3", args: stream_queue_args)
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg without filter")
q.publish("msg with filter: foo", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "xyz"})
q.publish("msg with filter: xyz", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
filters = "foo,bar"
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new(
{"x-stream-filter-value": filters}
)) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: foo"
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: bar"
end
end
end

it "should get messages without filter when x-stream-match-unfiltered set" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_4", args: stream_queue_args)
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter: foo", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new(
{
"x-stream-filter-value": "foo",
"x-stream-match-unfiltered": true,
}
)) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: foo"
msg = msgs.receive
msg.body_io.to_s.should eq "msg without filter"
end
end
end
end
end
24 changes: 23 additions & 1 deletion src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ module LavinMQ::AMQP
seg
end

def shift?(consumer : AMQP::StreamConsumer) : Envelope?
def shift?(consumer : AMQP::StreamConsumer) : Envelope? # ameba:disable Metrics/CyclomaticComplexity
raise ClosedError.new if @closed

if env = shift_requeued(consumer.requeued)
Expand All @@ -126,12 +126,26 @@ module LavinMQ::AMQP
sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32)
consumer.pos += sp.bytesize
consumer.offset += 1
if consumer_filter = consumer.filter # can be a string or an array of comma-separated strings
return unless matching?(msg.properties.headers, consumer_filter, consumer.match_unfiltered?)
end
Envelope.new(sp, msg, redelivered: false)
rescue ex
raise Error.new(rfile, cause: ex)
end
end

private def matching?(msg_headers, consumer_filter, match_unfiltered) : Bool
if msg_filters = filter_values_from_headers(msg_headers)
consumer_filter.split(",").each do |filter|
return true if msg_filters == filter
end
else
return true if match_unfiltered
end
false
end

private def shift_requeued(requeued) : Envelope?
while sp = requeued.shift?
if segment = @segments[sp.segment]? # segment might have expired since requeued
Expand Down Expand Up @@ -236,6 +250,14 @@ module LavinMQ::AMQP
headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64)
end

private def filter_values_from_headers(headers) : String?
if filters = headers.not_nil!("Message lacks headers")["x-stream-filter-value"]?
filters.to_s
else
nil
end
end

private def build_segment_indexes
@segments.each do |seg_id, mfile|
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
Expand Down
14 changes: 14 additions & 0 deletions src/lavinmq/amqp/stream_consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ module LavinMQ
property segment : UInt32
property pos : UInt32
getter requeued = Deque(SegmentPosition).new
getter filter : String?
getter? match_unfiltered : Bool = false

def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume)
validate_preconditions(frame)
Expand Down Expand Up @@ -37,6 +39,18 @@ module LavinMQ
when Nil, Int, Time, "first", "next", "last"
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'")
end
case frame.arguments["x-stream-filter-value"]?
when String
@filter = frame.arguments["x-stream-filter-value"].to_s
when Nil
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-filter-value must be a string")
end
case frame.arguments["x-stream-match-unfiltered"]?
when Bool
@match_unfiltered = frame.arguments["x-stream-match-unfiltered"].as(Bool)
when Nil
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-match-unfiltered must be a boolean")
end
end

private def deliver_loop
Expand Down
Loading