From ddaa57b4a33596fc8504e3c1b62dd7814949d8a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 13 Dec 2024 12:42:06 +0100 Subject: [PATCH 1/8] More reliable clustering Catch and explicity reraise IO::Errors in etcd, otherwise when an Etcd method yielded, and that inner call raised IO::Error that was interpreted as a Etcd error. Extra logging related to Following Start etcd lease keepalive after won election Apprently there's no need to update lease TTL until the election is won Refactor Leadership lease keepalive dont log Lost leadership if manually revoked etcd error are sometimes json, sometimes not don't let Launcher know about clustering/leases Let it be a concern for Clustering Controller No need to poll the data dir lock, because it's only required for NFS disks. --- spec/etcd_spec.cr | 1 + src/lavinmq/clustering/client.cr | 1 + src/lavinmq/clustering/controller.cr | 22 ++++- src/lavinmq/data_dir_lock.cr | 4 +- src/lavinmq/etcd.cr | 137 ++++++++++++++++----------- src/lavinmq/launcher.cr | 27 ++---- 6 files changed, 113 insertions(+), 79 deletions(-) diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index a2e73d9359..e0dad4d477 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -1,6 +1,7 @@ require "spec" require "../src/lavinmq/etcd" require "file_utils" +require "http/client" describe LavinMQ::Etcd do it "can put and get" do diff --git a/src/lavinmq/clustering/client.cr b/src/lavinmq/clustering/client.cr index 26c97ece42..6e5e88c092 100644 --- a/src/lavinmq/clustering/client.cr +++ b/src/lavinmq/clustering/client.cr @@ -57,6 +57,7 @@ module LavinMQ end def follow(host : String, port : Int32) + Log.info { "Following #{host}:#{port}" } @host = host @port = port if amqp_proxy = @amqp_proxy diff --git a/src/lavinmq/clustering/controller.cr b/src/lavinmq/clustering/controller.cr index 19f81b7c4d..d39fd1cd88 100644 --- a/src/lavinmq/clustering/controller.cr +++ b/src/lavinmq/clustering/controller.cr @@ -17,14 +17,27 @@ class LavinMQ::Clustering::Controller def run spawn(follow_leader, name: "Follower monitor") wait_to_be_insync - lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader + @lease = lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader + # TODO: make sure we still are in the ISR set replicator = Clustering::Server.new(@config, @etcd) - @launcher = l = Launcher.new(@config, replicator, lease) - l.run + @launcher = Launcher.new(@config, replicator).start + loop do + if lease.wait(30.seconds) + break if @stopped + Log.fatal { "Lost cluster leadership" } + exit 3 + else + GC.collect + end + end end + @stopped = false + def stop + @stopped = true @launcher.try &.stop + @lease.try &.release end # Each node in a cluster has an unique id, for tracking ISR @@ -66,6 +79,9 @@ class LavinMQ::Clustering::Controller spawn r.follow(uri), name: "Clustering client #{uri}" SystemD.notify_ready end + rescue ex + Log.fatal(exception: ex) { "Unhandled exception while following leader" } + exit 36 # 36 for CF (Cluster Follower) end def wait_to_be_insync diff --git a/src/lavinmq/data_dir_lock.cr b/src/lavinmq/data_dir_lock.cr index 87d752a4f6..2654b0499f 100644 --- a/src/lavinmq/data_dir_lock.cr +++ b/src/lavinmq/data_dir_lock.cr @@ -21,6 +21,7 @@ module LavinMQ @lock.flock_exclusive(blocking: true) Log.info { "Lock acquired" } end + Log.debug { "Data directory lock aquired" } @lock.truncate @lock.print "PID #{Process.pid} @ #{System.hostname}" @lock.fsync @@ -37,7 +38,8 @@ module LavinMQ def poll @lock.read_at(0, 1, &.read_byte) || raise IO::EOFError.new rescue ex : IO::Error | ArgumentError - abort "ERROR: Lost data directory lock! #{ex.inspect}" + Log.fatal(exception: ex) { "Lost data dir lock" } + exit 4 # 4 for D(dataDir) end end end diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index fab7a3728b..e9acad2135 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -1,4 +1,4 @@ -require "http/client" +require "socket" require "wait_group" require "json" require "./logger" @@ -35,7 +35,7 @@ module LavinMQ def watch(key, &) body = %({"create_request":{"key":"#{Base64.strict_encode key}"}}) - post_stream("/v3/watch", body) do |json| + stream("/v3/watch", body) do |json| next if json.dig?("result", "created") == true # "watch created" is first event if value = json.dig?("result", "events", 0, "kv", "value") @@ -61,10 +61,17 @@ module LavinMQ if ttl = json.dig?("result", "TTL") ttl.as_s.to_i else - raise Error.new("Lost lease #{id}") + raise Error.new("Lease #{id} expired") end end + def lease_ttl(id) : Int32 + json = post("/v3/lease/timetolive", body: %({"ID":"#{id}"})) + ttl = json["TTL"].as_s.to_i + raise Error.new("Lease #{id} expired") if ttl < 0 + ttl + end + def lease_revoke(id) : Nil post("/v3/lease/revoke", body: %({"ID":"#{id}"})) end @@ -81,56 +88,51 @@ module LavinMQ # Returns when elected leader # Returns a `Leadership` instance def elect(name, value, ttl = 10) : Leadership - channel = Channel(Nil).new - lease_id, ttl = lease_grant(ttl) - wg = WaitGroup.new(1) - spawn(name: "Etcd lease keepalive #{lease_id}") do - wg.done - loop do - select - when channel.receive? - lease_revoke(lease_id) - channel.close - break - when timeout((ttl * 0.7).seconds) - ttl = lease_keepalive(lease_id) - end - rescue ex - Log.warn { "Lost leadership of #{name}: #{ex}" } - channel.close - break - end - end + lease_id, _ttl = lease_grant(ttl) election_campaign(name, value, lease_id) - wg.wait - Leadership.new(self, lease_id, channel) + Leadership.new(self, lease_id) end - # Represents a holding a Leadership + # Represents holding a Leadership # Can be revoked or wait until lost class Leadership - def initialize(@etcd : Etcd, @lease_id : Int64, @lost_leadership_channel : Channel(Nil)) + def initialize(@etcd : Etcd, @lease_id : Int64) + @lost_leadership = Channel(Nil).new + spawn(keepalive_loop, name: "Etcd lease keepalive #{@lease_id}") end # Force release leadership def release @etcd.lease_revoke(@lease_id) + @lost_leadership.close end # Wait until looses leadership # Returns true when lost leadership, false when timeout occured def wait(timeout : Time::Span) : Bool select - when @lost_leadership_channel.receive? - return true + when @lost_leadership.receive? + true when timeout(timeout) - return false + false end end + + private def keepalive_loop + ttl = @etcd.lease_ttl(@lease_id) + loop do + sleep (ttl * 0.7).seconds + ttl = @etcd.lease_keepalive(@lease_id) + end + rescue ex + Log.error(exception: ex) { "Lost leadership" } unless @lost_leadership.closed? + ensure + @lost_leadership.close + end end def elect_listen(name, &) - post_stream("/v3/election/observe", %({"name":"#{Base64.strict_encode name}"})) do |json| + stream("/v3/election/observe", %({"name":"#{Base64.strict_encode name}"})) do |json| if value = json.dig?("result", "kv", "value") yield Base64.decode_string(value.as_s) else @@ -153,31 +155,37 @@ module LavinMQ chunks = read_chunks(tcp) parse_json! chunks else - body = tcp.read_string(content_length) + body = read_string(tcp, content_length) parse_json! body end end - private def post_stream(path, body, & : JSON::Any -> _) + private def stream(path, body, & : JSON::Any -> _) with_tcp do |tcp, address| - send_request(tcp, address, path, body) - content_length = read_headers(tcp) - if content_length == -1 # Chunked response - read_chunks(tcp) do |chunk| - yield parse_json! chunk - end - else - body = tcp.read_string(content_length) - yield parse_json! body + post_stream(tcp, address, path, body) do |chunk| + yield chunk end end end + private def post_stream(tcp, address, path, body, & : JSON::Any -> _) + send_request(tcp, address, path, body) + content_length = read_headers(tcp) + if content_length == -1 # Chunked response + read_chunks(tcp) do |chunk| + yield parse_json! chunk + end + else + body = read_string(tcp, content_length) + yield parse_json! body + end + end + private def read_chunks(tcp, & : String -> _) : Nil response_finished = false loop do - bytesize = tcp.read_line.to_i(16) - chunk = tcp.read_string(bytesize) + bytesize = read_chunk_size(tcp) + chunk = read_string(tcp, bytesize) tcp.skip(2) # each chunk ends with \r\n break if bytesize.zero? yield chunk @@ -196,6 +204,20 @@ module LavinMQ break if bytesize.zero? end end + rescue ex : IO::Error + raise Error.new("Read chunked response error", cause: ex) + end + + def read_string(tcp, content_length) : String + tcp.read_string(content_length) + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) + end + + def read_chunk_size(tcp) : Int32 + tcp.read_line.to_i(16) + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) end private def send_request(tcp : IO, address : String, path : String, body : String) @@ -205,6 +227,8 @@ module LavinMQ tcp << "\r\n" tcp << body tcp.flush + rescue ex : IO::Error + raise Error.new("Send request error", cause: ex) end # Parse response headers, return Content-Length (-1 implies chunked response) @@ -233,6 +257,8 @@ module LavinMQ end end content_length + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) end @connections = Deque(Tuple(TCPSocket, String)).new @@ -248,10 +274,6 @@ module LavinMQ Log.warn { "Service Unavailable at #{address}, #{ex.message}, retrying" } socket.close rescue nil sleep 0.1.seconds - rescue IO::Error - Log.warn { "Lost connection to #{address}, retrying" } - socket.close rescue nil - sleep 0.1.seconds ensure @connections.push({socket, address}) unless socket.closed? end @@ -313,18 +335,19 @@ module LavinMQ private def raise_if_error(json) if error = json["error"]? Log.debug { "etcd error: #{error}" } - if errorh = error.as_h? - error_msg = errorh["message"].as_s - case error_msg - when "error reading from server: EOF" - raise IO::EOFError.new(error_msg) - when "etcdserver: no leader" - raise NoLeader.new(error_msg) + error_msg = + if errorh = error.as_h? + errorh["message"].as_s else - raise Error.new error_msg + error.as_s end + case error_msg + when "error reading from server: EOF" + raise IO::EOFError.new(error_msg) + when "etcdserver: no leader" + raise NoLeader.new(error_msg) else - raise Error.new error.as_s + raise Error.new error_msg end end end diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index ec1ada68e3..64c6b101f9 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -6,7 +6,6 @@ require "./server" require "./http/http_server" require "./in_memory_backend" require "./data_dir_lock" -require "./etcd" module LavinMQ class Launcher @@ -15,9 +14,8 @@ module LavinMQ @first_shutdown_attempt = true @data_dir_lock : DataDirLock? @closed = false - @leadership : Etcd::Leadership? - def initialize(@config : Config, replicator = Clustering::NoopServer.new, @leadership = nil) + def initialize(@config : Config, replicator = Clustering::NoopServer.new) print_environment_info print_max_map_count fd_limit = System.maximize_fd_limit @@ -38,23 +36,17 @@ module LavinMQ setup_log_exchange end - def run + def start : self listen SystemD.notify_ready + self + end + + def run + start loop do - if leadership = @leadership - if leadership.wait(30.seconds) - Log.fatal { "Lost cluster leadership" } - exit 3 # 3rd character in the alphabet is C(lustering) - else - @data_dir_lock.try &.poll - GC.collect - end - else - sleep 30.seconds - @data_dir_lock.try &.poll - GC.collect - end + sleep 30.seconds + GC.collect end end @@ -66,7 +58,6 @@ module LavinMQ @http_server.close rescue nil @amqp_server.close rescue nil @data_dir_lock.try &.release - @leadership.try &.release end private def print_environment_info From 16ec616e60547dcdddebe5bde1d3ec99684e6a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 13 Dec 2024 21:18:20 +0100 Subject: [PATCH 2/8] always have a read timeout on Follower sockets we want to timeout when waiting for acks, if the follower is unresponsive --- src/lavinmq/clustering/follower.cr | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lavinmq/clustering/follower.cr b/src/lavinmq/clustering/follower.cr index 7e72ea6722..1b47a4568d 100644 --- a/src/lavinmq/clustering/follower.cr +++ b/src/lavinmq/clustering/follower.cr @@ -26,7 +26,6 @@ module LavinMQ validate_header! authenticate!(password) @id = @socket.read_bytes Int32, IO::ByteFormat::LittleEndian - @socket.read_timeout = nil @socket.tcp_nodelay = true @socket.read_buffering = false @socket.sync = true # Use buffering in lz4 From 0ce7cf3489c108428291eb82f097a1f5426dbd12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 25 Dec 2024 00:49:18 +0100 Subject: [PATCH 3/8] make it possible to run a local etcd while running specs use custom ports for the specs --- spec/clustering_spec.cr | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index d1859ee6bb..480cb8dd09 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -13,9 +13,12 @@ describe LavinMQ::Clustering::Client do "--log-level=error", "--unsafe-no-fsync=true", "--force-new-cluster=true", + "--listen-peer-urls=http://127.0.0.1:12380", + "--listen-client-urls=http://127.0.0.1:12379", + "--advertise-client-urls=http://127.0.0.1:12379", }, output: STDOUT, error: STDERR) - client = HTTP::Client.new("127.0.0.1", 2379) + client = HTTP::Client.new("127.0.0.1", 12379) i = 0 loop do sleep 0.02.seconds @@ -26,7 +29,7 @@ describe LavinMQ::Clustering::Client do end rescue e : Socket::ConnectError i += 1 - raise "Cant connect to etcd on port 2379. Giving up after 100 tries. (#{e.message})" if i >= 100 + raise "Cant connect to etcd on port 12379. Giving up after 100 tries. (#{e.message})" if i >= 100 next end client.close @@ -40,7 +43,7 @@ describe LavinMQ::Clustering::Client do end it "can stream changes" do - replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0) + replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0) tcp_server = TCPServer.new("localhost", 0) spawn(replicator.listen(tcp_server), name: "repli server spec") config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir @@ -73,7 +76,7 @@ describe LavinMQ::Clustering::Client do end it "can stream full file" do - replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0) + replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0) tcp_server = TCPServer.new("localhost", 0) spawn(replicator.listen(tcp_server), name: "repli server spec") config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir @@ -102,6 +105,7 @@ describe LavinMQ::Clustering::Client do it "will failover" do config1 = LavinMQ::Config.new config1.data_dir = "/tmp/failover1" + config1.clustering_etcd_endpoints = "localhost:12379" config1.clustering_advertised_uri = "tcp://localhost:5681" config1.clustering_port = 5681 config1.amqp_port = 5671 @@ -110,6 +114,7 @@ describe LavinMQ::Clustering::Client do config2 = LavinMQ::Config.new config2.data_dir = "/tmp/failover2" + config2.clustering_etcd_endpoints = "localhost:12379" config2.clustering_advertised_uri = "tcp://localhost:5682" config2.clustering_port = 5682 config2.amqp_port = 5672 @@ -118,7 +123,7 @@ describe LavinMQ::Clustering::Client do listen = Channel(String).new spawn(name: "etcd elect leader spec") do - etcd = LavinMQ::Etcd.new + etcd = LavinMQ::Etcd.new("localhost:12379") etcd.elect_listen("lavinmq/leader") do |value| listen.send value end From ef92ed148e915c91f31d8d010a963f40e85139ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 27 Dec 2024 02:11:49 +0100 Subject: [PATCH 4/8] don't fsync in DataDirLock can't see the need --- src/lavinmq/data_dir_lock.cr | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lavinmq/data_dir_lock.cr b/src/lavinmq/data_dir_lock.cr index 2654b0499f..cbd63db05a 100644 --- a/src/lavinmq/data_dir_lock.cr +++ b/src/lavinmq/data_dir_lock.cr @@ -24,12 +24,10 @@ module LavinMQ Log.debug { "Data directory lock aquired" } @lock.truncate @lock.print "PID #{Process.pid} @ #{System.hostname}" - @lock.fsync end def release @lock.truncate - @lock.fsync @lock.flock_unlock end From f93c2ac31c90760b3f88458d0e5fe0a511a212e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 27 Dec 2024 21:52:45 +0100 Subject: [PATCH 5/8] make federation upstream spec more stable in slow CI --- spec/upstream_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index 35a1b50cb2..9f7e53efa8 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -392,7 +392,7 @@ describe LavinMQ::Federation::Upstream do select when ch.receive? - when timeout 100.milliseconds + when timeout 3.seconds fail "federation didn't resume? timeout waiting for message on downstream queue" end From 33f5d4864fd13098e5aa05ce22679f9b4ed27d01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 29 Dec 2024 23:05:07 +0100 Subject: [PATCH 6/8] Pass Config to Server, not just data_dir Config.instance was used heavily in Server --- spec/clustering_spec.cr | 4 ++-- spec/spec_helper.cr | 2 +- spec/storage_spec.cr | 3 ++- spec/vhost_spec.cr | 6 +++--- src/lavinmq/launcher.cr | 2 +- src/lavinmq/server.cr | 33 +++++++++++++++++---------------- 6 files changed, 26 insertions(+), 24 deletions(-) diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index 480cb8dd09..35380dc4fa 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -63,7 +63,7 @@ describe LavinMQ::Clustering::Client do done.receive end - server = LavinMQ::Server.new(follower_data_dir) + server = LavinMQ::Server.new(config) begin q = server.vhosts["/"].queues["repli"].as(LavinMQ::AMQP::DurableQueue) q.message_count.should eq 1 @@ -94,7 +94,7 @@ describe LavinMQ::Clustering::Client do done.receive end - server = LavinMQ::Server.new(follower_data_dir) + server = LavinMQ::Server.new(config) begin server.users["u1"].should_not be_nil ensure diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index f4bf0652e1..f89124c64d 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -74,7 +74,7 @@ end def with_amqp_server(tls = false, replicator = LavinMQ::Clustering::NoopServer.new, & : LavinMQ::Server -> Nil) tcp_server = TCPServer.new("localhost", 0) - s = LavinMQ::Server.new(LavinMQ::Config.instance.data_dir, replicator) + s = LavinMQ::Server.new(LavinMQ::Config.instance, replicator) begin if tls ctx = OpenSSL::SSL::Context::Server.new diff --git a/spec/storage_spec.cr b/spec/storage_spec.cr index fd0ed74e87..89946d9a28 100644 --- a/spec/storage_spec.cr +++ b/spec/storage_spec.cr @@ -11,7 +11,8 @@ describe LavinMQ::AMQP::DurableQueue do end it "should succefully convert queue index" do - server = LavinMQ::Server.new("/tmp/lavinmq-spec-index-v2") + config = LavinMQ::Config.new.tap &.data_dir = "/tmp/lavinmq-spec-index-v2" + server = LavinMQ::Server.new(config) begin q = server.vhosts["/"].queues["queue"].as(LavinMQ::AMQP::DurableQueue) q.basic_get(true) do |env| diff --git a/spec/vhost_spec.cr b/spec/vhost_spec.cr index 81272856e8..03bca2d000 100644 --- a/spec/vhost_spec.cr +++ b/spec/vhost_spec.cr @@ -35,7 +35,7 @@ describe LavinMQ::VHost do end it "should be able to persist durable delayed exchanges when type = x-delayed-message" do - data_dir = "" + config = LavinMQ::Config.new with_amqp_server do |s| # This spec is to verify a fix where a server couldn't start again after a crash if # an delayed exchange had been declared by specifiying the type as "x-delayed-message". @@ -46,11 +46,11 @@ describe LavinMQ::VHost do # Start a new server with the same data dir as `Server` without stopping # `Server` first, because stopping would compact definitions and therefore "rewrite" - data_dir = s.data_dir + config.data_dir = s.data_dir end # the definitions file. This is to simulate a start after a "crash". # If this succeeds we assume it worked...? - LavinMQ::Server.new(data_dir) + LavinMQ::Server.new(config) end it "should be able to persist durable queues" do diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 64c6b101f9..2fb2ea4bd0 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -28,7 +28,7 @@ module LavinMQ if @config.data_dir_lock? @data_dir_lock = DataDirLock.new(@config.data_dir).tap &.acquire end - @amqp_server = LavinMQ::Server.new(@config.data_dir, replicator) + @amqp_server = LavinMQ::Server.new(@config, replicator) @http_server = LavinMQ::HTTP::Server.new(@amqp_server) @tls_context = create_tls_context if @config.tls_configured? reload_tls_context diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 69b729079d..f381fd0577 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -30,7 +30,8 @@ module LavinMQ @replicator : Clustering::Replicator Log = LavinMQ::Log.for "server" - def initialize(@data_dir : String, @replicator = Clustering::NoopServer.new) + def initialize(@config : Config, @replicator = Clustering::NoopServer.new) + @data_dir = @config.data_dir Dir.mkdir_p @data_dir Schema.migrate(@data_dir, @replicator) @users = UserStore.new(@data_dir, @replicator) @@ -99,17 +100,17 @@ module LavinMQ private def extract_conn_info(client) : ConnectionInfo remote_address = client.remote_address - case Config.instance.tcp_proxy_protocol + case @config.tcp_proxy_protocol when 1 then ProxyProtocol::V1.parse(client) when 2 then ProxyProtocol::V2.parse(client) else # Allow proxy connection from followers - if Config.instance.clustering? && + if @config.clustering? && client.peek[0, 5]? == "PROXY".to_slice && followers.any? { |f| f.remote_address.address == remote_address.address } # Expect PROXY protocol header if remote address is a follower ProxyProtocol::V1.parse(client) - elsif Config.instance.clustering? && + elsif @config.clustering? && client.peek[0, 8]? == ProxyProtocol::V2::Signature.to_slice[0, 8] && followers.any? { |f| f.remote_address.address == remote_address.address } # Expect PROXY protocol header if remote address is a follower @@ -130,7 +131,7 @@ module LavinMQ remote_address = client.remote_address set_buffer_size(client) conn_info = - case Config.instance.unix_proxy_protocol + case @config.unix_proxy_protocol when 1 then ProxyProtocol::V1.parse(client) when 2 then ProxyProtocol::V2.parse(client) else ConnectionInfo.local # TODO: use unix socket address, don't fake local @@ -252,21 +253,21 @@ module LavinMQ private def set_socket_options(socket) unless socket.remote_address.loopback? - if keepalive = Config.instance.tcp_keepalive + if keepalive = @config.tcp_keepalive socket.keepalive = true socket.tcp_keepalive_idle = keepalive[0] socket.tcp_keepalive_interval = keepalive[1] socket.tcp_keepalive_count = keepalive[2] end end - socket.tcp_nodelay = true if Config.instance.tcp_nodelay? - Config.instance.tcp_recv_buffer_size.try { |v| socket.recv_buffer_size = v } - Config.instance.tcp_send_buffer_size.try { |v| socket.send_buffer_size = v } + socket.tcp_nodelay = true if @config.tcp_nodelay? + @config.tcp_recv_buffer_size.try { |v| socket.recv_buffer_size = v } + @config.tcp_send_buffer_size.try { |v| socket.send_buffer_size = v } end private def set_buffer_size(socket) - if Config.instance.socket_buffer_size.positive? - socket.buffer_size = Config.instance.socket_buffer_size + if @config.socket_buffer_size.positive? + socket.buffer_size = @config.socket_buffer_size socket.sync = false socket.read_buffering = true else @@ -288,8 +289,8 @@ module LavinMQ end def update_system_metrics(statm) - interval = Config.instance.stats_interval.milliseconds.to_i - log_size = Config.instance.stats_log_size + interval = @config.stats_interval.milliseconds.to_i + log_size = @config.stats_log_size rusage = System.resource_usage {% for m in METRICS %} @@ -353,7 +354,7 @@ module LavinMQ end control_flow! - sleep Config.instance.stats_interval.milliseconds + sleep @config.stats_interval.milliseconds end ensure statm.try &.close @@ -435,11 +436,11 @@ module LavinMQ end def disk_full? - @disk_free < 3_i64 * Config.instance.segment_size || @disk_free < Config.instance.free_disk_min + @disk_free < 3_i64 * @config.segment_size || @disk_free < @config.free_disk_min end def disk_usage_over_warning_level? - @disk_free < 6_i64 * Config.instance.segment_size || @disk_free < Config.instance.free_disk_warn + @disk_free < 6_i64 * @config.segment_size || @disk_free < @config.free_disk_warn end def flow(active : Bool) From 0a7ca56b68f6fd1f5b7d600e1282e06542ea4b36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 29 Dec 2024 23:06:52 +0100 Subject: [PATCH 7/8] Move responsibility of Clustering to Launcher If the Launcher receives an Etcd, Launcher creates, and later closes, the ClusteringServer instance. --- spec/clustering_spec.cr | 2 ++ src/lavinmq/clustering/controller.cr | 3 +-- src/lavinmq/launcher.cr | 5 ++++- src/lavinmq/server.cr | 1 - 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index 35380dc4fa..9aff334e84 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -73,6 +73,8 @@ describe LavinMQ::Clustering::Client do ensure server.close end + ensure + replicator.try &.close end it "can stream full file" do diff --git a/src/lavinmq/clustering/controller.cr b/src/lavinmq/clustering/controller.cr index d39fd1cd88..1988d33c5c 100644 --- a/src/lavinmq/clustering/controller.cr +++ b/src/lavinmq/clustering/controller.cr @@ -19,8 +19,7 @@ class LavinMQ::Clustering::Controller wait_to_be_insync @lease = lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader # TODO: make sure we still are in the ISR set - replicator = Clustering::Server.new(@config, @etcd) - @launcher = Launcher.new(@config, replicator).start + @launcher = Launcher.new(@config, @etcd).start loop do if lease.wait(30.seconds) break if @stopped diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 2fb2ea4bd0..51e4671524 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -6,6 +6,7 @@ require "./server" require "./http/http_server" require "./in_memory_backend" require "./data_dir_lock" +require "./etcd" module LavinMQ class Launcher @@ -15,7 +16,7 @@ module LavinMQ @data_dir_lock : DataDirLock? @closed = false - def initialize(@config : Config, replicator = Clustering::NoopServer.new) + def initialize(@config : Config, etcd : Etcd? = nil) print_environment_info print_max_map_count fd_limit = System.maximize_fd_limit @@ -28,6 +29,7 @@ module LavinMQ if @config.data_dir_lock? @data_dir_lock = DataDirLock.new(@config.data_dir).tap &.acquire end + @replicator = replicator = etcd ? Clustering::Server.new(@config, etcd) : Clustering::NoopServer.new @amqp_server = LavinMQ::Server.new(@config, replicator) @http_server = LavinMQ::HTTP::Server.new(@amqp_server) @tls_context = create_tls_context if @config.tls_configured? @@ -58,6 +60,7 @@ module LavinMQ @http_server.close rescue nil @amqp_server.close rescue nil @data_dir_lock.try &.release + @replicator.close end private def print_environment_info diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index f381fd0577..4d9e39cfb4 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -206,7 +206,6 @@ module LavinMQ @listeners.each_key &.close Log.debug { "Closing vhosts" } @vhosts.close - @replicator.close end def add_parameter(parameter : Parameter) From 02969a3753e84ec28fc76804281d352362133670 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 29 Dec 2024 23:10:29 +0100 Subject: [PATCH 8/8] In Actions, get filename via instance variable No need to use the getter --- src/lavinmq/clustering/actions.cr | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lavinmq/clustering/actions.cr b/src/lavinmq/clustering/actions.cr index 04277fbe2c..2062573343 100644 --- a/src/lavinmq/clustering/actions.cr +++ b/src/lavinmq/clustering/actions.cr @@ -29,11 +29,11 @@ module LavinMQ def lag_size : Int64 if mfile = @mfile - 0i64 + sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + @filename.bytesize + sizeof(Int64) + mfile.size.to_i64 else - 0i64 + sizeof(Int32) + filename.bytesize + - sizeof(Int64) + File.size(File.join(@data_dir, filename)).to_i64 + 0i64 + sizeof(Int32) + @filename.bytesize + + sizeof(Int64) + File.size(File.join(@data_dir, @filename)).to_i64 end end @@ -69,7 +69,7 @@ module LavinMQ in UInt32, Int32 4i64 end - 0i64 + sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + @filename.bytesize + sizeof(Int64) + datasize end @@ -100,7 +100,7 @@ module LavinMQ # Maybe it would be ok to not include delete action in lag, because # the follower should have all info necessary to GC the file during # startup? - (sizeof(Int32) + filename.bytesize + sizeof(Int64)).to_i64 + (sizeof(Int32) + @filename.bytesize + sizeof(Int64)).to_i64 end def send(socket, log = Log) : Int64