diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e91815bd..6789c6c0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -52,6 +52,12 @@ jobs: env: BUNDLE_GEMFILE: gemfiles/redis_${{ matrix.redis-version }}.gemfile + - name: Collect Docker Logs + uses: jwalton/gh-docker-logs@v2.2.0 + if: failure() + with: + images: 'rabbitmq' + - name: Stop services run: docker-compose down if: always() diff --git a/beetle.gemspec b/beetle.gemspec index a5518d4e..10b0d35c 100644 --- a/beetle.gemspec +++ b/beetle.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |s| } s.specification_version = 3 - s.add_runtime_dependency "bunny", "~> 0.7.13" + s.add_runtime_dependency "bunny", "~> 2.22.0" s.add_runtime_dependency "redis", ">= 4.2.1" s.add_runtime_dependency "hiredis", ">= 0.4.5" s.add_runtime_dependency "amq-protocol", "= 2.3.2" diff --git a/cucumber.yml b/cucumber.yml index fea5edc1..57a288ac 100644 --- a/cucumber.yml +++ b/cucumber.yml @@ -1 +1 @@ -default: --publish-quiet +default: --publish-quiet --tags 'not @ignored' --color diff --git a/features/redis_auto_failover.feature b/features/redis_auto_failover.feature index 40e596b5..a7b044a8 100644 --- a/features/redis_auto_failover.feature +++ b/features/redis_auto_failover.feature @@ -219,3 +219,15 @@ Feature: Redis auto failover And the redis master of "rc-client-2" should be "redis-1" And the redis master of the beetle handler should be "redis-1" And the role of redis server "redis-2" should be "slave" + + @ignored + Scenario: Running the system for a few seconds to perform manual testing + Given a redis configuration server using redis servers "redis-1,redis-2" with clients "rc-client-1,rc-client-2" exists + And a redis configuration client "rc-client-1" using redis servers "redis-1,redis-2" exists + And a redis configuration client "rc-client-2" using redis servers "redis-1,redis-2" exists + And a beetle handler using the redis-master file from "rc-client-1" exists + And the redis master of "rc-client-1" should be "redis-1" + And the redis master of "rc-client-2" should be "redis-1" + And the redis master of the beetle handler should be "redis-1" + And the role of redis server "redis-2" should be "slave" + Then the system can run for a while without dying diff --git a/features/step_definitions/redis_auto_failover_steps.rb b/features/step_definitions/redis_auto_failover_steps.rb index 6779e0b0..f02c865b 100644 --- a/features/step_definitions/redis_auto_failover_steps.rb +++ b/features/step_definitions/redis_auto_failover_steps.rb @@ -158,7 +158,7 @@ Then /^the redis master of the beetle handler should be "([^\"]*)"$/ do |redis_name| Beetle.config.servers = "127.0.0.1:5672" # rabbitmq - Beetle.config.logger.level = Logger::INFO + Beetle.config.logger.level = Logger::FATAL redis_master = TestDaemons::Redis[redis_name].ip_with_port response = `curl -s 127.0.0.1:10254/redis_master`.chomp assert_equal redis_master, response diff --git a/features/support/beetle_handler b/features/support/beetle_handler index da4f14d0..90449bf2 100755 --- a/features/support/beetle_handler +++ b/features/support/beetle_handler @@ -85,25 +85,32 @@ Daemons.run_proc("beetle_handler", :log_output => true, :dir_mode => :normal, :d config.message(:echo) config.handler(:echo) do |message| begin + Beetle.config.logger.info "Received echo request: reply_to: #{message.header.attributes[:reply_to]}" client.deduplication_store.redis.server rescue master_file_content = File.read(Beetle.config.redis_server) - "no redis master: exception: #{$!.class}(#{$!}), master_file: '#{master_file_content}'" + msg = "no redis master: exception: #{$!.class}(#{$!}), master_file: '#{master_file_content}'" + Beetle.config.logger.error msg + msg end end config.handler(Beetle.config.beetle_policy_updates_queue_name) do |message| begin - Beetle.config.logger.info "received beetle policy update message': #{message.data}" + Beetle.config.logger.info "Received beetle policy update message': #{message.data}" client.update_queue_properties!(JSON.parse(message.data)) rescue => e Beetle.config.logger.error("#{e}:#{e.backtrace.join("\n")}") end end end + Beetle.config.logger.info "Starting beetle handler for system: #{Beetle.config.system_name}" client.listen do puts "Started beetle handler for system: #{Beetle.config.system_name}" BeetleStatusServer.setup(client) EM.start_server '0.0.0.0', 10254, BeetleStatusServer + + trap("TERM"){ client.stop_listening } end + Beetle.config.logger.info "Terminated beetle handler for system: #{Beetle.config.system_name}" end diff --git a/features/support/env.rb b/features/support/env.rb index 9226ecc1..3a9add53 100644 --- a/features/support/env.rb +++ b/features/support/env.rb @@ -6,11 +6,14 @@ extend Minitest::Assertions end +# Executed before each scenario Before do cleanup_master_files `ruby features/support/system_notification_logger start` + `ruby features/support/beetle_handler start -- --redis-master-file=#{redis_master_file('rc-client-1')}` end +# Executed after each scenario After do cleanup_test_env end diff --git a/features/support/system_notification_logger b/features/support/system_notification_logger index 2be60c97..9d5f543f 100755 --- a/features/support/system_notification_logger +++ b/features/support/system_notification_logger @@ -5,7 +5,6 @@ require "daemons" require "eventmachine" require "em-http-server" require "websocket-eventmachine-client" -require File.expand_path("../../lib/beetle", File.dirname(__FILE__)) tmp_path = File.expand_path("../../tmp", File.dirname(__FILE__)) system_notification_log_file_path = "#{tmp_path}/system_notifications.log" @@ -29,11 +28,6 @@ end Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode => :normal, :dir => tmp_path) do - Beetle.config.servers = "127.0.0.1:5672" # rabbitmq - - # set Beetle log level to info, less noisy than debug - Beetle.config.logger.level = Logger::DEBUG - log_file = File.open(system_notification_log_file_path, "a+") log_file.sync = true @@ -73,4 +67,6 @@ Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode => end end + + puts "Terminated system notification logger: #{Process.pid}" end diff --git a/lib/beetle.rb b/lib/beetle.rb index 08a3dde9..e29e1a90 100644 --- a/lib/beetle.rb +++ b/lib/beetle.rb @@ -1,6 +1,5 @@ $:.unshift(File.expand_path('..', __FILE__)) require 'bunny' # which bunny picks up -require 'qrack/errors' # needed by the publisher begin require 'redis/connection/hiredis' # require *before* redis as specified in the redis-rb gem docs @@ -28,8 +27,10 @@ class UnknownMessage < Error; end class UnknownQueue < Error; end # raised when no redis master server can be found class NoRedisMaster < Error; end - # raise when no message could be sent by the publisher + # raised when no message could be sent by the publisher class NoMessageSent < Error; end + # logged when an RPC call timed outdated + class RPCTimedOut < Error; end # AMQP options for exchange creation EXCHANGE_CREATION_KEYS = [:auto_delete, :durable, :internal, :nowait, :passive] diff --git a/lib/beetle/configuration.rb b/lib/beetle/configuration.rb index 25d61dc1..e43717b7 100644 --- a/lib/beetle/configuration.rb +++ b/lib/beetle/configuration.rb @@ -87,6 +87,8 @@ class Configuration # to 2047, which is the RabbitMQ default in 3.7. We can't set this to 0 because of a bug # in bunny. attr_accessor :channel_max + # the heartbeat setting to be used for RabbitMQ heartbeats (defaults to 0). + attr_accessor :heartbeat # Lazy queues have the advantage of consuming a lot less memory on the broker. For backwards # compatibility, they are disabled by default. @@ -121,8 +123,8 @@ class Configuration # Returns the port on which the Rabbit API is hosted attr_accessor :api_port - # the socket timeout in seconds for message publishing (defaults to 0). - # consider this a highly experimental feature for now. + # The socket timeout in seconds for message publishing (defaults to 15). + # Consider this a highly experimental feature for now. attr_accessor :publishing_timeout # the connect/disconnect timeout in seconds for the publishing connection @@ -185,6 +187,7 @@ def initialize #:nodoc: self.frame_max = 131072 self.channel_max = 2047 self.prefetch_count = 1 + self.heartbeat = 0 self.dead_lettering_enabled = false self.dead_lettering_msg_ttl = 1000 # 1 second @@ -196,7 +199,7 @@ def initialize #:nodoc: self.update_queue_properties_synchronously = false - self.publishing_timeout = 0 + self.publishing_timeout = 15 self.publisher_connect_timeout = 5 # seconds self.tmpdir = "/tmp" diff --git a/lib/beetle/publisher.rb b/lib/beetle/publisher.rb index 900b5185..6add222c 100644 --- a/lib/beetle/publisher.rb +++ b/lib/beetle/publisher.rb @@ -9,6 +9,7 @@ def initialize(client, options = {}) #:nodoc: @exchanges_with_bound_queues = {} @dead_servers = {} @bunnies = {} + @channels = {} @throttling_options = {} @next_throttle_refresh = Time.now @throttled = false @@ -27,14 +28,10 @@ def throttling_status @throttled ? 'throttled' : 'unthrottled' end - # list of exceptions potentially raised by bunny - # these need to be lazy, because qrack exceptions are only defined after a connection has been established + # List of exceptions potentially raised by bunny. def bunny_exceptions [ - Bunny::ConnectionError, Bunny::ForcedChannelCloseError, Bunny::ForcedConnectionCloseError, - Bunny::MessageError, Bunny::ProtocolError, Bunny::ServerDownError, Bunny::UnsubscribeError, - Bunny::AcknowledgementError, Qrack::BufferOverflowError, Qrack::InvalidTypeError, - Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error + Bunny::Exception, Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error ] end @@ -46,9 +43,9 @@ def publish(message_name, data, opts={}) #:nodoc: recycle_dead_servers unless @dead_servers.empty? throttle! if opts[:redundant] - publish_with_redundancy(exchange_name, message_name, data, opts) + publish_with_redundancy(exchange_name, message_name, data.to_s, opts) else - publish_with_failover(exchange_name, message_name, data, opts) + publish_with_failover(exchange_name, message_name, data.to_s, opts) end end end @@ -157,26 +154,38 @@ def bunny end def bunny? - @bunnies[@server] + !!@bunnies[@server] end def new_bunny b = Bunny.new( - :host => current_host, - :port => current_port, - :logging => !!@options[:logging], - :user => @client.config.user, - :pass => @client.config.password, - :vhost => @client.config.vhost, - :frame_max => @client.config.frame_max, - :channel_max => @client.config.channel_max, - :socket_timeout => @client.config.publishing_timeout, - :connect_timeout => @client.config.publisher_connect_timeout, - :spec => '09') + :host => current_host, + :port => current_port, + :logger => @client.config.logger, + :username => @client.config.user, + :password => @client.config.password, + :vhost => @client.config.vhost, + :automatically_recover => false, + :frame_max => @client.config.frame_max, + :channel_max => @client.config.channel_max, + :read_timeout => @client.config.publishing_timeout, + :write_timeout => @client.config.publishing_timeout, + :continuation_timeout => @client.config.publishing_timeout, + :connection_timeout => @client.config.publisher_connect_timeout, + :heartbeat => @client.config.heartbeat, + ) b.start b end + def channel + @channels[@server] ||= bunny.create_channel + end + + def channel? + !!@channels[@server] + end + # retry dead servers after ignoring them for 10.seconds # if all servers are dead, retry the one which has been dead for the longest time def recycle_dead_servers @@ -207,7 +216,7 @@ def select_next_server end def create_exchange!(name, opts) - bunny.exchange(name, opts) + channel.exchange(name, opts) end def bind_queues_for_exchange(exchange_name) @@ -218,9 +227,8 @@ def bind_queues_for_exchange(exchange_name) def declare_queue!(queue_name, creation_options) logger.debug("Beetle: creating queue with opts: #{creation_options.inspect}") - queue = bunny.queue(queue_name, creation_options) - - policy_options = bind_dead_letter_queue!(bunny, queue_name, creation_options) + queue = channel.queue(queue_name, creation_options) + policy_options = bind_dead_letter_queue!(channel, queue_name, creation_options) publish_policy_options(policy_options) queue end @@ -235,8 +243,11 @@ def stop!(exception=nil) Beetle::Timer.timeout(timeout) do logger.debug "Beetle: closing connection from publisher to #{server}" if exception - bunny.__send__ :close_socket + bunny.__send__ :close_connection, false + reader_loop = bunny.__send__ :reader_loop + reader_loop.kill if reader_loop else + channel.close if channel? bunny.stop end end @@ -245,6 +256,7 @@ def stop!(exception=nil) Beetle::reraise_expectation_errors! ensure @bunnies[@server] = nil + @channels[@server] = nil @exchanges[@server] = {} @queues[@server] = {} end diff --git a/test/beetle/amqp_gem_behavior_test.rb b/test/beetle/amqp_gem_behavior_test.rb index 937c978e..fea10271 100644 --- a/test/beetle/amqp_gem_behavior_test.rb +++ b/test/beetle/amqp_gem_behavior_test.rb @@ -11,8 +11,8 @@ class AMQPGemBehaviorTest < Minitest::Test EM::Timer.new(1){ connection.close { EM.stop }} channel = AMQP::Channel.new(connection) channel.on_error { puts "woot"} - exchange = channel.topic("beetle_tests") - queue = AMQP::Queue.new(channel) + exchange = channel.topic("beetle_tests", :durable => false, :auto_delete => true) + queue = AMQP::Queue.new(channel, :durable => false, :auto_delete => true) queue.bind(exchange, :key => "#") queue.subscribe { } queue.subscribe { } diff --git a/test/beetle/beetle_test.rb b/test/beetle/beetle_test.rb index cfe266c1..a5e1bccb 100644 --- a/test/beetle/beetle_test.rb +++ b/test/beetle/beetle_test.rb @@ -2,7 +2,7 @@ module Beetle class HostnameTest < Minitest::Test - test "should use canonical name if possible " do + test "should use canonical name if possible " do addr = mock("addr") addr.expects(:canonname).returns("a.b.com") Socket.expects(:gethostname).returns("a.b.com") diff --git a/test/beetle/publisher_test.rb b/test/beetle/publisher_test.rb index b7a7616e..0bd4b81d 100644 --- a/test/beetle/publisher_test.rb +++ b/test/beetle/publisher_test.rb @@ -4,8 +4,8 @@ module Beetle class PublisherTest < Minitest::Test def setup - client = Client.new - @pub = Publisher.new(client) + @client = Client.new + @pub = Publisher.new(@client) end test "acccessing a bunny for a server which doesn't have one should create it and associate it with the server" do @@ -18,16 +18,20 @@ def setup test "new bunnies should be created using current host and port and they should be started" do m = mock("dummy") expected_bunny_options = { - :host => @pub.send(:current_host), :port => @pub.send(:current_port), - :logging => false, - :user => "guest", - :pass => "guest", + :host => @pub.send(:current_host), + :port => @pub.send(:current_port), + :logger => @client.config.logger, + :username => "guest", + :password => "guest", :vhost => "/", - :socket_timeout => 0, - :connect_timeout => 5, + :automatically_recover => false, + :read_timeout => 15, + :write_timeout => 15, + :continuation_timeout => 15, + :connection_timeout => 5, :frame_max => 131072, :channel_max => 2047, - :spec => '09' + :heartbeat => 0, } Bunny.expects(:new).with(expected_bunny_options).returns(m) m.expects(:start) @@ -51,17 +55,22 @@ def setup assert_equal({}, @pub.send(:exchanges)) assert_equal({}, @pub.send(:queues)) assert_nil @pub.instance_variable_get(:@bunnies)[@pub.server] + assert_nil @pub.instance_variable_get(:@channels)[@pub.server] end test "stop!(exception) should close the bunny socket if an exception is not nil" do b = mock("bunny") - b.expects(:close_socket) + l = mock("loop") + b.expects(:close_connection) + b.expects(:reader_loop).returns(l) + l.expects(:kill) @pub.expects(:bunny?).returns(true) - @pub.expects(:bunny).returns(b) + @pub.expects(:bunny).returns(b).twice @pub.send(:stop!, Exception.new) assert_equal({}, @pub.send(:exchanges)) assert_equal({}, @pub.send(:queues)) assert_nil @pub.instance_variable_get(:@bunnies)[@pub.server] + assert_nil @pub.instance_variable_get(:@channels)[@pub.server] end test "stop! should not create a new bunny " do @@ -71,6 +80,7 @@ def setup assert_equal({}, @pub.send(:exchanges)) assert_equal({}, @pub.send(:queues)) assert_nil @pub.instance_variable_get(:@bunnies)[@pub.server] + assert_nil @pub.instance_variable_get(:@channels)[@pub.server] end end @@ -114,7 +124,7 @@ def setup nice_exchange = mock("nice exchange") @pub.stubs(:exchange).with("mama-exchange").returns(raising_exchange).then.returns(raising_exchange).then.returns(nice_exchange) - raising_exchange.expects(:publish).raises(Bunny::ConnectionError).twice + raising_exchange.expects(:publish).raises(Bunny::ConnectionError,'').twice nice_exchange.expects(:publish) @pub.expects(:set_current_server).twice @pub.expects(:stop!).twice @@ -143,9 +153,9 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) e.expects(:publish).in_sequence(redundant) @@ -159,13 +169,13 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) assert_raises Beetle::NoMessageSent do @pub.publish_with_redundancy("mama-exchange", "mama", @data, @opts) @@ -189,7 +199,7 @@ def setup e.expects(:publish).in_sequence(redundant) @pub.expects(:exchange).returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:stop!).in_sequence(redundant) @pub.expects(:exchange).returns(e).in_sequence(redundant) @@ -261,13 +271,13 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) assert_raises Beetle::NoMessageSent do @pub.publish_with_failover("mama-exchange", "mama", @data, @opts) @@ -296,8 +306,10 @@ def setup q = mock("queue") q.expects(:bind).with(:the_exchange, {:key => "haha.#"}) m = mock("Bunny") - m.expects(:queue).with("some_queue", :durable => true, :passive => false, :auto_delete => false, :exclusive => false, :arguments => {"foo" => "fighter"}).returns(q) - @pub.expects(:bunny).returns(m).twice + channel= mock("channel") + m.expects(:create_channel).returns(channel) + channel.expects(:queue).with("some_queue", :durable => true, :passive => false, :auto_delete => false, :exclusive => false, :arguments => {"foo" => "fighter"}).returns(q) + @pub.expects(:bunny).returns(m) @pub.send(:queue, "some_queue") assert_equal q, @pub.send(:queues)["some_queue"] @@ -468,10 +480,12 @@ def setup end test "accessing a given exchange should create it using the config. further access should return the created exchange" do - m = mock("Bunny") - m.expects(:exchange).with("some_exchange", :type => :topic, :durable => true, :queues => []).returns(42) + bunny = mock("bunny") + channel = mock("channel") + channel.expects(:exchange).with("some_exchange", :type => :topic, :durable => true, :queues => []).returns(42) @client.register_exchange("some_exchange", :type => :topic, :durable => true) - @pub.expects(:bunny).returns(m) + @pub.expects(:bunny).returns(bunny) + bunny.expects(:create_channel).returns(channel) ex = @pub.send(:exchange, "some_exchange") assert @pub.send(:exchanges).include?("some_exchange") ex2 = @pub.send(:exchange, "some_exchange") @@ -544,15 +558,16 @@ def setup test "stop should shut down all bunnies" do @pub.servers = ["localhost:1111", "localhost:2222"] s = sequence("shutdown") - bunny = mock("bunny") + bunny1 = mock("bunny1") + bunny2 = mock("bunny2") @pub.expects(:set_current_server).with("localhost:1111").in_sequence(s) @pub.expects(:bunny?).returns(true).in_sequence(s) - @pub.expects(:bunny).returns(bunny).in_sequence(s) - bunny.expects(:stop).in_sequence(s) + @pub.expects(:bunny).returns(bunny1).in_sequence(s) + bunny1.expects(:stop).in_sequence(s) @pub.expects(:set_current_server).with("localhost:2222").in_sequence(s) @pub.expects(:bunny?).returns(true).in_sequence(s) - @pub.expects(:bunny).returns(bunny).in_sequence(s) - bunny.expects(:stop).in_sequence(s) + @pub.expects(:bunny).returns(bunny2).in_sequence(s) + bunny2.expects(:stop).in_sequence(s) @pub.stop end end