Skip to content

Commit

Permalink
rescue if lavinmq starts with faulty msg_store (#865)
Browse files Browse the repository at this point in the history
* rescue and close queue if LavinMQ starts with faulty msg_store

---------

Co-authored-by: Jon Börjesson <[email protected]>
  • Loading branch information
kickster97 and spuun authored Dec 9, 2024
1 parent 75e3151 commit e425466
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/lavinmq/amqp/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ module LavinMQ
@segment_msg_count = Hash(UInt32, UInt32).new(0u32)
@requeued = Deque(SegmentPosition).new
@closed = false
getter closed
getter bytesize = 0u64
getter size = 0u32
getter empty_change = Channel(Bool).new
Expand Down Expand Up @@ -352,6 +353,9 @@ module LavinMQ
@segments.delete seg
next
end
rescue ex
@log.error { "Could not initialize segment, closing message store: #{ex.message}" }
close
end
end
file.pos = 4
Expand Down Expand Up @@ -382,7 +386,8 @@ module LavinMQ
rescue ex : IO::EOFError
break
rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode
raise Error.new(mfile, cause: ex)
@log.error { "Could not initialize segment, closing message store: Failed to read segment #{seg} at pos #{mfile.pos}. #{ex}" }
close
end
mfile.pos = 4
mfile.unmap # will be mmap on demand
Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ module LavinMQ::AMQP
File.open(File.join(@data_dir, ".queue"), "w") { |f| f.sync = true; f.print @name }
@state = QueueState::Paused if File.exists?(File.join(@data_dir, ".paused"))
@msg_store = init_msg_store(@data_dir)
if @msg_store.closed
close
end
@empty_change = @msg_store.empty_change
handle_arguments
spawn queue_expire_loop, name: "Queue#queue_expire_loop #{@vhost.name}/#{@name}" if @expires
Expand Down Expand Up @@ -802,6 +805,7 @@ module LavinMQ::AMQP
expire_msg(sp, :rejected)
end
rescue ex : MessageStore::Error
@log.error(ex) { "Queue closed due to error" }
close
raise ex
end
Expand Down

0 comments on commit e425466

Please sign in to comment.