Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to set connection options on a per server basis #107

Merged
merged 25 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
578044b
Use connection options to create bunny in publisher
david-krentzlin Aug 7, 2024
90ea9a7
Move per server options into configuration class
david-krentzlin Aug 7, 2024
82d37f6
Use connection options in subscriber
david-krentzlin Aug 7, 2024
8c979aa
Replace docker-compose with docker compose
david-krentzlin Aug 7, 2024
dfdd263
Add tests for configuration
david-krentzlin Aug 7, 2024
abbac65
Add tests that verifies custom options are used
david-krentzlin Aug 7, 2024
a692388
Add subscriber test with per server settings
david-krentzlin Aug 7, 2024
4344ec6
Also use correct admin connection options
david-krentzlin Aug 9, 2024
d2ef005
add tests
david-krentzlin Aug 9, 2024
4d8fdaf
sketch per server API base url
david-krentzlin Aug 9, 2024
daf442f
Configure ssl for admin connections
david-krentzlin Aug 12, 2024
623cddd
Log connection options
david-krentzlin Aug 12, 2024
e5effef
Set proper default port
david-krentzlin Aug 12, 2024
bcc3895
clean up
danielgoncharov Aug 13, 2024
9272748
wip
david-krentzlin Aug 13, 2024
f33c421
Add tests for api requests in queue properties
david-krentzlin Aug 13, 2024
db859ca
Overhaul API requests in queue_properties
david-krentzlin Aug 13, 2024
2f19c64
Update Gemfile
danielgoncharov Aug 15, 2024
89c84d0
naming
danielgoncharov Aug 15, 2024
9a24cc1
Revert "naming"
danielgoncharov Aug 15, 2024
c4c0980
naming
danielgoncharov Aug 15, 2024
d0e34c5
update tests
danielgoncharov Aug 15, 2024
6533727
change ssl to integer
arte7 Aug 15, 2024
c672059
adapt workflow to also run with v4.x branch
david-krentzlin Aug 15, 2024
01a016f
Merge branch 'per-server-config' of github.com:xing/beetle into per-s…
david-krentzlin Aug 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
run: sudo apt-get install redis

- name: Start required services
run: docker-compose up -d
run: docker compose up -d

- name: Install gems
run: bundle install && bundle exec appraisal install
Expand All @@ -53,5 +53,5 @@ jobs:
BUNDLE_GEMFILE: gemfiles/redis_${{ matrix.redis-version }}.gemfile

- name: Stop services
run: docker-compose down
run: docker compose down
if: always()
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ source 'https://rubygems.org'
gemspec

gem "hiredis-client"

gem "pry"
# gem 'bunny', '=0.7.10', :path => "#{ENV['HOME']}/src/bunny"

# Use patched appraisal gem until it is fixed upstream.
Expand Down
5 changes: 5 additions & 0 deletions lib/beetle/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ def current_port
@server =~ /:(\d+)$/ ? $1.to_i : 5672
end


def connection_options_for_server(server)
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
@client.config.connection_options_for_server(server)
end

def set_current_server(s)
@server = s
end
Expand Down
33 changes: 29 additions & 4 deletions lib/beetle/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,13 @@ class Configuration

# list of amqp servers to use (defaults to <tt>"localhost:5672"</tt>)
attr_accessor :servers

# list of additional amqp servers to use for subscribers (defaults to <tt>""</tt>)
attr_accessor :additional_subscription_servers
david-krentzlin marked this conversation as resolved.
Show resolved Hide resolved

# a hash mapping a server name to a hash of connection options for that server or additional subscription server
attr_accessor :server_connection_options

# the virtual host to use on the AMQP servers (defaults to <tt>"/"</tt>)
attr_accessor :vhost
# the AMQP user to use when connecting to the AMQP servers (defaults to <tt>"guest"</tt>)
Expand Down Expand Up @@ -118,9 +123,6 @@ class Configuration
# Write timeout for http requests to RabbitMQ HTTP API
attr_accessor :rabbitmq_api_write_timeout

# Returns the port on which the Rabbit API is hosted
attr_accessor :api_port
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved

# the socket timeout in seconds for message publishing (defaults to <tt>0</tt>).
# consider this a highly experimental feature for now.
attr_accessor :publishing_timeout
Expand Down Expand Up @@ -177,11 +179,11 @@ def initialize #:nodoc:
self.redis_configuration_client_ids = ""

self.servers = "localhost:5672"
self.server_connection_options = {}
self.additional_subscription_servers = ""
self.vhost = "/"
self.user = "guest"
self.password = "guest"
self.api_port = 15672
self.frame_max = 131072
self.channel_max = 2047
self.prefetch_count = 1
Expand Down Expand Up @@ -237,7 +239,30 @@ def redis_options
}
end

# Returns a hash of connection options for the given server.
# If no server specific options are set, it constructs defaults which
# use the global user, password and vhost settings.
def connection_options_for_server(server)
overrides = server_connection_options[server] || {}

default_server_connection_options(server).merge(overrides)
end

private

def default_server_connection_options(server)
host, port = server.split(':')
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
port ||= 5672

{
host: host,
port: port.to_i,
user: user,
pass: password,
vhost: vhost,
}
end

def load_config
raw = ERB.new(IO.read(config_file)).result
hash = if config_file =~ /\.json$/
Expand Down
17 changes: 10 additions & 7 deletions lib/beetle/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,18 +161,21 @@ def bunny?
end

def new_bunny
options = connection_options_for_server(@server)

b = Bunny.new(
:host => current_host,
:port => current_port,
:logging => !!@options[:logging],
:user => @client.config.user,
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
:pass => @client.config.password,
:vhost => @client.config.vhost,
:host => options[:host],
:port => options[:port],
:user => options[:user],
:pass => options[:pass],
:vhost => options[:vhost],
:ssl => options[:ssl],
:frame_max => @client.config.frame_max,
:channel_max => @client.config.channel_max,
:socket_timeout => @client.config.publishing_timeout,
:connect_timeout => @client.config.publisher_connect_timeout,
:spec => '09')
:spec => '09',
:logging => !!@options[:logging])
b.start
b
end
Expand Down
62 changes: 26 additions & 36 deletions lib/beetle/queue_properties.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ def set_queue_policy!(server, queue_name, options={})

# no need to worry that the server has the port 5672. Net:HTTP will take care of this. See below.
policy_name = "#{queue_name}_policy"
request_url = URI("http://#{server}/api/policies/#{vhost}/#{policy_name}")
get_request = Net::HTTP::Get.new(request_url)
put_request = Net::HTTP::Put.new(request_url)
delete_request = Net::HTTP::Delete.new(request_url)
request_path = "/api/policies/#{vhost}/#{policy_name}"

# set up queue policy
definition = {}
Expand All @@ -70,29 +67,24 @@ def set_queue_policy!(server, queue_name, options={})

is_default_policy = definition == config.broker_default_policy

get_response = run_rabbit_http_request(request_url, get_request) do |http|
http.request(get_request, nil)
end
get_response = run_api_request(server, Net::HTTP::Get, request_path)

case get_response.code
when "200"
response_body = JSON.parse(get_response.body) rescue {}
same_policy = put_request_body.all? { |k,v| response_body[k] == v }
if same_policy
if is_default_policy
run_rabbit_http_request(request_url, delete_request) do |http|
http.request(get_request, nil)
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
end
run_api_request(server, Net::HTTP::Delete, request_path)
end

return :ok
end
when "404"
return :ok if is_default_policy
end

put_response = run_rabbit_http_request(request_url, put_request) do |http|
http.request(put_request, put_request_body.to_json)
end
put_response = run_api_request(server, Net::HTTP::Put, request_path, put_request_body.to_json)

unless %w(200 201 204).include?(put_response.code)
log_error("Failed to create policy for queue #{queue_name}", put_response)
Expand Down Expand Up @@ -125,12 +117,7 @@ def remove_obsolete_bindings(server, queue_name, bindings)
end

def retrieve_bindings(server, queue_name)
request_url = URI("http://#{server}/api/queues/#{vhost}/#{queue_name}/bindings")
request = Net::HTTP::Get.new(request_url)

response = run_rabbit_http_request(request_url, request) do |http|
http.request(request)
end
response = run_api_request(server, Net::HTTP::Get, "/api/queues/#{vhost}/#{queue_name}/bindings")

unless response.code == "200"
log_error("Failed to retrieve bindings for queue #{queue_name}", response)
Expand All @@ -141,34 +128,37 @@ def retrieve_bindings(server, queue_name)
end

def remove_binding(server, queue_name, exchange, properties_key)
request_url = URI("http://#{server}/api/bindings/#{vhost}/e/#{exchange}/q/#{queue_name}/#{properties_key}")
request = Net::HTTP::Delete.new(request_url)

response = run_rabbit_http_request(request_url, request) do |http|
http.request(request)
end
response = run_api_request(server, Net::HTTP::Delete, "/api/bindings/#{vhost}/e/#{exchange}/q/#{queue_name}/#{properties_key}")

unless %w(200 201 204).include?(response.code)
log_error("Failed to remove obsolete binding for queue #{queue_name}", response)
raise FailedRabbitRequest.new("Could not retrieve queue bindings")
end
end

def run_rabbit_http_request(uri, request, &block)
request.basic_auth(config.user, config.password)
case request.class::METHOD
when 'GET'
request["Accept"] = "application/json"
when 'PUT'
request["Content-Type"] = "application/json"
def run_api_request(server, request_const, path, *request_args)
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved
connection_options = config.connection_options_for_server(server)

derived_api_port = "1#{connection_options[:port]}".to_i
request_url = URI("http://#{connection_options[:host]}:#{derived_api_port}#{path}")

request = request_const.new(request_url).tap do |req|
req.basic_auth(connection_options[:user], connection_options[:pass])
case request_const::METHOD
when 'GET'
req["Accept"] = "application/json"
when 'PUT'
req["Content-Type"] = "application/json"
end
end
http = Net::HTTP.new(uri.hostname, config.api_port)

http = Net::HTTP.new(connection_options[:host], derived_api_port)
http.use_ssl = !!connection_options[:ssl]
http.read_timeout = config.rabbitmq_api_read_timeout
http.write_timeout = config.rabbitmq_api_write_timeout if http.respond_to?(:write_timeout=)
# don't do this in production:
# http.set_debug_output(logger.instance_eval{ @logdev.dev })

http.start do |instance|
block.call(instance) if block_given?
instance.request(request, *request_args)
end
end

Expand Down
11 changes: 9 additions & 2 deletions lib/beetle/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def initialize(client, options = {}) #:nodoc:
def listen_queues(queues) #:nodoc:
@listened_queues = queues
@exchanges_for_queues = exchanges_for_queues(queues)

EM.run do
each_server_sorted_randomly do
connect_server connection_settings
Expand Down Expand Up @@ -229,9 +230,15 @@ def bind_queue!(queue, exchange_name, binding_options)
end

def connection_settings
options = connection_options_for_server(@server)
{
:host => current_host, :port => current_port, :logging => false,
:user => @client.config.user, :pass => @client.config.password, :vhost => @client.config.vhost,
:host => options[:host],
:port => options[:port],
:user => options[:user],
:pass => options[:pass],
:vhost => options[:vhost],
:ssl => options[:ssl],
:logging => false,
:on_tcp_connection_failure => on_tcp_connection_failure,
:on_possible_authentication_failure => on_possible_authentication_failure,
}
Expand Down
2 changes: 1 addition & 1 deletion lib/beetle/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Beetle
VERSION = "3.5.7"
VERSION = "4.0.0"
end
1 change: 0 additions & 1 deletion test/beetle/base_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,5 @@ def setup
@client.expects(:update_queue_properties!).with(options.merge(:server => "localhost:5672"))
@bs.__send__(:publish_policy_options, options)
end

end
end
47 changes: 47 additions & 0 deletions test/beetle/configuration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,51 @@ class ConfigurationTest < Minitest::Test
assert_equal "10.0.0.1:3001", config.additional_subscription_servers
end
end

class ConnectionOptionsForServerTest < Minitest::Test

test "returns the options for the server provided" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = {host: 'localhost', port: 5672, user: "john", pass: "doe", vhost: "test", ssl: "0"}
danielgoncharov marked this conversation as resolved.
Show resolved Hide resolved

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "john", options[:user]
assert_equal "doe", options[:pass]
assert_equal "test", options[:vhost]
assert_equal "0", options[:ssl]
end
end

test "returns default options if no specific options are set for the server" do
config = Configuration.new
config.servers = 'localhost:5672'

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "guest", options[:user]
assert_equal "guest", options[:pass]
assert_equal "/", options[:vhost]
assert_nil options[:ssl]
end
end

test "allows to set specific options while retaining defaults for the rest" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = { pass: "another_pass", ssl: "1" }

config.connection_options_for_server("localhost:5672").tap do |options|
assert_equal "localhost", options[:host]
assert_equal 5672, options[:port]
assert_equal "guest", options[:user]
assert_equal "another_pass", options[:pass]
assert_equal "/", options[:vhost]
assert_equal "1", options[:ssl]
end
end
end
end
29 changes: 29 additions & 0 deletions test/beetle/publisher_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def setup
:user => "guest",
:pass => "guest",
:vhost => "/",
:ssl => nil,
:socket_timeout => 0,
:connect_timeout => 5,
:frame_max => 131072,
Expand All @@ -34,6 +35,34 @@ def setup
assert_equal m, @pub.send(:new_bunny)
end

test "new bunnies should be created using custom connection options and they should be started" do
config = Configuration.new
config.servers = 'localhost:5672'
config.server_connection_options["localhost:5672"] = { user: "john", pass: "doe", vhost: "test", ssl: "0" }
client = Client.new(config)
pub = Publisher.new(client)

m = mock("dummy")
expected_bunny_options = {
host: "localhost",
port: 5672,
user: "john",
pass: "doe",
vhost: "test",
ssl: "0",
socket_timeout: 0,
connect_timeout: 5,
frame_max: 131072,
channel_max: 2047,
spec: '09',
logging: false
}

Bunny.expects(:new).with(expected_bunny_options).returns(m)
m.expects(:start)
assert_equal m, pub.send(:new_bunny)
end

test "initially there should be no bunnies" do
assert_equal({}, @pub.instance_variable_get("@bunnies"))
end
Expand Down
Loading
Loading