Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Apr 17, 2024
1 parent a852893 commit e489d65
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 12 deletions.
7 changes: 3 additions & 4 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
require "./spec_helper"
require "./../src/lavinmq/queue"
#require "./../src/lavinmq/queue/stream_queue_message_store"

module StreamQueueSpecHelpers
def self.publish(queue_name, nr_of_messages)
Expand All @@ -11,7 +10,7 @@ module StreamQueueSpecHelpers
end
end

def self.consume_one(queue_name, c_tag, c_args = AMQP::Client::Arguments.new())
def self.consume_one(queue_name, c_tag, c_args = AMQP::Client::Arguments.new)
args = {"x-queue-type": "stream"}
with_channel do |ch|
ch.prefetch 1
Expand Down Expand Up @@ -210,14 +209,14 @@ describe LavinMQ::StreamQueue do
consumer_tag = Random::Secure.hex
offset = 3

StreamQueueSpecHelpers.publish(queue_name, offset+1)
StreamQueueSpecHelpers.publish(queue_name, offset + 1)

offset.times { StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag) }
sleep 0.1

# consume again, should start from last offset automatically
msg = StreamQueueSpecHelpers.consume_one(queue_name, consumer_tag)
msg.properties.headers.not_nil!["x-stream-offset"].as(Int64).should eq offset+1
msg.properties.headers.not_nil!["x-stream-offset"].as(Int64).should eq offset + 1
end

it "reads offsets from file on init" do
Expand Down
1 change: 0 additions & 1 deletion src/lavinmq/mfile.cr
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ class MFile < IO
slice.copy_to(buffer + pos, slice.size)
end


def read(slice : Bytes)
raise ClosedError.new if @closed
pos = @pos
Expand Down
14 changes: 7 additions & 7 deletions src/lavinmq/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module LavinMQ
property max_age : Time::Span | Time::MonthSpan | Nil
getter last_offset : Int64
@segment_last_ts = Hash(UInt32, Int64).new(0i64) # used for max-age
@consumer_offsets_hash : Hash(String, Int64) # used for consumer offsets
@consumer_offsets_hash : Hash(String, Int64) # used for consumer offsets
@consumer_offsets : MFile
@consumer_offset_path : String

Expand Down Expand Up @@ -43,9 +43,9 @@ module LavinMQ
raise ClosedError.new if @closed
case offset
when "first" then offset_at(@segments.first_key, 4u32)
when "last" then offset_at(@segments.last_key, 4u32)
when "next" then last_offset_seg_pos
when Time then find_offset_in_segments(offset)
when "last" then offset_at(@segments.last_key, 4u32)
when "next" then last_offset_seg_pos
when Time then find_offset_in_segments(offset)
when nil
consumer_last_offset = last_offset_by_consumer_tag(tag) || 0
find_offset_in_segments(consumer_last_offset)
Expand Down Expand Up @@ -118,12 +118,12 @@ module LavinMQ
ctag_start = 0
while more_to_read && slice.size > 0
if slice[i] == 32
ctag = String.new(slice[ctag_start..i-1])
pos = i+1
ctag = String.new(slice[ctag_start..i - 1])
pos = i + 1
hash[ctag] = pos
ctag_start = pos + 8
end
more_to_read = false if (i += 1) == slice.size-1
more_to_read = false if (i += 1) == slice.size - 1
end
hash
end
Expand Down

0 comments on commit e489d65

Please sign in to comment.