Skip to content

Commit

Permalink
beginning of spec
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Nov 13, 2023
1 parent cf406aa commit bebb411
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 7 deletions.
50 changes: 50 additions & 0 deletions spec/replication_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,53 @@ describe LavinMQ::Replication::Client do
end
end
end

describe LavinMQ::Replication::Server do
data_dir = "/tmp/lavinmq-follower"

before_each do
FileUtils.rm_rf data_dir
Dir.mkdir_p data_dir
File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
LavinMQ::Config.instance.min_followers = 1

end

after_each do
FileUtils.rm_rf data_dir
LavinMQ::Config.instance.min_followers = 0
end

it "+min_followers" do
Server.vhosts["/"].declare_queue("repli", true, false)
with_channel do |ch|
pp "publish"
ch.basic_publish "hello world", "", "repli"
pp "after spawn"
end
q = Server.vhosts["/"].queues["repli"].as(LavinMQ::DurableQueue)
q.basic_get(true) { }.should be_false

repli = LavinMQ::Replication::Client.new(data_dir)
pp "before spawn"
spawn do
repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
end
pp "after spawn"
with_channel do |ch|
ch.basic_publish "hello world", "", "repli"
end
q = Server.vhosts["/"].queues["repli"].as(LavinMQ::DurableQueue)
q.basic_get(true) { }.should be_true

repli.close
end

it "-min_followers" do
end

it "+max_lag" do
end
it "-max_lag" do
end
end
4 changes: 2 additions & 2 deletions src/lavinmq/replication/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ module LavinMQ
@data_dir_lock : DataDirLock?
@closed = false

def initialize(@data_dir : String)
def initialize(@data_dir : String, pwd : String? = nil)
System.maximize_fd_limit
@socket = TCPSocket.new
@socket.sync = true
@socket.read_buffering = false
@lz4 = Compress::LZ4::Reader.new(@socket)
@password = password
@password = pwd || password
@files = Hash(String, File).new do |h, k|
path = File.join(@data_dir, k)
Dir.mkdir_p File.dirname(path)
Expand Down
11 changes: 6 additions & 5 deletions src/lavinmq/replication/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ module LavinMQ
@password : String
@files = Hash(String, MFile?).new

def initialize
@password = password
def initialize(@password = password)
@tcp = TCPServer.new
@tcp.sync = false
@tcp.reuse_address = true
Expand Down Expand Up @@ -102,12 +101,13 @@ module LavinMQ

def wait_for_max_lag
until @followers.size >= Config.instance.min_followers
break unless @followers_changed.receive?
@followers_changed.receive
end
@followers.each_with_index do |f, i|
break if i > Config.instance.min_followers
f.wait_for_max_lag
end
rescue Channel::ClosedError
end

private def password : String
Expand Down Expand Up @@ -147,12 +147,14 @@ module LavinMQ
follower.full_sync
@followers << follower
end
followers_changed.try_send nil
begin
follower.read_acks
ensure
@lock.synchronize do
@followers.delete(follower)
end
followers_changed.try_send nil
end
rescue ex : AuthenticationError
Log.warn { "Follower negotiation error" }
Expand All @@ -170,6 +172,7 @@ module LavinMQ
@followers.each &.close
@followers.clear
end
@followers_changed.close
Fiber.yield # required for follower/listener fibers to actually finish
end

Expand Down Expand Up @@ -220,7 +223,6 @@ module LavinMQ

def read_acks(socket = @socket) : Nil
spawn action_loop, name: "Follower#action_loop"
@server.followers_changed.try_send nil
loop do
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
@acked_bytes += len
Expand Down Expand Up @@ -413,7 +415,6 @@ module LavinMQ
Log.info { "Disconnected" }
@actions.close
@ack.close
@server.followers_changed.try_send nil
@lz4.close
@socket.close
rescue IO::Error
Expand Down

0 comments on commit bebb411

Please sign in to comment.