Skip to content

Commit

Permalink
Check to be insync after elected leader
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun committed Dec 10, 2024
1 parent 2180d5f commit c281b0d
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions src/lavinmq/clustering/controller.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ class LavinMQ::Clustering::Controller

def run
spawn(follow_leader, name: "Follower monitor")
wait_to_be_insync
lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
lease = loop do
wait_to_be_insync
Log.info { "Campaigning for leader..." }
lease2 = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
break lease2 if in_sync_to_be_leader?
lease2.close
Log.info { "Not in sync to be leader" }
end
replicator = Clustering::Server.new(@config, @etcd)
@launcher = l = Launcher.new(@config, replicator, lease)
l.run
Expand Down Expand Up @@ -45,20 +51,20 @@ class LavinMQ::Clustering::Controller
# Listens for leader change events
private def follow_leader
repli_client = nil
Log.info { "Listening for leader changes..." }
Log.info { "follow_leader Listening for leader changes..." }
@etcd.elect_listen("#{@config.clustering_etcd_prefix}/leader") do |uri|
Log.debug { "Leader event: #{uri}" }
Log.debug { "follow_leader Leader event: #{uri}" }
next if repli_client.try &.follows?(uri) # if lost connection to etcd we continue follow the leader as is
repli_client.try &.close
if uri == @advertised_uri # if this instance has become leader
Log.debug { "Is leader, don't replicate from self" }
Log.debug { "follow_leader Is leader, don't replicate from self" }
next
end
Log.info { "Leader: #{uri}" }
key = "#{@config.clustering_etcd_prefix}/clustering_secret"
secret = @etcd.get(key)
until secret # the leader might not have had time to set the secret yet
Log.debug { "Clustering secret is missing, watching for it" }
Log.debug { "follow_leader Clustering secret is missing, watching for it" }
@etcd.watch(key) do |value|
secret = value
break
Expand All @@ -68,9 +74,12 @@ class LavinMQ::Clustering::Controller
spawn r.follow(uri), name: "Clustering client #{uri}"
SystemD.notify_ready
end
ensure
Log.debug { "follow_leader exiting" }
end

def wait_to_be_insync
Log.debug { "Waiting to be in sync..." }
if isr = @etcd.get("#{@config.clustering_etcd_prefix}/isr")
unless isr.split(",").map(&.to_i(36)).includes?(@id)
Log.info { "ISR: #{isr}" }
Expand All @@ -82,4 +91,10 @@ class LavinMQ::Clustering::Controller
end
end
end

def in_sync_to_be_leader?
return true unless isr = @etcd.get("#{@config.clustering_etcd_prefix}/isr")
Log.debug { "#in_sync_to_be_leader? isr=#{isr} id=#{@id.to_s(36)}" }
isr.split(",").map(&.to_i(36)).includes?(@id)
end
end

0 comments on commit c281b0d

Please sign in to comment.