Skip to content

Commit

Permalink
Add min_followers and max_lag to Config,
Browse files Browse the repository at this point in the history
don't publish messages if max or min checks for both values does not pass
  • Loading branch information
kickster97 committed Aug 18, 2023
1 parent debbf03 commit 92b1023
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 3 deletions.
2 changes: 2 additions & 0 deletions extras/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ tcp_proxy_protocol = false
tls_port = 5671
unix_path = /tmp/lavinmq.sock
unix_proxy_protocol = true
min_followers = 0
max_lag = 0
4 changes: 4 additions & 0 deletions src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ module LavinMQ
property replication_follow : URI? = nil
property replication_bind : String? = nil
property replication_port = 5679
property min_followers : Int64 = 0
property max_lag : Int64 = 0
@@instance : Config = self.new

def self.instance : LavinMQ::Config
Expand Down Expand Up @@ -136,6 +138,8 @@ module LavinMQ
when "systemd_socket_name" then @amqp_systemd_socket_name = v
when "unix_proxy_protocol" then @unix_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8
when "tcp_proxy_protocol" then @tcp_proxy_protocol = true?(v) ? 1u8 : v.to_u8? || 0u8
when "min_followers" then @min_followers = v.to_i64
when "max_lag" then @max_lag = v.to_i64
else
STDERR.puts "WARNING: Unrecognized configuration 'amqp/#{config}'"
end
Expand Down
2 changes: 2 additions & 0 deletions src/lavinmq/http/controller/nodes.cr
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ module LavinMQ
run_queue: 0,
sockets_used: @amqp_server.vhosts.sum { |_, v| v.connections.size },
followers: @amqp_server.followers,
min_followers: @amqp_server.min_followers,
max_lag: @amqp_server.max_lag,
}
end

Expand Down
18 changes: 18 additions & 0 deletions src/lavinmq/replication/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ module LavinMQ
@followers = Array(Follower).new
@password : String
@files = Hash(String, MFile?).new
@min_followers : Int64
@max_lag : Int64

def initialize
@password = password
@tcp = TCPServer.new
@tcp.sync = false
@tcp.reuse_address = true
@min_followers = Config.instance.min_followers
@max_lag = Config.instance.max_lag
end

def clear
Expand Down Expand Up @@ -98,6 +102,20 @@ module LavinMQ
end
end

def has_min_followers?
@followers.size >= @min_followers
end

getter min_followers

def has_max_lag?
@followers.all? do |f|
f.lag <= @max_lag
end
end

getter max_lag

private def password : String
path = File.join(Config.instance.data_dir, ".replication_secret")
begin
Expand Down
8 changes: 8 additions & 0 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ module LavinMQ
@replicator.followers
end

def max_lag
@replicator.max_lag
end

def min_followers
@replicator.min_followers
end

def stop
return if @closed
@closed = true
Expand Down
1 change: 1 addition & 0 deletions src/lavinmq/vhost.cr
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ module LavinMQ
headers = properties.headers
find_all_queues(ex, msg.routing_key, headers, visited, found_queues)
headers.try(&.delete("BCC"))
return false unless @replicator.has_min_followers? || @replicator.has_max_lag?
# @log.debug { "publish queues#found=#{found_queues.size}" }
if found_queues.empty?
ex.unroutable_count += 1
Expand Down
10 changes: 8 additions & 2 deletions static/js/nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ function render (data) {
document.querySelector('#version').textContent = data[0].applications[0].version
for (const node of data) {
updateDetails(node)
updateFollowerSettings(node)
updateStats(node)
}
}
Expand Down Expand Up @@ -88,6 +89,11 @@ const updateDetails = (nodeStats) => {
document.getElementById('tr-disk').textContent = diskUsage
}

const updateFollowerSettings = (nodeStats) => {
document.getElementById('tr-max-lag').textContent = nodeStats.max_lag
document.getElementById('tr-min-followers').textContent = nodeStats.min_followers
}

const stats = [
{
heading: 'Connection',
Expand Down Expand Up @@ -274,8 +280,8 @@ function humanizeBytes(bytes, si=false, dp=1) {
return bytes + ' B';
}

const units = si
? ['kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
const units = si
? ['kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']
: ['KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB'];
let u = -1;
const r = 10**dp;
Expand Down
15 changes: 14 additions & 1 deletion views/nodes.ecr
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
<h3>Queue churn</h3>
<div class="chart-container" id="queueChurnChart"></div>
</section>
<section class="card">
<section class="card cols-10">
<h3>
Followers
<small id="followers-count"></small>
Expand All @@ -94,6 +94,19 @@
</table>
</div>
</section>
<!-- make into table with form inside!! -->
<section class="card cols-2">
<h3>Follower Settings</h3>
<table class="follower-settings-table">
<tr>
<th>Minimum amount of followers</th>
<td id="tr-min-followers"></td>
</tr>
<tr>
<th>Maximum amount of lag</th>
<td id="tr-max-lag"></td>
</tr>
</table>
</main>
<% render "footer" %>
</body>
Expand Down

0 comments on commit 92b1023

Please sign in to comment.