Skip to content

Commit

Permalink
move filter_match to stream_consumer
Browse files Browse the repository at this point in the history
Co-authored-by: Carl Hörberg <[email protected]>
  • Loading branch information
viktorerlingsson and carlhoerberg committed Jan 20, 2025
1 parent d937d6b commit 6c496a7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
22 changes: 2 additions & 20 deletions src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ module LavinMQ::AMQP
seg
end

def shift?(consumer : AMQP::StreamConsumer) : Envelope? # ameba:disable Metrics/CyclomaticComplexity
def shift?(consumer : AMQP::StreamConsumer) : Envelope?
raise ClosedError.new if @closed

if env = shift_requeued(consumer.requeued)
Expand All @@ -126,23 +126,13 @@ module LavinMQ::AMQP
sp = SegmentPosition.new(consumer.segment, consumer.pos, msg.bytesize.to_u32)
consumer.pos += sp.bytesize
consumer.offset += 1
if consumer_filter = consumer.filter
return unless matching?(msg.properties.headers, consumer_filter, consumer.match_unfiltered?)
end
return unless consumer.filter_match?(msg.properties.headers)
Envelope.new(sp, msg, redelivered: false)
rescue ex
raise Error.new(rfile, cause: ex)
end
end

private def matching?(msg_headers, consumer_filter, match_unfiltered) : Bool
if filter_value = filter_value_from_headers(msg_headers)
consumer_filter.bsearch { |f| f >= filter_value } == filter_value
else
match_unfiltered
end
end

private def shift_requeued(requeued) : Envelope?
while sp = requeued.shift?
if segment = @segments[sp.segment]? # segment might have expired since requeued
Expand Down Expand Up @@ -247,14 +237,6 @@ module LavinMQ::AMQP
headers.not_nil!("Message lacks headers")["x-stream-offset"].as(Int64)
end

private def filter_value_from_headers(headers) : String?
if filter = headers.try &.["x-stream-filter-value"]?
filter.to_s
else
nil
end
end

private def build_segment_indexes
@segments.each do |seg_id, mfile|
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
Expand Down
20 changes: 18 additions & 2 deletions src/lavinmq/amqp/stream_consumer.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ module LavinMQ
property segment : UInt32
property pos : UInt32
getter requeued = Deque(SegmentPosition).new
getter filter : Array(String)? = nil
getter? match_unfiltered : Bool = false
@filter : Array(String)? = nil
@match_unfiltered : Bool = false

def initialize(@channel : Client::Channel, @queue : StreamQueue, frame : AMQP::Frame::Basic::Consume)
validate_preconditions(frame)
Expand Down Expand Up @@ -105,6 +105,22 @@ module LavinMQ
@has_requeued.try_send? nil if @requeued.size == 1
end
end

def filter_match?(msg_headers) : Bool
if filter = @filter
if filter_value = filter_value_from_msg_headers(msg_headers)
filter.bsearch { |f| f >= filter_value } == filter_value
else
@match_unfiltered
end
else
true
end
end

private def filter_value_from_msg_headers(msg_headers) : String?
msg_headers.try &.fetch("x-stream-filter-value", nil).try &.to_s
end
end
end
end

0 comments on commit 6c496a7

Please sign in to comment.