Skip to content

Commit

Permalink
don't let Launcher know about clustering/leases
Browse files Browse the repository at this point in the history
Let it be a concern for Clustering Controller

No need to poll the data dir lock, because it's only required for NFS
disks.
  • Loading branch information
carlhoerberg committed Dec 27, 2024
1 parent 7cadbb7 commit a336fa3
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 29 deletions.
12 changes: 4 additions & 8 deletions src/lavinmq.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ config.parse # both ARGV and config file
require "./lavinmq/launcher"
require "./lavinmq/clustering/controller"

begin
if config.clustering?
LavinMQ::Clustering::Controller.new(config).run
else
LavinMQ::Launcher.new(config).run
end
rescue LavinMQ::Launcher::LostLeadership
exit 3
if config.clustering?
LavinMQ::Clustering::Controller.new(config).run
else
LavinMQ::Launcher.new(config).run
end
18 changes: 15 additions & 3 deletions src/lavinmq/clustering/controller.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,27 @@ class LavinMQ::Clustering::Controller
def run
spawn(follow_leader, name: "Follower monitor")
wait_to_be_insync
lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
@lease = lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
# TODO: make sure we still are in the ISR set
replicator = Clustering::Server.new(@config, @etcd)
@launcher = l = Launcher.new(@config, replicator, lease)
l.run
@launcher = Launcher.new(@config, replicator).start
loop do
if lease.wait(30.seconds)
break if @stopped
Log.fatal { "Lost cluster leadership" }
exit 3
else
GC.collect
end
end
end

@stopped = false

def stop
@stopped = true
@launcher.try &.stop
@lease.try &.release
end

# Each node in a cluster has an unique id, for tracking ISR
Expand Down
27 changes: 9 additions & 18 deletions src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require "./server"
require "./http/http_server"
require "./in_memory_backend"
require "./data_dir_lock"
require "./etcd"

module LavinMQ
class Launcher
Expand All @@ -15,9 +14,8 @@ module LavinMQ
@first_shutdown_attempt = true
@data_dir_lock : DataDirLock?
@closed = false
@leadership : Etcd::Leadership?

def initialize(@config : Config, replicator = Clustering::NoopServer.new, @leadership = nil)
def initialize(@config : Config, replicator = Clustering::NoopServer.new)
print_environment_info
print_max_map_count
fd_limit = System.maximize_fd_limit
Expand All @@ -38,23 +36,17 @@ module LavinMQ
setup_log_exchange
end

def run
def start : self
listen
SystemD.notify_ready
self
end

def run
start
loop do
if leadership = @leadership
if leadership.wait(30.seconds)
Log.fatal { "Lost cluster leadership" }
raise LostLeadership.new
else
@data_dir_lock.try &.poll
GC.collect
end
else
sleep 30.seconds
@data_dir_lock.try &.poll
GC.collect
end
sleep 30.seconds
GC.collect
end
end

Expand All @@ -66,7 +58,6 @@ module LavinMQ
@http_server.close rescue nil
@amqp_server.close rescue nil
@data_dir_lock.try &.release
@leadership.try &.release
end

private def print_environment_info
Expand Down

0 comments on commit a336fa3

Please sign in to comment.