Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clustering stability #879

Merged
merged 8 commits into from
Jan 1, 2025
21 changes: 14 additions & 7 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ describe LavinMQ::Clustering::Client do
"--log-level=error",
"--unsafe-no-fsync=true",
"--force-new-cluster=true",
"--listen-peer-urls=http://127.0.0.1:12380",
"--listen-client-urls=http://127.0.0.1:12379",
"--advertise-client-urls=http://127.0.0.1:12379",
}, output: STDOUT, error: STDERR)

client = HTTP::Client.new("127.0.0.1", 2379)
client = HTTP::Client.new("127.0.0.1", 12379)
i = 0
loop do
sleep 0.02.seconds
Expand All @@ -26,7 +29,7 @@ describe LavinMQ::Clustering::Client do
end
rescue e : Socket::ConnectError
i += 1
raise "Cant connect to etcd on port 2379. Giving up after 100 tries. (#{e.message})" if i >= 100
raise "Cant connect to etcd on port 12379. Giving up after 100 tries. (#{e.message})" if i >= 100
next
end
client.close
Expand All @@ -40,7 +43,7 @@ describe LavinMQ::Clustering::Client do
end

it "can stream changes" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
Expand All @@ -60,7 +63,7 @@ describe LavinMQ::Clustering::Client do
done.receive
end

server = LavinMQ::Server.new(follower_data_dir)
server = LavinMQ::Server.new(config)
begin
q = server.vhosts["/"].queues["repli"].as(LavinMQ::AMQP::DurableQueue)
q.message_count.should eq 1
Expand All @@ -70,10 +73,12 @@ describe LavinMQ::Clustering::Client do
ensure
server.close
end
ensure
replicator.try &.close
end

it "can stream full file" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
Expand All @@ -91,7 +96,7 @@ describe LavinMQ::Clustering::Client do
done.receive
end

server = LavinMQ::Server.new(follower_data_dir)
server = LavinMQ::Server.new(config)
begin
server.users["u1"].should_not be_nil
ensure
Expand All @@ -102,6 +107,7 @@ describe LavinMQ::Clustering::Client do
it "will failover" do
config1 = LavinMQ::Config.new
config1.data_dir = "/tmp/failover1"
config1.clustering_etcd_endpoints = "localhost:12379"
config1.clustering_advertised_uri = "tcp://localhost:5681"
config1.clustering_port = 5681
config1.amqp_port = 5671
Expand All @@ -110,6 +116,7 @@ describe LavinMQ::Clustering::Client do

config2 = LavinMQ::Config.new
config2.data_dir = "/tmp/failover2"
config2.clustering_etcd_endpoints = "localhost:12379"
config2.clustering_advertised_uri = "tcp://localhost:5682"
config2.clustering_port = 5682
config2.amqp_port = 5672
Expand All @@ -118,7 +125,7 @@ describe LavinMQ::Clustering::Client do

listen = Channel(String).new
spawn(name: "etcd elect leader spec") do
etcd = LavinMQ::Etcd.new
etcd = LavinMQ::Etcd.new("localhost:12379")
etcd.elect_listen("lavinmq/leader") do |value|
listen.send value
end
Expand Down
1 change: 1 addition & 0 deletions spec/etcd_spec.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "spec"
require "../src/lavinmq/etcd"
require "file_utils"
require "http/client"

describe LavinMQ::Etcd do
it "can put and get" do
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ end

def with_amqp_server(tls = false, replicator = LavinMQ::Clustering::NoopServer.new, & : LavinMQ::Server -> Nil)
tcp_server = TCPServer.new("localhost", 0)
s = LavinMQ::Server.new(LavinMQ::Config.instance.data_dir, replicator)
s = LavinMQ::Server.new(LavinMQ::Config.instance, replicator)
begin
if tls
ctx = OpenSSL::SSL::Context::Server.new
Expand Down
3 changes: 2 additions & 1 deletion spec/storage_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ describe LavinMQ::AMQP::DurableQueue do
end

it "should succefully convert queue index" do
server = LavinMQ::Server.new("/tmp/lavinmq-spec-index-v2")
config = LavinMQ::Config.new.tap &.data_dir = "/tmp/lavinmq-spec-index-v2"
server = LavinMQ::Server.new(config)
begin
q = server.vhosts["/"].queues["queue"].as(LavinMQ::AMQP::DurableQueue)
q.basic_get(true) do |env|
Expand Down
2 changes: 1 addition & 1 deletion spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ describe LavinMQ::Federation::Upstream do

select
when ch.receive?
when timeout 100.milliseconds
when timeout 3.seconds
fail "federation didn't resume? timeout waiting for message on downstream queue"
end

Expand Down
6 changes: 3 additions & 3 deletions spec/vhost_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe LavinMQ::VHost do
end

it "should be able to persist durable delayed exchanges when type = x-delayed-message" do
data_dir = ""
config = LavinMQ::Config.new
with_amqp_server do |s|
# This spec is to verify a fix where a server couldn't start again after a crash if
# a delayed exchange had been declared by specifying the type as "x-delayed-message".
Expand All @@ -46,11 +46,11 @@ describe LavinMQ::VHost do

# Start a new server with the same data dir as `Server` without stopping
# `Server` first, because stopping would compact definitions and therefore "rewrite"
data_dir = s.data_dir
config.data_dir = s.data_dir
end
# the definitions file. This is to simulate a start after a "crash".
# If this succeeds we assume it worked...?
LavinMQ::Server.new(data_dir)
LavinMQ::Server.new(config)
end

it "should be able to persist durable queues" do
Expand Down
10 changes: 5 additions & 5 deletions src/lavinmq/clustering/actions.cr
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ module LavinMQ

def lag_size : Int64
if mfile = @mfile
0i64 + sizeof(Int32) + filename.bytesize +
0i64 + sizeof(Int32) + @filename.bytesize +
sizeof(Int64) + mfile.size.to_i64
else
0i64 + sizeof(Int32) + filename.bytesize +
sizeof(Int64) + File.size(File.join(@data_dir, filename)).to_i64
0i64 + sizeof(Int32) + @filename.bytesize +
sizeof(Int64) + File.size(File.join(@data_dir, @filename)).to_i64
end
end

Expand Down Expand Up @@ -69,7 +69,7 @@ module LavinMQ
in UInt32, Int32
4i64
end
0i64 + sizeof(Int32) + filename.bytesize +
0i64 + sizeof(Int32) + @filename.bytesize +
sizeof(Int64) + datasize
end

Expand Down Expand Up @@ -100,7 +100,7 @@ module LavinMQ
# Maybe it would be ok to not include delete action in lag, because
# the follower should have all info necessary to GC the file during
# startup?
(sizeof(Int32) + filename.bytesize + sizeof(Int64)).to_i64
(sizeof(Int32) + @filename.bytesize + sizeof(Int64)).to_i64
end

def send(socket, log = Log) : Int64
Expand Down
1 change: 1 addition & 0 deletions src/lavinmq/clustering/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ module LavinMQ
end

def follow(host : String, port : Int32)
Log.info { "Following #{host}:#{port}" }
@host = host
@port = port
if amqp_proxy = @amqp_proxy
Expand Down
23 changes: 19 additions & 4 deletions src/lavinmq/clustering/controller.cr
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,26 @@ class LavinMQ::Clustering::Controller
def run
spawn(follow_leader, name: "Follower monitor")
wait_to_be_insync
lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
replicator = Clustering::Server.new(@config, @etcd)
@launcher = l = Launcher.new(@config, replicator, lease)
l.run
@lease = lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
# TODO: make sure we still are in the ISR set
@launcher = Launcher.new(@config, @etcd).start
loop do
if lease.wait(30.seconds)
break if @stopped
Log.fatal { "Lost cluster leadership" }
exit 3
else
GC.collect
end
end
end

@stopped = false

def stop
@stopped = true
@launcher.try &.stop
@lease.try &.release
end

# Each node in a cluster has an unique id, for tracking ISR
Expand Down Expand Up @@ -66,6 +78,9 @@ class LavinMQ::Clustering::Controller
spawn r.follow(uri), name: "Clustering client #{uri}"
SystemD.notify_ready
end
rescue ex
Log.fatal(exception: ex) { "Unhandled exception while following leader" }
exit 36 # 36 for CF (Cluster Follower)
end

def wait_to_be_insync
Expand Down
1 change: 0 additions & 1 deletion src/lavinmq/clustering/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ module LavinMQ
validate_header!
authenticate!(password)
@id = @socket.read_bytes Int32, IO::ByteFormat::LittleEndian
@socket.read_timeout = nil
@socket.tcp_nodelay = true
@socket.read_buffering = false
@socket.sync = true # Use buffering in lz4
Expand Down
6 changes: 3 additions & 3 deletions src/lavinmq/data_dir_lock.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@ module LavinMQ
@lock.flock_exclusive(blocking: true)
Log.info { "Lock acquired" }
end
Log.debug { "Data directory lock aquired" }
@lock.truncate
@lock.print "PID #{Process.pid} @ #{System.hostname}"
@lock.fsync
end

def release
@lock.truncate
@lock.fsync
@lock.flock_unlock
end

Expand All @@ -37,7 +36,8 @@ module LavinMQ
def poll
@lock.read_at(0, 1, &.read_byte) || raise IO::EOFError.new
rescue ex : IO::Error | ArgumentError
abort "ERROR: Lost data directory lock! #{ex.inspect}"
Log.fatal(exception: ex) { "Lost data dir lock" }
exit 4 # 4 for D(dataDir)
end
end
end
Loading
Loading