Skip to content

Commit

Permalink
Add more logging when loading definitions (#621)
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun authored Mar 6, 2024
1 parent a9baa37 commit 13024a9
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 48 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Keep the same vhost selected when creating queues [#629](https://github.com/cloudamqp/lavinmq/pull/629)
- Make sure name of Queues, Exchanges & Vhosts is not longer than 255 characters [#631](https://github.com/cloudamqp/lavinmq/pull/631)

### Added

- Improved logging during definitions loading [#621](https://github.com/cloudamqp/lavinmq/pull/621)

## [1.2.9] - 2024-02-05

### Fixed
Expand Down
3 changes: 3 additions & 0 deletions src/lavinmq/error.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@ module LavinMQ

class ExchangeTypeError < Error
end

class InvalidDefinitions < Error
end
end
end
112 changes: 64 additions & 48 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -524,60 +524,76 @@ module LavinMQ
if io.size.zero?
load_default_definitions
compact!
else
@definitions_lock.synchronize do
SchemaVersion.verify(io, :definition)
loop do
AMQP::Frame.from_io(io, IO::ByteFormat::SystemEndian) do |f|
case f
when AMQP::Frame::Exchange::Declare
exchanges[f.exchange_name] = f
when AMQP::Frame::Exchange::Delete
exchanges.delete f.exchange_name
exchange_bindings.delete f.exchange_name
should_compact = true
when AMQP::Frame::Exchange::Bind
exchange_bindings[f.destination] << f
when AMQP::Frame::Exchange::Unbind
exchange_bindings[f.destination].reject! do |b|
b.source == f.source &&
b.routing_key == f.routing_key &&
b.arguments == f.arguments
end
should_compact = true
when AMQP::Frame::Queue::Declare
queues[f.queue_name] = f
when AMQP::Frame::Queue::Delete
queues.delete f.queue_name
queue_bindings.delete f.queue_name
should_compact = true
when AMQP::Frame::Queue::Bind
queue_bindings[f.queue_name] << f
when AMQP::Frame::Queue::Unbind
queue_bindings[f.queue_name].reject! do |b|
b.exchange_name == f.exchange_name &&
b.routing_key == f.routing_key &&
b.arguments == f.arguments
end
should_compact = true
else raise "Cannot apply frame #{f.class} in vhost #{@name}"
return
end

@log.info { "Loading definitions" }
@definitions_lock.synchronize do
@log.debug { "Verifying schema" }
SchemaVersion.verify(io, :definition)
loop do
AMQP::Frame.from_io(io, IO::ByteFormat::SystemEndian) do |f|
@log.trace { "Reading frame #{f.inspect}" }
case f
when AMQP::Frame::Exchange::Declare
exchanges[f.exchange_name] = f
when AMQP::Frame::Exchange::Delete
exchanges.delete f.exchange_name
exchange_bindings.delete f.exchange_name
should_compact = true
when AMQP::Frame::Exchange::Bind
exchange_bindings[f.destination] << f
when AMQP::Frame::Exchange::Unbind
exchange_bindings[f.destination].reject! do |b|
b.source == f.source &&
b.routing_key == f.routing_key &&
b.arguments == f.arguments
end
should_compact = true
when AMQP::Frame::Queue::Declare
queues[f.queue_name] = f
when AMQP::Frame::Queue::Delete
queues.delete f.queue_name
queue_bindings.delete f.queue_name
should_compact = true
when AMQP::Frame::Queue::Bind
queue_bindings[f.queue_name] << f
when AMQP::Frame::Queue::Unbind
queue_bindings[f.queue_name].reject! do |b|
b.exchange_name == f.exchange_name &&
b.routing_key == f.routing_key &&
b.arguments == f.arguments
end
end # Frame.from_io
rescue ex : IO::EOFError
break
end # loop
end # synchronize
end # if
should_compact = true
else
raise "Cannot apply frame #{f.class} in vhost #{@name}"
end
end # Frame.from_io
rescue ex : IO::EOFError
break
end # loop
end # synchronize

# apply definitions
exchanges.each_value { |f| apply f, loading: true }
queues.each_value { |f| apply f, loading: true }
exchange_bindings.each_value { |fs| fs.each { |f| apply f, loading: true } }
queue_bindings.each_value { |fs| fs.each { |f| apply f, loading: true } }

@log.info { "Applying #{exchanges.size} exchanges" }
exchanges.each_value &->self.load_apply(AMQP::Frame)
@log.info { "Applying #{queues.size} queues" }
queues.each_value &->self.load_apply(AMQP::Frame)
@log.info { "Applying #{exchange_bindings.each_value.sum(0, &.size)} exchange bindings" }
exchange_bindings.each_value &.each(&->self.load_apply(AMQP::Frame))
@log.info { "Applying #{queue_bindings.each_value.sum(0, &.size)} queue bindings" }
queue_bindings.each_value &.each(&->self.load_apply(AMQP::Frame))

@log.info { "Definitions loaded" }
compact! if should_compact
end

protected def load_apply(frame : AMQP::Frame)
apply frame, loading: true
rescue ex : LavinMQ::Error
raise Error::InvalidDefinitions.new("Invalid frame: #{frame.inspect}")
end

private def load_default_definitions
@log.info { "Loading default definitions" }
@exchanges[""] = DefaultExchange.new(self, "", true, false, false)
Expand Down

0 comments on commit 13024a9

Please sign in to comment.