From ae3903ee55160be2b33cfba3c2b9444dbf038406 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Sun, 14 Aug 2022 15:08:51 +0200 Subject: [PATCH 01/12] updated bunny to the latest version The tests are green, but we need to study the interplay between the various new timeout options in the multi threaded bunny implementation. This is still WIP!!! --- beetle.gemspec | 2 +- features/support/beetle_handler | 6 +- features/support/system_notification_logger | 8 +-- lib/beetle/configuration.rb | 9 ++- lib/beetle/publisher.rb | 60 ++++++++++------- test/beetle/amqp_gem_behavior_test.rb | 4 +- test/beetle/beetle_test.rb | 2 +- test/beetle/publisher_test.rb | 73 ++++++++++++--------- 8 files changed, 93 insertions(+), 71 deletions(-) diff --git a/beetle.gemspec b/beetle.gemspec index a5518d4e..d62a5ddd 100644 --- a/beetle.gemspec +++ b/beetle.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |s| } s.specification_version = 3 - s.add_runtime_dependency "bunny", "~> 0.7.13" + s.add_runtime_dependency "bunny", "~> 2.19.0" s.add_runtime_dependency "redis", ">= 4.2.1" s.add_runtime_dependency "hiredis", ">= 0.4.5" s.add_runtime_dependency "amq-protocol", "= 2.3.2" diff --git a/features/support/beetle_handler b/features/support/beetle_handler index da4f14d0..7d955ecb 100755 --- a/features/support/beetle_handler +++ b/features/support/beetle_handler @@ -93,17 +93,21 @@ Daemons.run_proc("beetle_handler", :log_output => true, :dir_mode => :normal, :d end config.handler(Beetle.config.beetle_policy_updates_queue_name) do |message| begin - Beetle.config.logger.info "received beetle policy update message': #{message.data}" + Beetle.config.logger.info "Received beetle policy update message': #{message.data}" client.update_queue_properties!(JSON.parse(message.data)) rescue => e Beetle.config.logger.error("#{e}:#{e.backtrace.join("\n")}") end end end + Beetle.config.logger.info "Starting beetle handler for system: #{Beetle.config.system_name}" client.listen do puts "Started beetle handler for system: #{Beetle.config.system_name}" BeetleStatusServer.setup(client) EM.start_server '0.0.0.0', 10254, BeetleStatusServer + + trap("TERM"){ client.stop_listening } end + Beetle.config.logger.info "Terminated beetle handler for system: #{Beetle.config.system_name}" end diff --git a/features/support/system_notification_logger b/features/support/system_notification_logger index 2be60c97..9d5f543f 100755 --- a/features/support/system_notification_logger +++ b/features/support/system_notification_logger @@ -5,7 +5,6 @@ require "daemons" require "eventmachine" require "em-http-server" require "websocket-eventmachine-client" -require File.expand_path("../../lib/beetle", File.dirname(__FILE__)) tmp_path = File.expand_path("../../tmp", File.dirname(__FILE__)) system_notification_log_file_path = "#{tmp_path}/system_notifications.log" @@ -29,11 +28,6 @@ end Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode => :normal, :dir => tmp_path) do - Beetle.config.servers = "127.0.0.1:5672" # rabbitmq - - # set Beetle log level to info, less noisy than debug - Beetle.config.logger.level = Logger::DEBUG - log_file = File.open(system_notification_log_file_path, "a+") log_file.sync = true @@ -73,4 +67,6 @@ Daemons.run_proc("system_notification_logger", :log_output => true, :dir_mode => end end + + puts "Terminated system notification logger: #{Process.pid}" end diff --git a/lib/beetle/configuration.rb b/lib/beetle/configuration.rb index 25d61dc1..e43717b7 100644 --- a/lib/beetle/configuration.rb +++ b/lib/beetle/configuration.rb @@ -87,6 +87,8 @@ class Configuration # to 2047, which is the RabbitMQ default in 3.7. We can't set this to 0 because of a bug # in bunny. attr_accessor :channel_max + # the heartbeat setting to be used for RabbitMQ heartbeats (defaults to 0). + attr_accessor :heartbeat # Lazy queues have the advantage of consuming a lot less memory on the broker. For backwards # compatibility, they are disabled by default. @@ -121,8 +123,8 @@ class Configuration # Returns the port on which the Rabbit API is hosted attr_accessor :api_port - # the socket timeout in seconds for message publishing (defaults to 0). - # consider this a highly experimental feature for now. + # The socket timeout in seconds for message publishing (defaults to 15). + # Consider this a highly experimental feature for now. attr_accessor :publishing_timeout # the connect/disconnect timeout in seconds for the publishing connection @@ -185,6 +187,7 @@ def initialize #:nodoc: self.frame_max = 131072 self.channel_max = 2047 self.prefetch_count = 1 + self.heartbeat = 0 self.dead_lettering_enabled = false self.dead_lettering_msg_ttl = 1000 # 1 second @@ -196,7 +199,7 @@ def initialize #:nodoc: self.update_queue_properties_synchronously = false - self.publishing_timeout = 0 + self.publishing_timeout = 15 self.publisher_connect_timeout = 5 # seconds self.tmpdir = "/tmp" diff --git a/lib/beetle/publisher.rb b/lib/beetle/publisher.rb index 900b5185..7ce8211c 100644 --- a/lib/beetle/publisher.rb +++ b/lib/beetle/publisher.rb @@ -9,6 +9,7 @@ def initialize(client, options = {}) #:nodoc: @exchanges_with_bound_queues = {} @dead_servers = {} @bunnies = {} + @channels = {} @throttling_options = {} @next_throttle_refresh = Time.now @throttled = false @@ -27,14 +28,10 @@ def throttling_status @throttled ? 'throttled' : 'unthrottled' end - # list of exceptions potentially raised by bunny - # these need to be lazy, because qrack exceptions are only defined after a connection has been established + # List of exceptions potentially raised by bunny. def bunny_exceptions [ - Bunny::ConnectionError, Bunny::ForcedChannelCloseError, Bunny::ForcedConnectionCloseError, - Bunny::MessageError, Bunny::ProtocolError, Bunny::ServerDownError, Bunny::UnsubscribeError, - Bunny::AcknowledgementError, Qrack::BufferOverflowError, Qrack::InvalidTypeError, - Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error + Bunny::Exception, Errno::EHOSTUNREACH, Errno::ECONNRESET, Errno::ETIMEDOUT, Timeout::Error ] end @@ -46,9 +43,9 @@ def publish(message_name, data, opts={}) #:nodoc: recycle_dead_servers unless @dead_servers.empty? throttle! if opts[:redundant] - publish_with_redundancy(exchange_name, message_name, data, opts) + publish_with_redundancy(exchange_name, message_name, data.to_s, opts) else - publish_with_failover(exchange_name, message_name, data, opts) + publish_with_failover(exchange_name, message_name, data.to_s, opts) end end end @@ -157,26 +154,38 @@ def bunny end def bunny? - @bunnies[@server] + !!@bunnies[@server] end def new_bunny b = Bunny.new( - :host => current_host, - :port => current_port, - :logging => !!@options[:logging], - :user => @client.config.user, - :pass => @client.config.password, - :vhost => @client.config.vhost, - :frame_max => @client.config.frame_max, - :channel_max => @client.config.channel_max, - :socket_timeout => @client.config.publishing_timeout, - :connect_timeout => @client.config.publisher_connect_timeout, - :spec => '09') + :host => current_host, + :port => current_port, + :logger => @client.config.logger, + :username => @client.config.user, + :password => @client.config.password, + :vhost => @client.config.vhost, + :automatically_recover => false, + :frame_max => @client.config.frame_max, + :channel_max => @client.config.channel_max, + :read_timeout => @client.config.publishing_timeout, + :write_timeout => @client.config.publishing_timeout, + :continuation_timeout => @client.config.publishing_timeout, + :connection_timeout => @client.config.publisher_connect_timeout, + :heartbeat => @client.config.heartbeat, + ) b.start b end + def channel + @channels[@server] ||= bunny.create_channel + end + + def channel? + !!@channels[@server] + end + # retry dead servers after ignoring them for 10.seconds # if all servers are dead, retry the one which has been dead for the longest time def recycle_dead_servers @@ -207,7 +216,7 @@ def select_next_server end def create_exchange!(name, opts) - bunny.exchange(name, opts) + channel.exchange(name, opts) end def bind_queues_for_exchange(exchange_name) @@ -218,9 +227,8 @@ def bind_queues_for_exchange(exchange_name) def declare_queue!(queue_name, creation_options) logger.debug("Beetle: creating queue with opts: #{creation_options.inspect}") - queue = bunny.queue(queue_name, creation_options) - - policy_options = bind_dead_letter_queue!(bunny, queue_name, creation_options) + queue = channel.queue(queue_name, creation_options) + policy_options = bind_dead_letter_queue!(channel, queue_name, creation_options) publish_policy_options(policy_options) queue end @@ -235,8 +243,9 @@ def stop!(exception=nil) Beetle::Timer.timeout(timeout) do logger.debug "Beetle: closing connection from publisher to #{server}" if exception - bunny.__send__ :close_socket + bunny.__send__ :close_connection else + channel.close if channel? bunny.stop end end @@ -245,6 +254,7 @@ def stop!(exception=nil) Beetle::reraise_expectation_errors! ensure @bunnies[@server] = nil + @channels[@server] = nil @exchanges[@server] = {} @queues[@server] = {} end diff --git a/test/beetle/amqp_gem_behavior_test.rb b/test/beetle/amqp_gem_behavior_test.rb index 937c978e..fea10271 100644 --- a/test/beetle/amqp_gem_behavior_test.rb +++ b/test/beetle/amqp_gem_behavior_test.rb @@ -11,8 +11,8 @@ class AMQPGemBehaviorTest < Minitest::Test EM::Timer.new(1){ connection.close { EM.stop }} channel = AMQP::Channel.new(connection) channel.on_error { puts "woot"} - exchange = channel.topic("beetle_tests") - queue = AMQP::Queue.new(channel) + exchange = channel.topic("beetle_tests", :durable => false, :auto_delete => true) + queue = AMQP::Queue.new(channel, :durable => false, :auto_delete => true) queue.bind(exchange, :key => "#") queue.subscribe { } queue.subscribe { } diff --git a/test/beetle/beetle_test.rb b/test/beetle/beetle_test.rb index cfe266c1..a5e1bccb 100644 --- a/test/beetle/beetle_test.rb +++ b/test/beetle/beetle_test.rb @@ -2,7 +2,7 @@ module Beetle class HostnameTest < Minitest::Test - test "should use canonical name if possible " do + test "should use canonical name if possible " do addr = mock("addr") addr.expects(:canonname).returns("a.b.com") Socket.expects(:gethostname).returns("a.b.com") diff --git a/test/beetle/publisher_test.rb b/test/beetle/publisher_test.rb index b7a7616e..39591400 100644 --- a/test/beetle/publisher_test.rb +++ b/test/beetle/publisher_test.rb @@ -4,8 +4,8 @@ module Beetle class PublisherTest < Minitest::Test def setup - client = Client.new - @pub = Publisher.new(client) + @client = Client.new + @pub = Publisher.new(@client) end test "acccessing a bunny for a server which doesn't have one should create it and associate it with the server" do @@ -18,16 +18,20 @@ def setup test "new bunnies should be created using current host and port and they should be started" do m = mock("dummy") expected_bunny_options = { - :host => @pub.send(:current_host), :port => @pub.send(:current_port), - :logging => false, - :user => "guest", - :pass => "guest", + :host => @pub.send(:current_host), + :port => @pub.send(:current_port), + :logger => @client.config.logger, + :username => "guest", + :password => "guest", :vhost => "/", - :socket_timeout => 0, - :connect_timeout => 5, + :automatically_recover => false, + :read_timeout => 15, + :write_timeout => 15, + :continuation_timeout => 15, + :connection_timeout => 5, :frame_max => 131072, :channel_max => 2047, - :spec => '09' + :heartbeat => 0, } Bunny.expects(:new).with(expected_bunny_options).returns(m) m.expects(:start) @@ -55,7 +59,7 @@ def setup test "stop!(exception) should close the bunny socket if an exception is not nil" do b = mock("bunny") - b.expects(:close_socket) + b.expects(:close_connection) @pub.expects(:bunny?).returns(true) @pub.expects(:bunny).returns(b) @pub.send(:stop!, Exception.new) @@ -114,7 +118,7 @@ def setup nice_exchange = mock("nice exchange") @pub.stubs(:exchange).with("mama-exchange").returns(raising_exchange).then.returns(raising_exchange).then.returns(nice_exchange) - raising_exchange.expects(:publish).raises(Bunny::ConnectionError).twice + raising_exchange.expects(:publish).raises(Bunny::ConnectionError,'').twice nice_exchange.expects(:publish) @pub.expects(:set_current_server).twice @pub.expects(:stop!).twice @@ -143,9 +147,9 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) e.expects(:publish).in_sequence(redundant) @@ -159,13 +163,13 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) assert_raises Beetle::NoMessageSent do @pub.publish_with_redundancy("mama-exchange", "mama", @data, @opts) @@ -189,7 +193,7 @@ def setup e.expects(:publish).in_sequence(redundant) @pub.expects(:exchange).returns(e).in_sequence(redundant) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(redundant) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(redundant) @pub.expects(:stop!).in_sequence(redundant) @pub.expects(:exchange).returns(e).in_sequence(redundant) @@ -261,13 +265,13 @@ def setup e = mock("exchange") @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) @pub.expects(:exchange).with("mama-exchange").returns(e).in_sequence(failover) - e.expects(:publish).raises(Bunny::ConnectionError).in_sequence(failover) + e.expects(:publish).raises(Bunny::ConnectionError,'').in_sequence(failover) assert_raises Beetle::NoMessageSent do @pub.publish_with_failover("mama-exchange", "mama", @data, @opts) @@ -296,8 +300,10 @@ def setup q = mock("queue") q.expects(:bind).with(:the_exchange, {:key => "haha.#"}) m = mock("Bunny") - m.expects(:queue).with("some_queue", :durable => true, :passive => false, :auto_delete => false, :exclusive => false, :arguments => {"foo" => "fighter"}).returns(q) - @pub.expects(:bunny).returns(m).twice + channel= mock("channel") + m.expects(:create_channel).returns(channel) + channel.expects(:queue).with("some_queue", :durable => true, :passive => false, :auto_delete => false, :exclusive => false, :arguments => {"foo" => "fighter"}).returns(q) + @pub.expects(:bunny).returns(m) @pub.send(:queue, "some_queue") assert_equal q, @pub.send(:queues)["some_queue"] @@ -468,10 +474,12 @@ def setup end test "accessing a given exchange should create it using the config. further access should return the created exchange" do - m = mock("Bunny") - m.expects(:exchange).with("some_exchange", :type => :topic, :durable => true, :queues => []).returns(42) + bunny = mock("bunny") + channel = mock("channel") + channel.expects(:exchange).with("some_exchange", :type => :topic, :durable => true, :queues => []).returns(42) @client.register_exchange("some_exchange", :type => :topic, :durable => true) - @pub.expects(:bunny).returns(m) + @pub.expects(:bunny).returns(bunny) + bunny.expects(:create_channel).returns(channel) ex = @pub.send(:exchange, "some_exchange") assert @pub.send(:exchanges).include?("some_exchange") ex2 = @pub.send(:exchange, "some_exchange") @@ -544,15 +552,16 @@ def setup test "stop should shut down all bunnies" do @pub.servers = ["localhost:1111", "localhost:2222"] s = sequence("shutdown") - bunny = mock("bunny") + bunny1 = mock("bunny1") + bunny2 = mock("bunny2") @pub.expects(:set_current_server).with("localhost:1111").in_sequence(s) @pub.expects(:bunny?).returns(true).in_sequence(s) - @pub.expects(:bunny).returns(bunny).in_sequence(s) - bunny.expects(:stop).in_sequence(s) + @pub.expects(:bunny).returns(bunny1).in_sequence(s) + bunny1.expects(:stop).in_sequence(s) @pub.expects(:set_current_server).with("localhost:2222").in_sequence(s) @pub.expects(:bunny?).returns(true).in_sequence(s) - @pub.expects(:bunny).returns(bunny).in_sequence(s) - bunny.expects(:stop).in_sequence(s) + @pub.expects(:bunny).returns(bunny2).in_sequence(s) + bunny2.expects(:stop).in_sequence(s) @pub.stop end end From 051e0b7494700e312d3058d7ad8294732dae4a45 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 09:25:29 +0200 Subject: [PATCH 02/12] turn on debug logging for the echo RPC --- features/step_definitions/redis_auto_failover_steps.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/features/step_definitions/redis_auto_failover_steps.rb b/features/step_definitions/redis_auto_failover_steps.rb index 6779e0b0..4a9202b8 100644 --- a/features/step_definitions/redis_auto_failover_steps.rb +++ b/features/step_definitions/redis_auto_failover_steps.rb @@ -158,7 +158,7 @@ Then /^the redis master of the beetle handler should be "([^\"]*)"$/ do |redis_name| Beetle.config.servers = "127.0.0.1:5672" # rabbitmq - Beetle.config.logger.level = Logger::INFO + Beetle.config.logger.level = Logger::DEBUG redis_master = TestDaemons::Redis[redis_name].ip_with_port response = `curl -s 127.0.0.1:10254/redis_master`.chomp assert_equal redis_master, response From 3871f065accb99b866f7b6435f0ec2fc65625d77 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 11:06:53 +0200 Subject: [PATCH 03/12] enable auto recovery to see whether this fixes the CI failures --- lib/beetle/publisher.rb | 2 +- test/beetle/publisher_test.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/beetle/publisher.rb b/lib/beetle/publisher.rb index 7ce8211c..aca6b845 100644 --- a/lib/beetle/publisher.rb +++ b/lib/beetle/publisher.rb @@ -165,7 +165,7 @@ def new_bunny :username => @client.config.user, :password => @client.config.password, :vhost => @client.config.vhost, - :automatically_recover => false, + :automatically_recover => true, :frame_max => @client.config.frame_max, :channel_max => @client.config.channel_max, :read_timeout => @client.config.publishing_timeout, diff --git a/test/beetle/publisher_test.rb b/test/beetle/publisher_test.rb index 39591400..7bc9b7a3 100644 --- a/test/beetle/publisher_test.rb +++ b/test/beetle/publisher_test.rb @@ -24,7 +24,7 @@ def setup :username => "guest", :password => "guest", :vhost => "/", - :automatically_recover => false, + :automatically_recover => true, :read_timeout => 15, :write_timeout => 15, :continuation_timeout => 15, From 34e68805326d6669c541d8ed08cd1072500f3701 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 11:39:10 +0200 Subject: [PATCH 04/12] Revert "enable auto recovery to see whether this fixes the CI failures" This reverts commit c7b4b0e080bd02d0db17c6054f96f94b8dabe4ff. --- lib/beetle/publisher.rb | 2 +- test/beetle/publisher_test.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/beetle/publisher.rb b/lib/beetle/publisher.rb index aca6b845..7ce8211c 100644 --- a/lib/beetle/publisher.rb +++ b/lib/beetle/publisher.rb @@ -165,7 +165,7 @@ def new_bunny :username => @client.config.user, :password => @client.config.password, :vhost => @client.config.vhost, - :automatically_recover => true, + :automatically_recover => false, :frame_max => @client.config.frame_max, :channel_max => @client.config.channel_max, :read_timeout => @client.config.publishing_timeout, diff --git a/test/beetle/publisher_test.rb b/test/beetle/publisher_test.rb index 7bc9b7a3..39591400 100644 --- a/test/beetle/publisher_test.rb +++ b/test/beetle/publisher_test.rb @@ -24,7 +24,7 @@ def setup :username => "guest", :password => "guest", :vhost => "/", - :automatically_recover => true, + :automatically_recover => false, :read_timeout => 15, :write_timeout => 15, :continuation_timeout => 15, From ff1704baf99154df59b1a9bfa51d4bf34cc36fcc Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 13:24:03 +0200 Subject: [PATCH 05/12] don't wait when forcing a connection to close --- lib/beetle/publisher.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/beetle/publisher.rb b/lib/beetle/publisher.rb index 7ce8211c..68542fa2 100644 --- a/lib/beetle/publisher.rb +++ b/lib/beetle/publisher.rb @@ -243,7 +243,7 @@ def stop!(exception=nil) Beetle::Timer.timeout(timeout) do logger.debug "Beetle: closing connection from publisher to #{server}" if exception - bunny.__send__ :close_connection + bunny.__send__ :close_connection, false else channel.close if channel? bunny.stop From cf65f867082f6c4a09b5b2cae3664b642c6dcd21 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 13:42:25 +0200 Subject: [PATCH 06/12] try killing the reader loop hard --- features/support/beetle_handler | 5 ++++- lib/beetle/publisher.rb | 2 ++ test/beetle/publisher_test.rb | 8 +++++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/features/support/beetle_handler b/features/support/beetle_handler index 7d955ecb..90449bf2 100755 --- a/features/support/beetle_handler +++ b/features/support/beetle_handler @@ -85,10 +85,13 @@ Daemons.run_proc("beetle_handler", :log_output => true, :dir_mode => :normal, :d config.message(:echo) config.handler(:echo) do |message| begin + Beetle.config.logger.info "Received echo request: reply_to: #{message.header.attributes[:reply_to]}" client.deduplication_store.redis.server rescue master_file_content = File.read(Beetle.config.redis_server) - "no redis master: exception: #{$!.class}(#{$!}), master_file: '#{master_file_content}'" + msg = "no redis master: exception: #{$!.class}(#{$!}), master_file: '#{master_file_content}'" + Beetle.config.logger.error msg + msg end end config.handler(Beetle.config.beetle_policy_updates_queue_name) do |message| diff --git a/lib/beetle/publisher.rb b/lib/beetle/publisher.rb index 68542fa2..6add222c 100644 --- a/lib/beetle/publisher.rb +++ b/lib/beetle/publisher.rb @@ -244,6 +244,8 @@ def stop!(exception=nil) logger.debug "Beetle: closing connection from publisher to #{server}" if exception bunny.__send__ :close_connection, false + reader_loop = bunny.__send__ :reader_loop + reader_loop.kill if reader_loop else channel.close if channel? bunny.stop diff --git a/test/beetle/publisher_test.rb b/test/beetle/publisher_test.rb index 39591400..0bd4b81d 100644 --- a/test/beetle/publisher_test.rb +++ b/test/beetle/publisher_test.rb @@ -55,17 +55,22 @@ def setup assert_equal({}, @pub.send(:exchanges)) assert_equal({}, @pub.send(:queues)) assert_nil @pub.instance_variable_get(:@bunnies)[@pub.server] + assert_nil @pub.instance_variable_get(:@channels)[@pub.server] end test "stop!(exception) should close the bunny socket if an exception is not nil" do b = mock("bunny") + l = mock("loop") b.expects(:close_connection) + b.expects(:reader_loop).returns(l) + l.expects(:kill) @pub.expects(:bunny?).returns(true) - @pub.expects(:bunny).returns(b) + @pub.expects(:bunny).returns(b).twice @pub.send(:stop!, Exception.new) assert_equal({}, @pub.send(:exchanges)) assert_equal({}, @pub.send(:queues)) assert_nil @pub.instance_variable_get(:@bunnies)[@pub.server] + assert_nil @pub.instance_variable_get(:@channels)[@pub.server] end test "stop! should not create a new bunny " do @@ -75,6 +80,7 @@ def setup assert_equal({}, @pub.send(:exchanges)) assert_equal({}, @pub.send(:queues)) assert_nil @pub.instance_variable_get(:@bunnies)[@pub.server] + assert_nil @pub.instance_variable_get(:@channels)[@pub.server] end end From 15926184769647aaf481bac2e80403d20b5066d4 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 13:56:32 +0200 Subject: [PATCH 07/12] try to collect docker logs --- .github/workflows/build.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e91815bd..6789c6c0 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -52,6 +52,12 @@ jobs: env: BUNDLE_GEMFILE: gemfiles/redis_${{ matrix.redis-version }}.gemfile + - name: Collect Docker Logs + uses: jwalton/gh-docker-logs@v2.2.0 + if: failure() + with: + images: 'rabbitmq' + - name: Stop services run: docker-compose down if: always() From 52ae22247c92b599410b368e050a48dfd81014e0 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 15:25:19 +0200 Subject: [PATCH 08/12] learned how to ignore scenarios --- cucumber.yml | 2 +- features/redis_auto_failover.feature | 12 ++++++++++++ features/support/env.rb | 3 +++ 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/cucumber.yml b/cucumber.yml index fea5edc1..16d643e3 100644 --- a/cucumber.yml +++ b/cucumber.yml @@ -1 +1 @@ -default: --publish-quiet +default: --publish-quiet --tags 'not @ignored' diff --git a/features/redis_auto_failover.feature b/features/redis_auto_failover.feature index 40e596b5..a7b044a8 100644 --- a/features/redis_auto_failover.feature +++ b/features/redis_auto_failover.feature @@ -219,3 +219,15 @@ Feature: Redis auto failover And the redis master of "rc-client-2" should be "redis-1" And the redis master of the beetle handler should be "redis-1" And the role of redis server "redis-2" should be "slave" + + @ignored + Scenario: Running the system for a few seconds to perform manual testing + Given a redis configuration server using redis servers "redis-1,redis-2" with clients "rc-client-1,rc-client-2" exists + And a redis configuration client "rc-client-1" using redis servers "redis-1,redis-2" exists + And a redis configuration client "rc-client-2" using redis servers "redis-1,redis-2" exists + And a beetle handler using the redis-master file from "rc-client-1" exists + And the redis master of "rc-client-1" should be "redis-1" + And the redis master of "rc-client-2" should be "redis-1" + And the redis master of the beetle handler should be "redis-1" + And the role of redis server "redis-2" should be "slave" + Then the system can run for a while without dying diff --git a/features/support/env.rb b/features/support/env.rb index 9226ecc1..3a9add53 100644 --- a/features/support/env.rb +++ b/features/support/env.rb @@ -6,11 +6,14 @@ extend Minitest::Assertions end +# Executed before each scenario Before do cleanup_master_files `ruby features/support/system_notification_logger start` + `ruby features/support/beetle_handler start -- --redis-master-file=#{redis_master_file('rc-client-1')}` end +# Executed after each scenario After do cleanup_test_env end From 41d625a11a1070356cb76fc448b54d50c2e15470 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 15:42:01 +0200 Subject: [PATCH 09/12] improved logging --- features/step_definitions/redis_auto_failover_steps.rb | 2 +- lib/beetle.rb | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/features/step_definitions/redis_auto_failover_steps.rb b/features/step_definitions/redis_auto_failover_steps.rb index 4a9202b8..f2a62907 100644 --- a/features/step_definitions/redis_auto_failover_steps.rb +++ b/features/step_definitions/redis_auto_failover_steps.rb @@ -158,7 +158,7 @@ Then /^the redis master of the beetle handler should be "([^\"]*)"$/ do |redis_name| Beetle.config.servers = "127.0.0.1:5672" # rabbitmq - Beetle.config.logger.level = Logger::DEBUG + Beetle.config.logger.level = Logger::WARN redis_master = TestDaemons::Redis[redis_name].ip_with_port response = `curl -s 127.0.0.1:10254/redis_master`.chomp assert_equal redis_master, response diff --git a/lib/beetle.rb b/lib/beetle.rb index 08a3dde9..a48b254e 100644 --- a/lib/beetle.rb +++ b/lib/beetle.rb @@ -28,8 +28,10 @@ class UnknownMessage < Error; end class UnknownQueue < Error; end # raised when no redis master server can be found class NoRedisMaster < Error; end - # raise when no message could be sent by the publisher + # raised when no message could be sent by the publisher class NoMessageSent < Error; end + # logged when an RPC call timed outdated + class RPCTimedOut < Error; end # AMQP options for exchange creation EXCHANGE_CREATION_KEYS = [:auto_delete, :durable, :internal, :nowait, :passive] From 74be343a0ff58744a7354fac5a2fb497c9445014 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Mon, 15 Aug 2022 16:08:48 +0200 Subject: [PATCH 10/12] colored output --- cucumber.yml | 2 +- features/step_definitions/redis_auto_failover_steps.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cucumber.yml b/cucumber.yml index 16d643e3..57a288ac 100644 --- a/cucumber.yml +++ b/cucumber.yml @@ -1 +1 @@ -default: --publish-quiet --tags 'not @ignored' +default: --publish-quiet --tags 'not @ignored' --color diff --git a/features/step_definitions/redis_auto_failover_steps.rb b/features/step_definitions/redis_auto_failover_steps.rb index f2a62907..f02c865b 100644 --- a/features/step_definitions/redis_auto_failover_steps.rb +++ b/features/step_definitions/redis_auto_failover_steps.rb @@ -158,7 +158,7 @@ Then /^the redis master of the beetle handler should be "([^\"]*)"$/ do |redis_name| Beetle.config.servers = "127.0.0.1:5672" # rabbitmq - Beetle.config.logger.level = Logger::WARN + Beetle.config.logger.level = Logger::FATAL redis_master = TestDaemons::Redis[redis_name].ip_with_port response = `curl -s 127.0.0.1:10254/redis_master`.chomp assert_equal redis_master, response From f2f6ba583070a3d27b2a96af1fb49b2416c3c334 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Thu, 22 Feb 2024 14:55:20 +0100 Subject: [PATCH 11/12] removed require that doesn't work anymore --- lib/beetle.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/beetle.rb b/lib/beetle.rb index a48b254e..e29e1a90 100644 --- a/lib/beetle.rb +++ b/lib/beetle.rb @@ -1,6 +1,5 @@ $:.unshift(File.expand_path('..', __FILE__)) require 'bunny' # which bunny picks up -require 'qrack/errors' # needed by the publisher begin require 'redis/connection/hiredis' # require *before* redis as specified in the redis-rb gem docs From bbfab594c00140199f064508cb347c04ed7c2185 Mon Sep 17 00:00:00 2001 From: Stefan Kaes Date: Thu, 22 Feb 2024 17:40:36 +0100 Subject: [PATCH 12/12] upgraded bunny --- beetle.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beetle.gemspec b/beetle.gemspec index d62a5ddd..10b0d35c 100644 --- a/beetle.gemspec +++ b/beetle.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |s| } s.specification_version = 3 - s.add_runtime_dependency "bunny", "~> 2.19.0" + s.add_runtime_dependency "bunny", "~> 2.22.0" s.add_runtime_dependency "redis", ">= 4.2.1" s.add_runtime_dependency "hiredis", ">= 0.4.5" s.add_runtime_dependency "amq-protocol", "= 2.3.2"