Skip to content

Commit

Permalink
Streams filters (#893)
Browse files Browse the repository at this point in the history
Adds support for filtering in streams.

Filters can be set on messages by providing x-stream-filter-value when publishing messages to a stream.
When a consumer tries to consume with x-stream-filter set, only messages matching the filter value will be returned. x-stream-filter on consumers can be a single value as a string, or a set of values separated by commas. Any message matching any of the filters on the consumer will be returned.


Co-authored-by: Carl Hörberg <[email protected]>
  • Loading branch information
viktorerlingsson and carlhoerberg authored Jan 23, 2025
1 parent 74d70a9 commit 09a75d3
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Support for message deduplication on exchanges and queues [854](https://github.com/cloudamqp/lavinmq/pull/854)
- Added filtering for streams [#893](https://github.com/cloudamqp/lavinmq/pull/893)

## [2.1.0] - 2025-01-16

Expand Down
127 changes: 127 additions & 0 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,131 @@ describe LavinMQ::AMQP::StreamQueue do
end
end
end

describe "Filters" do
it "should only get message matching filter" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_1", args: stream_queue_args)
q.publish("msg without filter")
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-filter": "foo"})) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter"
end
end
end

it "should ignore messages with non-matching filters" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_2", args: stream_queue_args)
q.publish("msg without filter")
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-filter": "bar"})) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: bar"
end
end
end

it "should support multiple filters" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_3", args: stream_queue_args)
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg without filter")
q.publish("msg with filter: foo", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "xyz"})
q.publish("msg with filter: xyz", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
filters = "foo,bar"
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new(
{"x-stream-filter": filters}
)) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: foo"
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: bar"
end
end
end

it "should get messages without filter when x-stream-match-unfiltered set" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_4", args: stream_queue_args)
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter: foo", props: AMQP::Client::Properties.new(headers: hdrs))
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "bar"})
q.publish("msg with filter: bar", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new(
{
"x-stream-filter": "foo",
"x-stream-match-unfiltered": true,
}
)) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter: foo"
msg = msgs.receive
msg.body_io.to_s.should eq "msg without filter"
end
end
end

it "should respect offset values while filtering" do
with_amqp_server do |s|
with_channel(s) do |ch|
ch.prefetch 1
q = ch.queue("stream_filter_5", args: stream_queue_args)
hdrs = AMQP::Client::Arguments.new({"x-stream-filter-value" => "foo"})
q.publish("msg with filter 1", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg with filter 2", props: AMQP::Client::Properties.new(headers: hdrs))
q.publish("msg without filter")

msgs = Channel(AMQP::Client::DeliverMessage).new
args = AMQP::Client::Arguments.new({"x-stream-filter": "foo", "x-stream-offset": 2})
q.subscribe(no_ack: false, args: args) do |msg|
msgs.send msg
msg.ack
end
msg = msgs.receive
msg.body_io.to_s.should eq "msg with filter 2"
end
end
end
end
end
1 change: 1 addition & 0 deletions src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ module LavinMQ::AMQP
sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32)
consumer.pos += sp.bytesize
consumer.offset += 1
return unless consumer.filter_match?(msg.properties.headers)
Envelope.new(sp, msg, redelivered: false)
rescue ex
raise Error.new(rfile, cause: ex)
Expand Down
29 changes: 29 additions & 0 deletions src/lavinmq/amqp/stream_consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ module LavinMQ
property segment : UInt32
property pos : UInt32
getter requeued = Deque(SegmentPosition).new
@filter = Array(String).new
@match_unfiltered = false

def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume)
validate_preconditions(frame)
Expand Down Expand Up @@ -37,6 +39,20 @@ module LavinMQ
when Nil, Int, Time, "first", "next", "last"
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-offset must be an integer, a timestamp, 'first', 'next' or 'last'")
end
case filter = frame.arguments["x-stream-filter"]?
when String
@filter = filter.split(',').sort!
when Nil
# noop
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-filter-value must be a string")
end
case match_unfiltered = frame.arguments["x-stream-match-unfiltered"]?
when Bool
@match_unfiltered = match_unfiltered
when Nil
# noop
else raise LavinMQ::Error::PreconditionFailed.new("x-stream-match-unfiltered must be a boolean")
end
end

private def deliver_loop
Expand Down Expand Up @@ -89,6 +105,19 @@ module LavinMQ
@has_requeued.try_send? nil if @requeued.size == 1
end
end

def filter_match?(msg_headers) : Bool
return true if @filter.empty?
if filter_value = filter_value_from_msg_headers(msg_headers)
@filter.bsearch { |f| f >= filter_value } == filter_value
else
@match_unfiltered
end
end

private def filter_value_from_msg_headers(msg_headers) : String?
msg_headers.try &.fetch("x-stream-filter-value", nil).try &.to_s
end
end
end
end

0 comments on commit 09a75d3

Please sign in to comment.