diff --git a/src/lavinmq/error.cr b/src/lavinmq/error.cr index 9b55cb7fb3..b26711f3f9 100644 --- a/src/lavinmq/error.cr +++ b/src/lavinmq/error.cr @@ -15,5 +15,8 @@ module LavinMQ class ExchangeTypeError < Error end + + class InvalidDefinitions < Error + end end end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index ae7227fd32..ea2cbd81fe 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -524,60 +524,76 @@ module LavinMQ if io.size.zero? load_default_definitions compact! - else - @definitions_lock.synchronize do - SchemaVersion.verify(io, :definition) - loop do - AMQP::Frame.from_io(io, IO::ByteFormat::SystemEndian) do |f| - case f - when AMQP::Frame::Exchange::Declare - exchanges[f.exchange_name] = f - when AMQP::Frame::Exchange::Delete - exchanges.delete f.exchange_name - exchange_bindings.delete f.exchange_name - should_compact = true - when AMQP::Frame::Exchange::Bind - exchange_bindings[f.destination] << f - when AMQP::Frame::Exchange::Unbind - exchange_bindings[f.destination].reject! do |b| - b.source == f.source && - b.routing_key == f.routing_key && - b.arguments == f.arguments - end - should_compact = true - when AMQP::Frame::Queue::Declare - queues[f.queue_name] = f - when AMQP::Frame::Queue::Delete - queues.delete f.queue_name - queue_bindings.delete f.queue_name - should_compact = true - when AMQP::Frame::Queue::Bind - queue_bindings[f.queue_name] << f - when AMQP::Frame::Queue::Unbind - queue_bindings[f.queue_name].reject! do |b| - b.exchange_name == f.exchange_name && - b.routing_key == f.routing_key && - b.arguments == f.arguments - end - should_compact = true - else raise "Cannot apply frame #{f.class} in vhost #{@name}" + return + end + + @log.info { "Loading definitions" } + @definitions_lock.synchronize do + @log.debug { "Verifying schema" } + SchemaVersion.verify(io, :definition) + loop do + AMQP::Frame.from_io(io, IO::ByteFormat::SystemEndian) do |f| + @log.trace { "Reading frame #{f.inspect}" } + case f + when AMQP::Frame::Exchange::Declare + exchanges[f.exchange_name] = f + when AMQP::Frame::Exchange::Delete + exchanges.delete f.exchange_name + exchange_bindings.delete f.exchange_name + should_compact = true + when AMQP::Frame::Exchange::Bind + exchange_bindings[f.destination] << f + when AMQP::Frame::Exchange::Unbind + exchange_bindings[f.destination].reject! do |b| + b.source == f.source && + b.routing_key == f.routing_key && + b.arguments == f.arguments + end + should_compact = true + when AMQP::Frame::Queue::Declare + queues[f.queue_name] = f + when AMQP::Frame::Queue::Delete + queues.delete f.queue_name + queue_bindings.delete f.queue_name + should_compact = true + when AMQP::Frame::Queue::Bind + queue_bindings[f.queue_name] << f + when AMQP::Frame::Queue::Unbind + queue_bindings[f.queue_name].reject! do |b| + b.exchange_name == f.exchange_name && + b.routing_key == f.routing_key && + b.arguments == f.arguments end - end # Frame.from_io - rescue ex : IO::EOFError - break - end # loop - end # synchronize - end # if + should_compact = true + else + raise "Cannot apply frame #{f.class} in vhost #{@name}" + end + end # Frame.from_io + rescue ex : IO::EOFError + break + end # loop + end # synchronize # apply definitions - exchanges.each_value { |f| apply f, loading: true } - queues.each_value { |f| apply f, loading: true } - exchange_bindings.each_value { |fs| fs.each { |f| apply f, loading: true } } - queue_bindings.each_value { |fs| fs.each { |f| apply f, loading: true } } - + @log.info { "Applying #{exchanges.size} exchanges" } + exchanges.each_value &->self.load_apply(AMQP::Frame) + @log.info { "Applying #{queues.size} queues" } + queues.each_value &->self.load_apply(AMQP::Frame) + @log.info { "Applying #{exchange_bindings.each_value.sum(0, &.size)} exchange bindings" } + exchange_bindings.each_value &.each(&->self.load_apply(AMQP::Frame)) + @log.info { "Applying #{queue_bindings.each_value.sum(0, &.size)} queue bindings" } + queue_bindings.each_value &.each(&->self.load_apply(AMQP::Frame)) + + @log.info { "Definitions loaded" } compact! if should_compact end + protected def load_apply(frame : AMQP::Frame) + apply frame, loading: true + rescue ex : LavinMQ::Error + raise Error::InvalidDefinitions.new("Invalid frame: #{frame.inspect}") + end + private def load_default_definitions @log.info { "Loading default definitions" } @exchanges[""] = DefaultExchange.new(self, "", true, false, false)