Skip to content

Commit

Permalink
use pagination to get all queues. Make checking for which queues to d…
Browse files Browse the repository at this point in the history
…elete much quicker by reading .queue files instead of computing SHA1 where possible
  • Loading branch information
viktorerlingsson committed Jan 10, 2025
1 parent 6a3e6a5 commit 1dbef7e
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions src/lavinmqctl.cr
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,16 @@ class LavinMQCtl

MAX_PAGE_SIZE = 10_000

private def get(url, page = 1, items = [] of JSON::Any)
resp = http.get("#{url}?page=#{page}&page_size=#{MAX_PAGE_SIZE}", @headers)
private def get(url, columns = nil, page = 1, items = [] of JSON::Any)
uri = "#{url}?page=#{page}&page_size=#{MAX_PAGE_SIZE}"
uri += "&columns=#{columns}" if columns
resp = http.get(uri, @headers)
handle_response(resp, 200)
if data = JSON.parse(resp.body).as_h?
items += data["items"].as_a
page = data["page"].as_i
if page < data["page_count"].as_i
return get(url, page + 1, items)
return get(url, columns, page + 1, items)
end
else
abort "invalid data"
Expand Down Expand Up @@ -692,17 +694,14 @@ class LavinMQCtl

vhosts = [vhost]
unless vhost # no vhost specified, get all vhosts
resp = http.get "/api/vhosts", @headers
handle_response(resp, 200)
if vhost_list = JSON.parse(resp.body).as_a?
vhosts = vhost_list.map do |u|
next unless v = u.as_h?
v["name"].to_s
end
vhosts = get("/api/vhosts").map do |u|
next unless v = u.as_h?
v["name"].to_s
end
end

vhosts.compact.each do |current_vhost|
puts "Cleaning up vhost #{current_vhost} ..." unless quiet?
queue_dirs = [] of String
vhost_data_dir = File.join(data_dir, Digest::SHA1.hexdigest(current_vhost))
begin
Expand All @@ -718,13 +717,16 @@ class LavinMQCtl
rescue e : File::NotFoundError
end

resp = http.get "/api/queues/#{URI.encode_www_form(current_vhost)}", @headers
handle_response(resp, 200)
if queues = JSON.parse(resp.body).as_a?
queues.each do |q|
next unless v = q.as_h?
queue_dirs.reject! { |path| path.includes?(Digest::SHA1.hexdigest(v["name"].to_s)) }
end
queues = get("/api/queues/#{URI.encode_www_form(current_vhost)}","name")

Dir.glob("#{vhost_data_dir}/*/.queue").each do |dir|
queues.delete({ "name"=> File.read(dir) })
queue_dirs.delete(dir[0..-8])
end

queues.each do |q|
next unless v = q.as_h?
queue_dirs.reject! { |path| path.includes?(Digest::SHA1.hexdigest(v["name"].to_s)) }
end

queue_dirs.each do |dir|
Expand Down

0 comments on commit 1dbef7e

Please sign in to comment.