Skip to content

Commit

Permalink
add min_followers and max_lag
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed May 7, 2024
1 parent 7d610ec commit 0d6dacd
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 10 deletions.
2 changes: 2 additions & 0 deletions extras/lavinmq.ini
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ tcp_proxy_protocol = false
tls_port = 5671
unix_path = /tmp/lavinmq.sock
unix_proxy_protocol = true
min_followers = 1
max_lag = 10000000000
80 changes: 80 additions & 0 deletions spec/replication_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,83 @@ describe LavinMQ::Replication::Client do
end
end
end

# Specs for the replication server's min_followers handling.
# Each example runs against the shared spec `Server` and uses a scratch
# follower data directory under /tmp.
describe LavinMQ::Replication::Server do
data_dir = "/tmp/lavinmq-follower"

# Start every example from an empty follower directory that contains the
# same replication secret as the server's replicator, make sure the "repli"
# queue exists, and require at least one follower.
before_each do
FileUtils.rm_rf data_dir
Dir.mkdir_p data_dir
File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
Server.vhosts["/"].declare_queue("repli", true, false)
LavinMQ::Config.instance.min_followers = 1
end

# Remove the scratch directory and reset min_followers to 0 so the setting
# does not leak into other specs.
after_each do
FileUtils.rm_rf data_dir
LavinMQ::Config.instance.min_followers = 0
end


# NOTE(review): this example has no assertions and never closes `repli`;
# it only spawns three fibers that call `follow` on the same client.
# The intended check (`repli.closing`) is still commented out — confirm
# what "gracefully" is supposed to verify.
it "should shut down gracefully" do
repli = LavinMQ::Replication::Client.new(data_dir)
3.times do
spawn do
repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
end
end

# repli.closing

end

# With one follower started in a background fiber, a publish to "repli"
# is expected to end up in the queue (basic_get returns a truthy value).
it "should publish when min_followers is fulfilled" do
q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
repli = LavinMQ::Replication::Client.new(data_dir)
spawn do
repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
end
with_channel do |ch|
ch.basic_publish "hello world", "", "repli"
end
q.basic_get(true) { }.should be_true
repli.close
end

# Without any follower, the confirmed publish is expected not to complete:
# the example fails if `done` is signalled within 0.1 seconds.
it "should not publish when min_followers is not fulfilled" do
done = Channel(Nil).new
client : AMQP::Client::Connection? = nil
spawn do
with_channel do |ch, conn|
client = conn
q = ch.queue("repli")
q.publish_confirm "hello world"
done.send nil
end
end
select
when done.receive
fail "Should not receive message"
when timeout(0.1.seconds)
# On timeout the client connection is closed without waiting, then the
# server itself is closed.
# NOTE(review): `Server` appears to be shared with the other specs —
# confirm that closing it here does not affect later examples.
client.try &.close(no_wait: true)
Server.close
end
end

# it "should publish when max_lag is not reached" do
# LavinMQ::Config.instance.max_lag = 10000
# q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
# repli = LavinMQ::Replication::Client.new(data_dir)
# spawn do
# repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
# end
# with_channel do |ch|
# ch.basic_publish "hello world", "", "repli"
# end
# q.basic_get(true) { }.should be_true
# repli.close
# end

# it "should not publish when max_lag is reached" do
# end
end
2 changes: 1 addition & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def with_channel(file = __FILE__, line = __LINE__, **args, &)
args = {port: LavinMQ::Config.instance.amqp_port, name: name}.merge(args)
conn = AMQP::Client.new(**args).connect
ch = conn.channel
yield ch
yield ch, conn
ensure
conn.try &.close(no_wait: false)
end
Expand Down
6 changes: 5 additions & 1 deletion src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module LavinMQ
DEFAULT_LOG_LEVEL = Log::Severity::Info

property data_dir : String = ENV.fetch("STATE_DIRECTORY", "/var/lib/lavinmq")
property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : ""
# Path of the config file to load: lavinmq.ini inside CONFIGURATION_DIRECTORY
# (default /etc/lavinmq) when that file exists, otherwise an empty string.
# The fallback must never be a path on a developer's machine.
property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : ""
property log_file : String? = nil
property log_level : Log::Severity = DEFAULT_LOG_LEVEL
property amqp_bind = "127.0.0.1"
Expand Down Expand Up @@ -50,6 +50,8 @@ module LavinMQ
property max_deleted_definitions = 8192 # number of deleted queues, unbinds etc that compacts the definitions file
property consumer_timeout : UInt64? = nil
property consumer_timeout_loop_interval = 60 # seconds
property min_followers : Int64 = 0
property max_lag : Int64? = nil
@@instance : Config = self.new

def self.instance : LavinMQ::Config
Expand Down Expand Up @@ -141,6 +143,8 @@ module LavinMQ
when "systemd_socket_name" then @amqp_systemd_socket_name = v
when "unix_proxy_protocol" then @unix_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8
when "tcp_proxy_protocol" then @tcp_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8
when "min_followers" then @min_followers = v.to_i64
when "max_lag" then @max_lag = v.to_i64
else
STDERR.puts "WARNING: Unrecognized configuration 'amqp/#{config}'"
end
Expand Down
2 changes: 2 additions & 0 deletions src/lavinmq/http/controller/nodes.cr
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ module LavinMQ
run_queue: 0,
sockets_used: @amqp_server.vhosts.sum { |_, v| v.connections.size },
followers: @amqp_server.followers,
min_followers: LavinMQ::Config.instance.min_followers,
max_lag: LavinMQ::Config.instance.max_lag,
}
end

Expand Down
4 changes: 2 additions & 2 deletions src/lavinmq/replication/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ module LavinMQ
@data_dir_lock : DataDirLock?
@closed = false

def initialize(@data_dir : String)
def initialize(@data_dir : String, pwd : String? = nil)
System.maximize_fd_limit
@socket = TCPSocket.new
@socket.sync = true
@socket.read_buffering = false
@lz4 = Compress::LZ4::Reader.new(@socket)
@password = password
@password = pwd || password
@files = Hash(String, File).new do |h, k|
path = File.join(@data_dir, k)
Dir.mkdir_p File.dirname(path)
Expand Down
22 changes: 19 additions & 3 deletions src/lavinmq/replication/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module LavinMQ
module Replication
class Follower
Log = ::Log.for(self)

@ack = Channel(Int64).new
@acked_bytes = 0_i64
@sent_bytes = 0_i64
@actions = Channel(Action).new(4096)
Expand Down Expand Up @@ -48,17 +48,32 @@ module LavinMQ
loop do
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
@acked_bytes += len
if max_lag = Config.instance.max_lag
if lag < max_lag
@ack.try_send lag
end
end
end
rescue IO::Error
end

# Blocks the calling fiber while this follower's `lag` is not below
# `Config.instance.max_lag`. Returns immediately when no max_lag is
# configured. While waiting, new lag values are taken from the @ack
# channel; a closed channel ends the wait.
def wait_for_max_lag
  max_lag = Config.instance.max_lag
  return if max_lag.nil?

  current_lag = lag
  while current_lag >= max_lag
    reported_lag = @ack.receive?
    # receive? yields nil once @ack has been closed: stop waiting
    break if reported_lag.nil?
    current_lag = reported_lag
  end
end

private def action_loop(socket = @lz4)
while action = @actions.receive?
@sent_bytes += action.send(socket)
sent_bytes = action.send(socket)
while action2 = @actions.try_receive?
@sent_bytes += action2.send(socket)
sent_bytes += action2.send(socket)
end
socket.flush
@sent_bytes += sent_bytes
end
rescue IO::Error
ensure
Expand Down Expand Up @@ -147,6 +162,7 @@ module LavinMQ
Log.info { "Disconnected" }
wait_for_sync if synced_close
@actions.close
@ack.close
@lz4.close
@socket.close
rescue IO::Error
Expand Down
34 changes: 32 additions & 2 deletions src/lavinmq/replication/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ module LavinMQ
include FileIndex
include Replicator
Log = ::Log.for("replication")
getter followers_changed = Channel(Nil).new
getter? closing
@lock = Mutex.new(:unchecked)
@followers = Array(Follower).new
@password : String
@files = Hash(String, MFile?).new
@closing = false

def initialize
@password = password
def initialize(@password = password)
@tcp = TCPServer.new
@tcp.sync = false
@tcp.reuse_address = true
Expand Down Expand Up @@ -59,11 +61,13 @@ module LavinMQ

# Forwards an append of `obj` to the file at `path` to every follower.
# Unless the server is closing, first blocks in `wait_for_max_lag`.
def append(path : String, obj)
  Log.debug { "appending #{obj} to #{path}" }
  wait_for_max_lag if !closing?
  each_follower { |follower| follower.append(path, obj) }
end

# Drops `path` from the local file index and forwards the deletion to every
# follower. Unless the server is closing, blocks in `wait_for_max_lag`
# after the index entry has been removed and before followers are notified.
def delete_file(path : String)
  @files.delete(path)
  wait_for_max_lag if !closing?
  each_follower { |follower| follower.delete(path) }
end

Expand Down Expand Up @@ -106,6 +110,22 @@ module LavinMQ
end
end

# Blocks the calling fiber until at least `Config.instance.min_followers`
# followers are connected (or the server is closing), then asks followers
# to wait until their lag is within max_lag.
# A `Channel::ClosedError` (e.g. @followers_changed closed while waiting)
# ends the wait silently.
def wait_for_max_lag
# was_closing = @closing
# Each receive is one follower connect/disconnect (or closing) notification;
# re-check the condition after every notification.
until @closing || @followers.size >= Config.instance.min_followers
@followers_changed.receive
end
# unless (!was_closing && @closing) || @followers.size >= Config.instance.min_followers
# raise Exception.new("Not enough followers")
# end
# use waitgroup instead l8er
# NOTE(review): `i > min_followers` lets indices 0..min_followers through,
# i.e. up to min_followers + 1 followers are waited on — confirm whether
# `>=` was intended (note that `>=` would skip all followers when
# min_followers is 0).
# NOTE(review): @followers is iterated here without taking @lock — confirm
# this is safe while followers connect/disconnect.
@followers.each_with_index do |f, i|
break if i > Config.instance.min_followers
f.wait_for_max_lag
end
rescue Channel::ClosedError
end

private def password : String
path = File.join(Config.instance.data_dir, ".replication_secret")
begin
Expand Down Expand Up @@ -143,12 +163,14 @@ module LavinMQ
follower.full_sync
@followers << follower
end
followers_changed.try_send nil
begin
follower.read_acks
ensure
@lock.synchronize do
@followers.delete(follower)
end
followers_changed.try_send nil
end
rescue ex : AuthenticationError
Log.warn { "Follower negotiation error" }
Expand All @@ -163,6 +185,7 @@ module LavinMQ
end

def close
Log.debug { "closing" }
@tcp.close
@lock.synchronize do
done = Channel({Follower, Bool}).new
Expand All @@ -175,9 +198,16 @@ module LavinMQ
end
@followers.clear
end
@followers_changed.close
Log.debug { "closed" }
Fiber.yield # required for follower/listener fibers to actually finish
end

# Flags the server as closing and posts a notification on the
# followers_changed channel, so that a fiber blocked in wait_for_max_lag
# re-checks its condition and sees the closing flag.
# The send is presumably non-blocking (try_send?) — TODO confirm.
def closing
  @closing = true
  followers_changed.try_send?(nil)
end

private def each_follower(& : Follower -> Nil) : Nil
@lock.synchronize do
@followers.each do |f|
Expand Down
2 changes: 2 additions & 0 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,12 @@ module LavinMQ

# Shuts the server down in a fixed order: mark as closed, tell the
# replicator that shutdown has begun, close all listeners, close the
# vhosts, and finally close the replicator itself.
def close
  @closed = true
  # Signal the replicator before anything else is torn down
  @replicator.closing
  @log.debug { "Closing listeners" }
  @listeners.each_key { |listener| listener.close }
  @log.debug { "Closing vhosts" }
  @vhosts.close
  @log.debug { "Closing replicator" }
  @replicator.close
end

Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ module LavinMQ
headers = msg.properties.headers
find_all_queues(ex, msg.routing_key, headers, visited, found_queues)
headers.delete("BCC") if headers

if found_queues.empty?
ex.unroutable_count += 1
return false
Expand Down Expand Up @@ -456,8 +457,11 @@ module LavinMQ
sleep 0.1
end
# then force close the remaining (close tcp socket)
@log.debug { "force closing connection" } unless connections.empty?

@connections.each &.force_close
Fiber.yield # yield so that Client read_loops can shutdown
@log.debug { "Closing queues" }
@queues.each_value &.close
Fiber.yield
compact!
Expand Down
6 changes: 6 additions & 0 deletions static/js/nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ function render (data) {
document.querySelector('#version').textContent = data[0].applications[0].version
for (const node of data) {
updateDetails(node)
updateFollowerSettings(node)
updateStats(node)
}
}
Expand Down Expand Up @@ -72,6 +73,11 @@ const updateDetails = (nodeStats) => {
document.getElementById('tr-disk').textContent = diskUsage
}

// Renders the node's replication settings (max_lag, min_followers) into the
// "Follower Settings" table cells.
const updateFollowerSettings = (nodeStats) => {
  const maxLag = nodeStats.max_lag
  // Only null/undefined means "not configured"; a configured value of 0 is
  // falsy and must still be displayed as-is.
  const maxLagMissing = maxLag === null || maxLag === undefined
  document.getElementById('tr-max-lag').textContent = maxLagMissing ? 'No max_lag value specified' : maxLag
  document.getElementById('tr-min-followers').textContent = nodeStats.min_followers
}

const stats = [
{
heading: 'Connection',
Expand Down
14 changes: 13 additions & 1 deletion views/nodes.ecr
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
<h3>Queue churn</h3>
<div class="chart-container" id="queueChurnChart"></div>
</section>
<section class="card">
<section class="card cols-10">
<h3>
Followers
<small id="followers-count"></small>
Expand All @@ -94,6 +94,18 @@
</table>
</div>
</section>
<!-- Follower settings card; the two cells are filled in by static/js/nodes.js -->
<section class="card cols-2">
<h3>Follower Settings</h3>
<table class="details-table">
<tr>
<th>Minimum amount of followers</th>
<td id="tr-min-followers"></td>
</tr>
<tr>
<th>Maximum amount of lag</th>
<td id="tr-max-lag"></td>
</tr>
</table>
</section>
</main>
<% render "footer" %>
</body>
Expand Down

0 comments on commit 0d6dacd

Please sign in to comment.