Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ref counting MFiles #887

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions spec/schema_version_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,30 @@ describe LavinMQ::SchemaVersion do
it "Empty file should raise IO::EOFError" do
with_datadir do |data_dir|
path = File.join(data_dir, "test_schema_version")
file = MFile.new(path, 12)
expect_raises(IO::EOFError) do
LavinMQ::SchemaVersion.verify(file, :message)
MFile.open(path, 12) do |file|
expect_raises(IO::EOFError) do
LavinMQ::SchemaVersion.verify(file, :message)
end
end
end
end

it "Should verify schema version" do
with_datadir do |data_dir|
path = File.join(data_dir, "test_schema_version")
file = MFile.new(path, 12)
file.write_bytes LavinMQ::Schema::VERSION
LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message]
MFile.open(path, 12) do |file|
file.write_bytes LavinMQ::Schema::VERSION
LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message]
end
end
end

it "Deletes empty file and creates a new when it is the first file" do
with_datadir do |data_dir|
path = File.join(data_dir, "msgs.0000000001")
file = MFile.new(path, LavinMQ::Config.instance.segment_size)
file.resize(LavinMQ::Config.instance.segment_size)
MFile.open(path, LavinMQ::Config.instance.segment_size) do |file|
file.resize(LavinMQ::Config.instance.segment_size)
end
# init new message store
msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil)
[email protected]_value.size.should eq 4
Expand All @@ -39,8 +42,9 @@ describe LavinMQ::SchemaVersion do
v.declare_queue("q", true, false)
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@queue_data_dir
path = File.join(data_dir, "msgs.0000000002")
file = MFile.new(path, LavinMQ::Config.instance.segment_size)
file.resize(LavinMQ::Config.instance.segment_size)
MFile.open(path, LavinMQ::Config.instance.segment_size) do |file|
file.resize(LavinMQ::Config.instance.segment_size)
end
# init new message store
msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil)
[email protected] eq 1
Expand Down
80 changes: 45 additions & 35 deletions src/lavinmq/amqp/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ module LavinMQ
load_stats_from_segments
delete_unused_segments
@wfile_id = @segments.last_key
@wfile = @segments.last_value
@wfile = @segments.last_value.borrow
@rfile_id = @segments.first_key
@rfile = @segments.first_value
@rfile = @segments.first_value.borrow
end

def push(msg) : SegmentPosition
Expand Down Expand Up @@ -67,11 +67,14 @@ module LavinMQ
raise ClosedError.new if @closed
if sp = @requeued.first?
seg = @segments[sp.segment]
seg.borrow
begin
msg = BytesMessage.from_bytes(seg.to_slice + sp.position)
return Envelope.new(sp, msg, redelivered: true)
rescue ex
raise Error.new(seg, cause: ex)
ensure
seg.unborrow
end
end

Expand Down Expand Up @@ -105,6 +108,7 @@ module LavinMQ
raise ClosedError.new if @closed
if sp = @requeued.shift?
segment = @segments[sp.segment]
segment.borrow
begin
msg = BytesMessage.from_bytes(segment.to_slice + sp.position)
@bytesize -= sp.bytesize
Expand All @@ -113,6 +117,8 @@ module LavinMQ
return Envelope.new(sp, msg, redelivered: true)
rescue ex
raise Error.new(segment, cause: ex)
ensure
segment.unborrow
end
end

Expand Down Expand Up @@ -148,11 +154,13 @@ module LavinMQ

def [](sp : SegmentPosition) : BytesMessage
raise ClosedError.new if @closed
segment = @segments[sp.segment]
segment = @segments[sp.segment].borrow
begin
BytesMessage.from_bytes(segment.to_slice + sp.position)
rescue ex
raise Error.new(segment, cause: ex)
ensure
segment.unborrow
end
end

Expand All @@ -171,11 +179,11 @@ module LavinMQ
@log.debug { "Deleting segment #{sp.segment}" }
select_next_read_segment if sp.segment == @rfile_id
if a = @acks.delete(sp.segment)
a.delete(raise_on_missing: false).close
a.unborrow.delete(raise_on_missing: false).close
@replicator.try &.delete_file(a.path)
end
if seg = @segments.delete(sp.segment)
seg.delete(raise_on_missing: false).close
seg.unborrow.delete(raise_on_missing: false).close
@replicator.try &.delete_file(seg.path)
end
@segment_msg_count.delete(sp.segment)
Expand Down Expand Up @@ -228,19 +236,12 @@ module LavinMQ
(@bytesize / @size).to_u32
end

def unmap_segments(except : Enumerable(UInt32) = StaticArray(UInt32, 0).new(0u32))
@segments.each do |seg_id, mfile|
next if mfile == @wfile
next if except.includes? seg_id
mfile.unmap
end
end

private def select_next_read_segment : MFile?
# Expect @segments to be ordered
if id = @segments.each_key.find { |sid| sid > @rfile_id }
@rfile.unborrow
@rfile_id = id
@rfile = @segments[id]
@rfile = @segments[id].borrow
end
end

Expand All @@ -258,25 +259,26 @@ module LavinMQ
end

private def open_new_segment(next_msg_size = 0) : MFile
@wfile.unmap unless @wfile == @rfile
@wfile.unborrow
next_id = @wfile_id + 1
path = File.join(@queue_data_dir, "msgs.#{next_id.to_s.rjust(10, '0')}")
capacity = Math.max(Config.instance.segment_size, next_msg_size + 4)
delete_unused_segments
wfile = MFile.new(path, capacity)
wfile.borrow
wfile.write_bytes Schema::VERSION
wfile.pos = 4
@replicator.try &.register_file wfile
@replicator.try &.append path, Schema::VERSION
@wfile_id = next_id
@wfile = @segments[next_id] = wfile
wfile
end

private def open_ack_file(id) : MFile
path = File.join(@queue_data_dir, "acks.#{id.to_s.rjust(10, '0')}")
capacity = Config.instance.segment_size // BytesMessage::MIN_BYTESIZE * 4 + 4
mfile = MFile.new(path, capacity, writeonly: true)
mfile.borrow
@replicator.try &.register_file mfile
mfile
end
Expand Down Expand Up @@ -335,8 +337,10 @@ module LavinMQ
end
@replicator.try &.register_file file
if was_empty
file.write_bytes Schema::VERSION
@replicator.try &.append path, Schema::VERSION
file.borrow do
file.write_bytes Schema::VERSION
@replicator.try &.append path, Schema::VERSION
end
else
begin
SchemaVersion.verify(file, :message)
Expand All @@ -347,8 +351,10 @@ module LavinMQ
@replicator.try &.delete_file(path)
if idx == 0 # Recreate the file if it's the first segment because we need at least one segment to exist
file = MFile.new(path, Config.instance.segment_size)
file.write_bytes Schema::VERSION
@replicator.try &.append path, Schema::VERSION
file.borrow do
file.write_bytes Schema::VERSION
@replicator.try &.append path, Schema::VERSION
end
else
@segments.delete seg
next
Expand All @@ -375,22 +381,26 @@ module LavinMQ
end
@segments.each do |seg, mfile|
count = 0u32
loop do
pos = mfile.pos
ts = IO::ByteFormat::SystemEndian.decode(Int64, mfile.to_slice(pos, 8))
break mfile.resize(pos) if ts.zero? # This means that the rest of the file is zero, so resize it
bytesize = BytesMessage.skip(mfile)
count += 1
next if deleted?(seg, pos)
update_stats_per_msg(seg, ts, bytesize)
rescue ex : IO::EOFError
break
rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode
@log.error { "Could not initialize segment, closing message store: Failed to read segment #{seg} at pos #{mfile.pos}. #{ex}" }
close
mfile.borrow
begin
loop do
pos = mfile.pos
ts = IO::ByteFormat::SystemEndian.decode(Int64, mfile.to_slice(pos, 8))
break mfile.resize(pos) if ts.zero? # This means that the rest of the file is zero, so resize it
bytesize = BytesMessage.skip(mfile)
count += 1
next if deleted?(seg, pos)
update_stats_per_msg(seg, ts, bytesize)
rescue ex : IO::EOFError
break
rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode
@log.error { "Could not initialize segment, closing message store: Failed to read segment #{seg} at pos #{mfile.pos}. #{ex}" }
close
end
ensure
mfile.unborrow
end
mfile.pos = 4
mfile.unmap # will be mmap on demand
if is_long_queue
@log.info { "Loaded #{counter}/#{@segments.size} segments, #{@size} messages" } if (counter &+= 1) % 128 == 0
else
Expand All @@ -417,7 +427,7 @@ module LavinMQ
@segment_msg_count.delete seg
@deleted.delete seg
if ack = @acks.delete(seg)
ack.delete(raise_on_missing: false).close
ack.unborrow.delete(raise_on_missing: false).close
@replicator.try &.delete_file(ack.path)
end
mfile.delete(raise_on_missing: false).close
Expand Down
3 changes: 0 additions & 3 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,6 @@ module LavinMQ::AMQP
delete
else
notify_consumers_empty(true)
@msg_store_lock.synchronize do
@msg_store.unmap_segments
end
end
end
end
Expand Down
17 changes: 4 additions & 13 deletions src/lavinmq/amqp/queue/stream_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -152,23 +152,14 @@ module LavinMQ::AMQP
end

private def unmap_and_remove_segments_loop
sleep rand(60).seconds
until closed?
sleep 60.seconds
unmap_and_remove_segments
end
end

private def unmap_and_remove_segments
used_segments = Set(UInt32).new
@consumers_lock.synchronize do
@consumers.each do |consumer|
used_segments << consumer.as(AMQP::StreamConsumer).segment
break if closed?
@msg_store_lock.synchronize do
stream_queue_msg_store.drop_overflow
end
end
@msg_store_lock.synchronize do
stream_queue_msg_store.drop_overflow
stream_queue_msg_store.unmap_segments(except: used_segments)
end
end
end
end
37 changes: 23 additions & 14 deletions src/lavinmq/amqp/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ module LavinMQ::AMQP

private def offset_at(seg, pos) : Tuple(Int64, UInt32, UInt32)
return {@last_offset, seg, pos} if @size.zero?
mfile = @segments[seg]
msg = BytesMessage.from_bytes(mfile.to_slice + pos)
offset = offset_from_headers(msg.properties.headers)
{offset, seg, pos}
@segments[seg].borrow do |mfile|
msg = BytesMessage.from_bytes(mfile.to_slice + pos)
offset = offset_from_headers(msg.properties.headers)
{offset, seg, pos}
end
end

private def last_offset_seg_pos
Expand All @@ -78,13 +79,18 @@ module LavinMQ::AMQP
return last_offset_seg_pos
end
end
msg = BytesMessage.from_bytes(rfile.to_slice + pos)
msg_offset = offset_from_headers(msg.properties.headers)
case offset
in Int then break if offset <= msg_offset
in Time then break if offset <= Time.unix_ms(msg.timestamp)
rfile.borrow
begin
msg = BytesMessage.from_bytes(rfile.to_slice + pos)
msg_offset = offset_from_headers(msg.properties.headers)
case offset
in Int then break if offset <= msg_offset
in Time then break if offset <= Time.unix_ms(msg.timestamp)
end
pos += msg.bytesize
ensure
rfile.unborrow
end
pos += msg.bytesize
rescue ex
raise rfile ? Error.new(rfile, cause: ex) : ex
end
Expand Down Expand Up @@ -146,10 +152,11 @@ module LavinMQ::AMQP
end

private def next_segment(consumer) : MFile?
@segments[consumer.segment].unborrow
if seg_id = @segments.each_key.find { |sid| sid > consumer.segment }
consumer.segment = seg_id
consumer.pos = 4u32
@segments[seg_id]
@segments[seg_id].borrow
end
end

Expand Down Expand Up @@ -238,9 +245,11 @@ module LavinMQ::AMQP

private def build_segment_indexes
@segments.each do |seg_id, mfile|
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
@offset_index[seg_id] = offset_from_headers(msg.properties.headers)
@timestamp_index[seg_id] = msg.timestamp
mfile.borrow do
msg = BytesMessage.from_bytes(mfile.to_slice + 4u32)
@offset_index[seg_id] = offset_from_headers(msg.properties.headers)
@timestamp_index[seg_id] = msg.timestamp
end
rescue IndexError
@offset_index[seg_id] = @last_offset
@timestamp_index[seg_id] = RoughTime.unix_ms
Expand Down
Loading
Loading