From cf406aa3bb23ba5b120eba425167b4406f64296f Mon Sep 17 00:00:00 2001 From: Christina Date: Mon, 13 Nov 2023 13:35:22 +0100 Subject: [PATCH] chech max followers when followers change, removes timeout --- src/lavinmq/replication/server.cr | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index d9c9336551..db6e80cb04 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -19,7 +19,7 @@ module LavinMQ # The follower sends back/acknowledges how many bytes it has received class Server Log = ::Log.for("replication") - getter wait_for_followers = Channel(Nil).new + getter followers_changed = Channel(Nil).new @lock = Mutex.new(:unchecked) @followers = Array(Follower).new @password : String @@ -102,10 +102,7 @@ module LavinMQ def wait_for_max_lag until @followers.size >= Config.instance.min_followers - select - when @wait_for_followers.receive - when timeout(1.seconds) - end + break unless @followers_changed.receive? end @followers.each_with_index do |f, i| break if i > Config.instance.min_followers @@ -223,7 +220,7 @@ module LavinMQ def read_acks(socket = @socket) : Nil spawn action_loop, name: "Follower#action_loop" - @server.wait_for_followers.try_send nil + @server.followers_changed.try_send nil loop do len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) @acked_bytes += len @@ -416,6 +413,7 @@ module LavinMQ Log.info { "Disconnected" } @actions.close @ack.close + @server.followers_changed.try_send nil @lz4.close @socket.close rescue IO::Error