Skip to content

Commit

Permalink
chech max followers when followers change, removes timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Nov 13, 2023
1 parent 376a26a commit cf406aa
Showing 1 changed file with 4 additions and 6 deletions.
10 changes: 4 additions & 6 deletions src/lavinmq/replication/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ module LavinMQ
# The follower sends back/acknowledges how many bytes it has received
class Server
Log = ::Log.for("replication")
getter wait_for_followers = Channel(Nil).new
getter followers_changed = Channel(Nil).new
@lock = Mutex.new(:unchecked)
@followers = Array(Follower).new
@password : String
Expand Down Expand Up @@ -102,10 +102,7 @@ module LavinMQ

def wait_for_max_lag
until @followers.size >= Config.instance.min_followers
select
when @wait_for_followers.receive
when timeout(1.seconds)
end
break unless @followers_changed.receive?
end
@followers.each_with_index do |f, i|
break if i > Config.instance.min_followers
Expand Down Expand Up @@ -223,7 +220,7 @@ module LavinMQ

def read_acks(socket = @socket) : Nil
spawn action_loop, name: "Follower#action_loop"
@server.wait_for_followers.try_send nil
@server.followers_changed.try_send nil
loop do
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
@acked_bytes += len
Expand Down Expand Up @@ -416,6 +413,7 @@ module LavinMQ
Log.info { "Disconnected" }
@actions.close
@ack.close
@server.followers_changed.try_send nil
@lz4.close
@socket.close
rescue IO::Error
Expand Down

0 comments on commit cf406aa

Please sign in to comment.