From 0e2291b9a0ae1f29e06b75f04dcc53a027078717 Mon Sep 17 00:00:00 2001 From: Christina Date: Wed, 22 Jan 2025 17:52:48 +0100 Subject: [PATCH 1/2] add initialize to amqp connection_factory --- src/lavinmq/amqp/connection_factory.cr | 21 ++++++++++++--------- src/lavinmq/server.cr | 4 ++-- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/lavinmq/amqp/connection_factory.cr b/src/lavinmq/amqp/connection_factory.cr index a9774da4ae..f5bff9ead5 100644 --- a/src/lavinmq/amqp/connection_factory.cr +++ b/src/lavinmq/amqp/connection_factory.cr @@ -8,16 +8,19 @@ module LavinMQ class ConnectionFactory < LavinMQ::ConnectionFactory Log = LavinMQ::Log.for "amqp.connection_factory" - def start(socket, connection_info, vhosts, users) : Client? + def initialize(@users : UserStore, @vhosts : VHostStore) + end + + def start(socket, connection_info) : Client? remote_address = connection_info.src socket.read_timeout = 15.seconds metadata = ::Log::Metadata.build({address: remote_address.to_s}) logger = Logger.new(Log, metadata) if confirm_header(socket, logger) if start_ok = start(socket, logger) - if user = authenticate(socket, remote_address, users, start_ok, logger) + if user = authenticate(socket, remote_address, start_ok, logger) if tune_ok = tune(socket, logger) - if vhost = open(socket, vhosts, user, logger) + if vhost = open(socket, user, logger) socket.read_timeout = heartbeat_timeout(tune_ok) return LavinMQ::AMQP::Client.new(socket, connection_info, vhost, user, tune_ok, start_ok) end @@ -39,7 +42,7 @@ module LavinMQ end end - def confirm_header(socket, log) : Bool + def confirm_header(socket, log : Logger) : Bool proto = uninitialized UInt8[8] count = socket.read(proto.to_slice) if count.zero? # EOF, socket closed by peer @@ -71,7 +74,7 @@ module LavinMQ }, }) - def start(socket, log) + def start(socket, log : Logger) start = AMQP::Frame::Connection::Start.new(server_properties: SERVER_PROPERTIES) socket.write_bytes start, ::IO::ByteFormat::NetworkEndian socket.flush @@ -100,9 +103,9 @@ module LavinMQ end end - def authenticate(socket, remote_address, users, start_ok, log) + def authenticate(socket, remote_address, start_ok, log) username, password = credentials(start_ok) - user = users[username]? + user = @users[username]? return user if user && user.password && user.password.not_nil!.verify(password) && guest_only_loopback?(remote_address, user) @@ -150,10 +153,10 @@ module LavinMQ tune_ok end - def open(socket, vhosts, user, log) + def open(socket, user, log) open = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::Open) } vhost_name = open.vhost.empty? ? "/" : open.vhost - if vhost = vhosts[vhost_name]? + if vhost = @vhosts[vhost_name]? if user.permissions[vhost_name]? if vhost.max_connections.try { |max| vhost.connections.size >= max } log.warn { "Max connections (#{vhost.max_connections}) reached for vhost #{vhost_name}" } diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 4d9e39cfb4..4d9379180b 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -37,7 +37,7 @@ module LavinMQ @users = UserStore.new(@data_dir, @replicator) @vhosts = VHostStore.new(@data_dir, @users, @replicator) @parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator) - @amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new + @amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new(@users, @vhosts) apply_parameter spawn stats_loop, name: "Server#stats_loop" end @@ -245,7 +245,7 @@ module LavinMQ end def handle_connection(socket, connection_info) - client = @amqp_connection_factory.start(socket, connection_info, @vhosts, @users) + client = @amqp_connection_factory.start(socket, connection_info) ensure socket.close if client.nil? end From 2da02b9a5186f2e8a889e44e570556de7721a697 Mon Sep 17 00:00:00 2001 From: Christina Date: Thu, 23 Jan 2025 13:21:57 +0100 Subject: [PATCH 2/2] for specs, re-init connection factory for server restart --- src/lavinmq/server.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 4d9379180b..6a7dcf1d90 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -66,6 +66,7 @@ module LavinMQ @users = UserStore.new(@data_dir, @replicator) @vhosts = VHostStore.new(@data_dir, @users, @replicator) @parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator) + @amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new(@users, @vhosts) apply_parameter @closed = false Fiber.yield