diff --git a/lib/synapse.rb b/lib/synapse.rb index b35cefb8..8a8f105d 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -40,6 +40,9 @@ def initialize(opts={}) # configuration is initially enabled to configure on first loop @config_updated = AtomicValue.new(true) + executor = Concurrent::ThreadPoolExecutor.new(:min_threads => 1, :max_threads => [2, @service_watchers.length / 4].max) + @task_scheduler = Concurrent::TimerSet.new(:executor => executor) + # Any exceptions in the watcher threads should wake the main thread so # that we can fail fast. Thread.abort_on_exception = true @@ -59,7 +62,7 @@ def run statsd_time('synapse.watchers.start.time') do @service_watchers.map do |watcher| begin - watcher.start + watcher.start(@task_scheduler) statsd_increment("synapse.watcher.start", ['start_result:success', "watcher_name:#{watcher.name}"]) rescue Exception => e statsd_increment("synapse.watcher.start", ['start_result:fail', "watcher_name:#{watcher.name}", "exception_name:#{e.class.name}", "exception_message:#{e.message}"]) @@ -119,6 +122,8 @@ def run raise e end end + + @task_scheduler.kill statsd_increment('synapse.stop', ['stop_avenue:clean', 'stop_location:main_loop']) end diff --git a/lib/synapse/service_watcher/README.md b/lib/synapse/service_watcher/README.md index bf138f4f..6686cb4d 100644 --- a/lib/synapse/service_watcher/README.md +++ b/lib/synapse/service_watcher/README.md @@ -14,8 +14,11 @@ require "synapse/service_watcher/base/base" class Synapse::ServiceWatcher class MyWatcher < BaseWatcher - def start - # write code which begins running service discovery + def start(scheduler) + # write code which begins running service discovery. + # Instead of running a background thread, you can use scheduler + # which is an instance of Concurrent::TimerSet to schedule tasks on a thread pool. + # (http://ruby-concurrency.github.io/concurrent-ruby/1.1.5/Concurrent/TimerSet.html) end def stop diff --git a/lib/synapse/service_watcher/base/base.rb b/lib/synapse/service_watcher/base/base.rb index 0a712ec8..3e380914 100644 --- a/lib/synapse/service_watcher/base/base.rb +++ b/lib/synapse/service_watcher/base/base.rb @@ -94,7 +94,7 @@ def haproxy end # this should be overridden to actually start your watcher - def start + def start(scheduler) log.info "synapse: starting stub watcher; this means doing nothing at all!" end diff --git a/lib/synapse/service_watcher/base/poll.rb b/lib/synapse/service_watcher/base/poll.rb new file mode 100644 index 00000000..3042bccd --- /dev/null +++ b/lib/synapse/service_watcher/base/poll.rb @@ -0,0 +1,44 @@ +require 'synapse/service_watcher/base/base' + +require 'concurrent' + +class Synapse::ServiceWatcher + class PollWatcher < BaseWatcher + def initialize(opts={}, synapse, reconfigure_callback) + super(opts, synapse, reconfigure_callback) + + @check_interval = @discovery['check_interval'] || 15.0 + @should_exit = Concurrent::AtomicBoolean.new(false) + end + + def start(scheduler) + reset_schedule = Proc.new { + discover + + # Schedule the next task until we should exit + unless @should_exit.true? + scheduler.post(@check_interval, &reset_schedule) + end + } + + # Execute the first discover immediately + scheduler.post(0, &reset_schedule) + end + + def stop + @should_exit.make_true + end + + private + def validate_discovery_opts + raise ArgumentError, "invalid discovery method '#{@discovery['method']}' for poll watcher" \ + unless @discovery['method'] == 'poll' + + log.warn "synapse: warning: a stub watcher with no default servers is pretty useless" if @default_servers.empty? + end + + def discover + log.info "base poll watcher discover" + end + end +end diff --git a/lib/synapse/service_watcher/dns/dns.rb b/lib/synapse/service_watcher/dns/dns.rb index f8ec70d7..7a21e13a 100644 --- a/lib/synapse/service_watcher/dns/dns.rb +++ b/lib/synapse/service_watcher/dns/dns.rb @@ -1,21 +1,18 @@ -require "synapse/service_watcher/base/base" +require "synapse/service_watcher/base/poll" -require 'thread' require 'resolv' class Synapse::ServiceWatcher - class DnsWatcher < BaseWatcher - def start - @check_interval = @discovery['check_interval'] || 30.0 - @nameserver = @discovery['nameserver'] + class DnsWatcher < PollWatcher + def initialize(opts={}, synapse, reconfigure_callback) + super(opts, synapse, reconfigure_callback) - @watcher = Thread.new do - watch - end + @nameserver = @discovery['nameserver'] + @check_interval = @discovery['check_interval'] || 30.0 end def ping? - @watcher.alive? && !(resolver.getaddresses('airbnb.com').empty?) + !(resolver.getaddresses('airbnb.com').empty?) end def discovery_servers @@ -30,33 +27,8 @@ def validate_discovery_opts if discovery_servers.empty? end - def watch - last_resolution = resolve_servers - configure_backends(last_resolution) - until @should_exit - begin - start = Time.now - current_resolution = resolve_servers - unless last_resolution == current_resolution - last_resolution = current_resolution - configure_backends(last_resolution) - end - - sleep_until_next_check(start) - rescue => e - log.warn "Error in watcher thread: #{e.inspect}" - log.warn e.backtrace - end - end - - log.info "synapse: dns watcher exited successfully" - end - - def sleep_until_next_check(start_time) - sleep_time = @check_interval - (Time.now - start_time) - if sleep_time > 0.0 - sleep(sleep_time) - end + def discover + configure_backends(resolve_servers) end IP_REGEX = Regexp.union([Resolv::IPv4::Regex, Resolv::IPv6::Regex]) diff --git a/lib/synapse/service_watcher/docker/docker.rb b/lib/synapse/service_watcher/docker/docker.rb index a34a5ab1..0a5a6ada 100644 --- a/lib/synapse/service_watcher/docker/docker.rb +++ b/lib/synapse/service_watcher/docker/docker.rb @@ -1,15 +1,8 @@ -require "synapse/service_watcher/base/base" +require "synapse/service_watcher/base/poll" require 'docker' class Synapse::ServiceWatcher - class DockerWatcher < BaseWatcher - def start - @check_interval = @discovery['check_interval'] || 15.0 - @watcher = Thread.new do - watch - end - end - + class DockerWatcher < PollWatcher private def validate_discovery_opts raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ @@ -22,26 +15,8 @@ def validate_discovery_opts if @discovery['container_port'].nil? end - def watch - until @should_exit - begin - start = Time.now - set_backends(containers) - sleep_until_next_check(start) - rescue Exception => e - log.warn "synapse: error in watcher thread: #{e.inspect}" - log.warn e.backtrace - end - end - - log.info "synapse: docker watcher exited successfully" - end - - def sleep_until_next_check(start_time) - sleep_time = @check_interval - (Time.now - start_time) - if sleep_time > 0.0 - sleep(sleep_time) - end + def discover + set_backends(containers) end def rewrite_container_ports(ports) diff --git a/lib/synapse/service_watcher/ec2tag/ec2tag.rb b/lib/synapse/service_watcher/ec2tag/ec2tag.rb index 15889f6b..31ec6fd9 100644 --- a/lib/synapse/service_watcher/ec2tag/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag/ec2tag.rb @@ -1,12 +1,11 @@ -require 'synapse/service_watcher/base/base' +require 'synapse/service_watcher/base/poll' require 'aws-sdk' class Synapse::ServiceWatcher - class Ec2tagWatcher < BaseWatcher - + class Ec2tagWatcher < PollWatcher attr_reader :check_interval - def start + def start(scheduler) region = @discovery['aws_region'] || ENV['AWS_REGION'] log.info "Connecting to EC2 region: #{region}" @@ -15,16 +14,13 @@ def start access_key_id: @discovery['aws_access_key_id'] || ENV['AWS_ACCESS_KEY_ID'], secret_access_key: @discovery['aws_secret_access_key'] || ENV['AWS_SECRET_ACCESS_KEY'] ) - @check_interval = @discovery['check_interval'] || 15.0 - log.info "synapse: ec2tag watcher looking for instances " + - "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}" + "tagged with #{@discovery['tag_name']}=#{@discovery['tag_value']}" - @watcher = Thread.new { watch } + super(scheduler) end private - def validate_discovery_opts # Required, via options only. raise ArgumentError, "invalid discovery method #{@discovery['method']}" \ @@ -52,28 +48,9 @@ def validate_discovery_opts end end - def watch - until @should_exit - begin - start = Time.now - if set_backends(discover_instances) - log.info "synapse: ec2tag watcher backends have changed." - end - rescue Exception => e - log.warn "synapse: error in ec2tag watcher thread: #{e.inspect}" - log.warn e.backtrace - ensure - sleep_until_next_check(start) - end - end - - log.info "synapse: ec2tag watcher exited successfully" - end - - def sleep_until_next_check(start_time) - sleep_time = check_interval - (Time.now - start_time) - if sleep_time > 0.0 - sleep(sleep_time) + def discover + if set_backends(discover_instances) + log.info "synapse: ec2tag watcher backends have changed." end end diff --git a/lib/synapse/service_watcher/marathon/marathon.rb b/lib/synapse/service_watcher/marathon/marathon.rb index 47565485..06131589 100644 --- a/lib/synapse/service_watcher/marathon/marathon.rb +++ b/lib/synapse/service_watcher/marathon/marathon.rb @@ -5,7 +5,7 @@ class Synapse::ServiceWatcher class MarathonWatcher < BaseWatcher - def start + def start(scheduler) @check_interval = @discovery['check_interval'] || 10.0 @connection = nil @watcher = Thread.new { sleep splay; watch } diff --git a/lib/synapse/service_watcher/multi/multi.rb b/lib/synapse/service_watcher/multi/multi.rb index 54ab7c98..1fc7a71e 100644 --- a/lib/synapse/service_watcher/multi/multi.rb +++ b/lib/synapse/service_watcher/multi/multi.rb @@ -58,12 +58,12 @@ def initialize(opts={}, synapse, reconfigure_callback) -> { resolver_notification }) end - def start + def start(scheduler) log.info "synapse: starting multi watcher" statsd_increment("synapse.watcher.multi.start") @watchers.values.each do |w| - w.start + w.start(scheduler) end @resolver.start diff --git a/lib/synapse/service_watcher/zookeeper/zookeeper.rb b/lib/synapse/service_watcher/zookeeper/zookeeper.rb index 67aa40c1..68d83d0d 100644 --- a/lib/synapse/service_watcher/zookeeper/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper/zookeeper.rb @@ -1,11 +1,10 @@ require "synapse/service_watcher/base/base" -require 'synapse/atomic' -require 'thread' require 'zk' require 'zookeeper' require 'base64' require 'objspace' +require 'concurrent' class Synapse::ServiceWatcher class ZookeeperWatcher < BaseWatcher @@ -25,6 +24,7 @@ class ZookeeperWatcher < BaseWatcher @@zk_pool = {} @@zk_pool_count = {} + @@zk_should_exit = Concurrent::AtomicBoolean.new(false) @@zk_pool_lock = Mutex.new def initialize(opts={}, synapse, reconfigure_callback) @@ -56,22 +56,15 @@ def initialize(opts={}, synapse, reconfigure_callback) @zk = nil @watcher = nil - @thread = nil - @should_exit = Synapse::AtomicValue.new(false) end - def start + def start(scheduler) log.info "synapse: starting ZK watcher #{@name} @ cluster: #{@zk_cluster} path: #{@discovery['path']} retry policy: #{@retry_policy}" - # Zookeeper processing is run in a background thread so that any retries - # do not block the main thread. zk_connect do - @thread = Thread.new { + # Asynchronously start the discovery. + scheduler.post(0) { start_discovery - - until @should_exit.get - sleep 0.5 - end } end end @@ -79,8 +72,6 @@ def start def stop log.warn "synapse: zookeeper watcher exiting" - @should_exit.set(true) - zk_teardown do @watcher.unsubscribe unless @watcher.nil? @watcher = nil @@ -88,11 +79,13 @@ def stop end def ping? + stop if @@zk_should_exit.true? + # @zk being nil implies no session *or* a lost session, do not remove # the check on @zk being truthy # if the client is in any of the three states: associating, connecting, connected # we consider it alive. this can avoid synapse restart on short network dis-connection - @zk && (@zk.associating? || @zk.connecting? || @zk.connected?) + !@zk.nil? && (@zk.associating? || @zk.connecting? || @zk.connected?) end def watching? @@ -391,9 +384,21 @@ def zk_connect(&bootstrap) # https://github.com/zk-ruby/zookeeper/blob/80a88e3179fd1d526f7e62a364ab5760f5f5da12/ext/zkrb.c @@zk_pool[@zk_hosts] = with_retry(@retry_policy.merge({'retriable_errors' => RuntimeError})) do |attempts| log.info "synapse: creating pooled connection to #{@zk_hosts} for #{attempts} times" - # zk session timeout is 2 * receive_timeout_msec (as of zookeeper-1.4.x) + # zk session timeout is 2 * receive_timeout_msec (as of zookeeper-1.4.x) # i.e. 18000 means 36 sec - ZK.new(@zk_hosts, :timeout => 5, :receive_timeout_msec => 18000, :thread => :per_callback) + zk = ZK.new(@zk_hosts, :timeout => 5, :receive_timeout_msec => 18000, :thread => :per_callback) + + # handle session expiry -- mark that all watchers should shutdown now. + # since this eventually causes Synapse to shutdown, we do not scope the + # flag to a single client (or watcher). + zk.on_expired_session do + statsd_increment('synapse.watcher.zk.session.expired', ["zk_cluster:#{@zk_cluster}", "service_name:#{@name}"]) + log.warn "synapse: ZK client session expired #{@name}" + + @@zk_should_exit.make_true + end + + zk end @@zk_pool_count[@zk_hosts] = 1 log.info "synapse: successfully created zk connection to #{@zk_hosts}" @@ -408,14 +413,6 @@ def zk_connect(&bootstrap) @zk = @@zk_pool[@zk_hosts] log.info "synapse: retrieved zk connection to #{@zk_hosts}" - # handle session expiry -- by cleaning up zk, this will make `ping?` - # fail and so synapse will exit - @zk.on_expired_session do - statsd_increment('synapse.watcher.zk.session.expired', ["zk_cluster:#{@zk_cluster}", "service_name:#{@name}"]) - log.warn "synapse: ZK client session expired #{@name}" - stop - end - bootstrap.call end end diff --git a/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb b/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb index d4fdafd3..19a38f6d 100644 --- a/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb +++ b/lib/synapse/service_watcher/zookeeper_dns/zookeeper_dns.rb @@ -1,4 +1,4 @@ -require 'synapse/service_watcher/base/base' +require 'synapse/service_watcher/base/poll' require 'synapse/service_watcher/dns/dns' require 'synapse/service_watcher/zookeeper/zookeeper' @@ -20,7 +20,7 @@ # has passed (triggering a re-resolve), or that the watcher should shut down. # The DNS watcher is responsible for the actual reconfiguring of backends. class Synapse::ServiceWatcher - class ZookeeperDnsWatcher < BaseWatcher + class ZookeeperDnsWatcher < PollWatcher # Valid messages that can be passed through the internal message queue module Messages @@ -36,13 +36,8 @@ class NewServers < Struct.new(:servers); end # refresh of the IP addresses. class CheckInterval; end - # Indicates that the DNS watcher should shut down. This is sent when - # stop is called. - class StopWatcher; end - # Saved instances of message types with contents that cannot vary. This # reduces object allocation. - STOP_WATCHER_MESSAGE = StopWatcher.new CHECK_INTERVAL_MESSAGE = CheckInterval.new end @@ -58,49 +53,40 @@ def initialize(opts={}, parent=nil, synapse, reconfigure_callback, message_queue super(opts, synapse, reconfigure_callback) end - def stop - @message_queue.push(Messages::STOP_WATCHER_MESSAGE) - end + def discover + # The message will be to check a new set of servers from ZK, or to re-resolve + # the DNS (triggered every check_interval seconds) + begin + message = @message_queue.pop(false) + rescue ThreadError + # no item from the queue + return + end - def watch - last_resolution = nil - while true - # Blocks on message queue, the message will be a signal to stop - # watching, to check a new set of servers from ZK, or to re-resolve - # the DNS (triggered every check_interval seconds) - message = @message_queue.pop - - log.debug "synapse: received message #{message.inspect}" - - case message - when Messages::StopWatcher - break - when Messages::NewServers - self.discovery_servers = message.servers - when Messages::CheckInterval - # Proceed to re-resolve the DNS - else - raise Messages::InvalidMessageError, - "Received unrecognized message: #{message.inspect}" - end + log.debug "synapse: received message #{message.inspect}" - # Empty servers means we haven't heard back from ZK yet or ZK is - # empty. This should only occur if we don't get results from ZK - # within check_interval seconds or if ZK is empty. - if self.discovery_servers.nil? || self.discovery_servers.empty? - log.warn "synapse: no backends for service #{@name}" - else - # Resolve DNS names with the nameserver - current_resolution = resolve_servers - unless last_resolution == current_resolution - last_resolution = current_resolution - configure_backends(last_resolution) - - # Propagate revision updates down to ZookeeperDnsWatcher, so - # that stanza cache can work properly. - @revision += 1 - @parent.reconfigure! unless @parent.nil? - end + case message + when Messages::NewServers + self.discovery_servers = message.servers + when Messages::CheckInterval + # Proceed to re-resolve the DNS + else + raise Messages::InvalidMessageError, + "Received unrecognized message: #{message.inspect}" + end + + # Empty servers means we haven't heard back from ZK yet or ZK is + # empty. This should only occur if we don't get results from ZK + # within check_interval seconds or if ZK is empty. + if self.discovery_servers.nil? || self.discovery_servers.empty? + log.warn "synapse: no backends for service #{@name}" + else + # Resolve DNS names with the nameserver + if configure_backends(resolve_servers) + # Propagate revision updates down to ZookeeperDnsWatcher, so + # that stanza cache can work properly. + @revision += 1 + @parent.reconfigure! unless @parent.nil? end end end @@ -112,34 +98,21 @@ def validate_discovery_opts end end - def start + def start(scheduler) @check_interval = @discovery['check_interval'] || 30.0 @message_queue = Queue.new @dns = make_dns_watcher(@message_queue) @zk = make_zookeeper_watcher(@message_queue) - @zk.start - @dns.start - - @watcher = Thread.new do - until @should_exit - # Trigger a DNS resolve every @check_interval seconds - sleep @check_interval + @zk.start(scheduler) + @dns.start(scheduler) - # Only trigger the resolve if the queue is empty, every other message - # on the queue would either cause a resolve or stop the watcher - if @message_queue.empty? - @message_queue.push(Messages::CHECK_INTERVAL_MESSAGE) - end - - end - log.info "synapse: zookeeper_dns watcher exited successfully" - end + super(scheduler) end def ping? - @watcher.alive? && @dns.ping? && @zk.ping? + @dns.ping? && @zk.ping? end def stop @@ -162,6 +135,12 @@ def reconfigure! private + def discover + if @message_queue.empty? + @message_queue.push(Messages::CHECK_INTERVAL_MESSAGE) + end + end + def make_dns_watcher(queue) dns_discovery_opts = @discovery.select do |k,_| k == 'nameserver' || k == 'label_filter' diff --git a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb index 40e2f6e3..182a511e 100644 --- a/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb +++ b/lib/synapse/service_watcher/zookeeper_poll/zookeeper_poll.rb @@ -1,9 +1,7 @@ require 'synapse/service_watcher/base/base' require 'synapse/service_watcher/zookeeper/zookeeper' -require 'synapse/atomic' -require 'zk' -require 'thread' +require 'concurrent' class Synapse::ServiceWatcher class ZookeeperPollWatcher < ZookeeperWatcher @@ -11,35 +9,22 @@ def initialize(opts, synapse, reconfigure_callback) super(opts, synapse, reconfigure_callback) @poll_interval = @discovery['polling_interval_sec'] || 60 - - @should_exit = Synapse::AtomicValue.new(false) - @thread = nil + @should_exit = Concurrent::AtomicBoolean.new(false) end - def start + def start(scheduler) log.info 'synapse: ZookeeperPollWatcher starting' zk_connect do - @thread = Thread.new { - log.info 'synapse: zookeeper polling thread started' - - # Ensure we poll on first start. - last_run = Time.now - @poll_interval - 1 - - until @should_exit.get - now = Time.now - elapsed = now - last_run + reset_schedule = Proc.new { + discover - if elapsed >= @poll_interval - last_run = now - discover - end - - sleep 0.5 + unless @should_exit.true? + scheduler.post(@poll_interval, &reset_schedule) end - - log.info 'synapse: zookeeper polling thread exiting normally' } + + scheduler.post(0, &reset_schedule) end end @@ -47,9 +32,8 @@ def stop log.warn 'synapse: ZookeeperPollWatcher stopping' zk_teardown do - # Signal to the thread that it should exit, and then wait for it to - # exit. - @should_exit.set(true) + # Signal to the process that it should not reset. + @should_exit.make_true end end diff --git a/lib/synapse/version.rb b/lib/synapse/version.rb index 9f12066a..24c1becb 100644 --- a/lib/synapse/version.rb +++ b/lib/synapse/version.rb @@ -1,3 +1,3 @@ module Synapse - VERSION = "0.18.4" + VERSION = "0.18.5" end diff --git a/spec/lib/synapse/service_watcher_docker_spec.rb b/spec/lib/synapse/service_watcher_docker_spec.rb index 5f99ef1b..b65099ad 100644 --- a/spec/lib/synapse/service_watcher_docker_spec.rb +++ b/spec/lib/synapse/service_watcher_docker_spec.rb @@ -15,8 +15,15 @@ class Synapse::ServiceWatcher::DockerWatcher }) mock_synapse end + + let(:mock_scheduler) do + Concurrent::TimerSet.new(:executor => :immediate) + end + subject { Synapse::ServiceWatcher::DockerWatcher.new(testargs, mocksynapse, -> {}) } + let(:testargs) { { 'name' => 'foo', 'discovery' => { 'method' => 'docker', 'servers' => [{'host' => 'server1.local', 'name' => 'mainserver'}], 'image_name' => 'mycool/image', 'container_port' => 6379 }, 'haproxy' => {} }} + before(:each) do allow(subject.log).to receive(:warn) allow(subject.log).to receive(:info) @@ -34,37 +41,20 @@ def add_arg(name, value) context "normal tests" do it('starts a watcher thread') do - watcher_mock = double() - expect(Thread).to receive(:new).and_return(watcher_mock) - subject.start - expect(subject.watcher).to equal(watcher_mock) + subject.start(mock_scheduler) end + it('sets default check interval') do - expect(Thread).to receive(:new).and_return(double) - subject.start + subject.start(mock_scheduler) expect(subject.check_interval).to eq(15.0) end end - context "watch tests" do - before(:each) do - expect(subject).to receive(:sleep_until_next_check) do |arg| - subject.instance_variable_set('@should_exit', true) - end - end + context "discover tests" do it('has a happy first run path, configuring backends') do expect(subject).to receive(:containers).and_return(['container1']) expect(subject).to receive(:set_backends).with(['container1']) - subject.send(:watch) - end - end - context "watch eats exceptions" do - it "blows up when finding containers" do - expect(subject).to receive(:containers) do |arg| - subject.instance_variable_set('@should_exit', true) - raise('throw exception inside watch') - end - expect { subject.send(:watch) }.not_to raise_error + subject.send(:discover) end end diff --git a/spec/lib/synapse/service_watcher_ec2tags_spec.rb b/spec/lib/synapse/service_watcher_ec2tags_spec.rb index b7faf157..9468a1e2 100644 --- a/spec/lib/synapse/service_watcher_ec2tags_spec.rb +++ b/spec/lib/synapse/service_watcher_ec2tags_spec.rb @@ -150,23 +150,20 @@ def munge_arg(name, new_value) let(:instance1) { FakeAWSInstance.new } let(:instance2) { FakeAWSInstance.new } - context 'watch' do - - it 'discovers instances, configures backends, then sleeps' do + context 'discover' do + it 'discovers instances and configures backends' do fake_backends = [1,2,3] expect(subject).to receive(:discover_instances).and_return(fake_backends) expect(subject).to receive(:set_backends).with(fake_backends) { subject.stop } - expect(subject).to receive(:sleep_until_next_check) - subject.send(:watch) + subject.send(:discover) end - it 'sleeps until next check if discover_instances fails' do + it 'throws error if discover_instances fails' do expect(subject).to receive(:discover_instances) do subject.stop raise "discover failed" end - expect(subject).to receive(:sleep_until_next_check) - subject.send(:watch) + expect { subject.send(:discover) }.to raise_error end end diff --git a/spec/lib/synapse/service_watcher_marathon_spec.rb b/spec/lib/synapse/service_watcher_marathon_spec.rb index 485225b6..e05979a7 100644 --- a/spec/lib/synapse/service_watcher_marathon_spec.rb +++ b/spec/lib/synapse/service_watcher_marathon_spec.rb @@ -10,6 +10,11 @@ }) mock_synapse end + + let(:mock_scheduler) do + Concurrent::TimerSet.new(:executor => :immediate) + end + let(:marathon_host) { '127.0.0.1' } let(:marathon_port) { '8080' } let(:app_name) { 'foo' } @@ -60,12 +65,12 @@ end it 'does not crash' do - expect { subject.start }.not_to raise_error + expect { subject.start(mock_scheduler) }.not_to raise_error end end it 'requests the proper API endpoint one time' do - subject.start + subject.start(mock_scheduler) expect(a_request(:get, marathon_request_uri)).to have_been_made.times(1) end @@ -79,7 +84,7 @@ let(:marathon_request_uri) { "#{marathon_host}:#{marathon_port}/v3/tasks/#{app_name}" } it 'calls the customized path' do - subject.start + subject.start(mock_scheduler) expect(a_request(:get, marathon_request_uri)).to have_been_made.times(1) end end @@ -110,7 +115,7 @@ it 'adds the task as a backend' do expect(subject).to receive(:set_backends).with([expected_backend_hash]) - subject.start + subject.start(mock_scheduler) end context 'with a custom port_index' do @@ -128,7 +133,7 @@ it 'adds the task as a backend' do expect(subject).to receive(:set_backends).with([expected_backend_hash]) - subject.start + subject.start(mock_scheduler) end context 'when that port_index does not exist' do @@ -138,7 +143,7 @@ it 'does not include the backend' do expect(subject).to receive(:set_backends).with([]) - subject.start + subject.start(mock_scheduler) end end end @@ -162,14 +167,14 @@ it 'filters tasks that have no startedAt value' do expect(subject).to receive(:set_backends).with([expected_backend_hash]) - subject.start + subject.start(mock_scheduler) end end context 'when marathon returns invalid response' do let(:marathon_response) { [] } it 'does not blow up' do - expect { subject.start }.to_not raise_error + expect { subject.start(mock_scheduler) }.to_not raise_error end end @@ -189,7 +194,7 @@ it 'only sleeps for the difference' do expect(subject).to receive(:sleep).with(check_interval - job_duration) - subject.start + subject.start(mock_scheduler) end end end diff --git a/spec/lib/synapse/service_watcher_multi_spec.rb b/spec/lib/synapse/service_watcher_multi_spec.rb index 7ef1c341..9a3e2a83 100644 --- a/spec/lib/synapse/service_watcher_multi_spec.rb +++ b/spec/lib/synapse/service_watcher_multi_spec.rb @@ -14,6 +14,10 @@ mock_synapse end + let(:mock_scheduler) do + Concurrent::TimerSet.new(:executor => :immediate) + end + subject { Synapse::ServiceWatcher::MultiWatcher.new(config, mock_synapse, reconfigure_callback) } @@ -222,7 +226,7 @@ expect(w).to receive(:start) end - expect { subject.start }.not_to raise_error + expect { subject.start(mock_scheduler) }.not_to raise_error end it 'starts resolver' do @@ -233,7 +237,7 @@ end expect(resolver).to receive(:start) - expect { subject.start }.not_to raise_error + expect { subject.start(mock_scheduler) }.not_to raise_error end end diff --git a/spec/lib/synapse/service_watcher_poll_spec.rb b/spec/lib/synapse/service_watcher_poll_spec.rb new file mode 100644 index 00000000..1e9d10d6 --- /dev/null +++ b/spec/lib/synapse/service_watcher_poll_spec.rb @@ -0,0 +1,81 @@ +require 'spec_helper' + +require 'synapse/service_watcher/base/poll' +require 'concurrent' + +describe Synapse::ServiceWatcher::PollWatcher do + let(:mock_synapse) do + mock_synapse = instance_double(Synapse::Synapse) + mockgenerator = Synapse::ConfigGenerator::BaseGenerator.new() + allow(mock_synapse).to receive(:available_generators).and_return({ + 'haproxy' => mockgenerator + }) + mock_synapse + end + + let(:mock_scheduler) do + Concurrent::ImmediateExecutor.new + end + + let(:config) do + { + 'name' => 'test', + 'haproxy' => {}, + 'discovery' => discovery, + } + end + + let(:discovery) { { 'method' => 'poll' } } + + subject { Synapse::ServiceWatcher::PollWatcher.new(config, mock_synapse, -> {}) } + + describe '#initialize' do + it 'has a default check interval' do + expect(subject.instance_variable_get(:@check_interval)).to eq(15) + end + end + + describe '#start' do + it 'schedules a recurring task' do + expect(mock_scheduler).to receive(:post).exactly(:once).with(0).and_call_original + expect(mock_scheduler).to receive(:post).exactly(:once).with(15) + expect(subject).to receive(:discover).exactly(:once) + + subject.start(mock_scheduler) + end + + context 'when stopped' do + before :each do + subject.stop + end + + it 'does not reschedule' do + expect(mock_scheduler).to receive(:post).exactly(:once).with(0).and_call_original + expect(mock_scheduler).not_to receive(:post).with(15) + expect(subject).to receive(:discover).exactly(:once) + + subject.start(mock_scheduler) + end + end + + context 'with check_interval=0' do + let(:discovery) { { 'method' => 'poll', 'check_interval' => 0 } } + + it 'keeps calling discover until stop is called' do + count = 0 + expect(mock_scheduler).to receive(:post).with(0).exactly(15).times.and_wrap_original { |m, *args, &block| + count += 1 + subject.stop if count >= 15 + + m.call(*args) { + block.call + } + } + + expect(subject).to receive(:discover).exactly(15).times + + subject.start(mock_scheduler) + end + end + end +end diff --git a/spec/lib/synapse/service_watcher_zookeeper_spec.rb b/spec/lib/synapse/service_watcher_zookeeper_spec.rb index c90d09ca..ebfae1b2 100644 --- a/spec/lib/synapse/service_watcher_zookeeper_spec.rb +++ b/spec/lib/synapse/service_watcher_zookeeper_spec.rb @@ -16,6 +16,11 @@ }) mock_synapse end + + let(:mock_scheduler) do + Concurrent::TimerSet.new(:executor => :immediate) + end + let(:config) do { 'name' => 'test', @@ -233,6 +238,16 @@ expect(ZK).to receive(:new).exactly(2).and_raise(RuntimeError) expect { subject.send(:zk_connect) }.to raise_error(RuntimeError) end + + it 'only sets one callback for expired session' do + allow(ZK).to receive(:new).exactly(:once).and_return(mock_zk) + expect(mock_zk).to receive(:on_expired_session).exactly(:once) + + x = Synapse::ServiceWatcher::ZookeeperWatcher.new(config, mock_synapse, ->(*args) {}) + y = Synapse::ServiceWatcher::ZookeeperWatcher.new(config, mock_synapse, ->(*args) {}) + x.start(mock_scheduler) + y.start(mock_scheduler) + end end describe 'start_discovery' do @@ -266,7 +281,7 @@ it 'calls zk_connect' do expect(subject).to receive(:zk_connect).exactly(:once) - subject.start + subject.start(mock_scheduler) end end @@ -449,6 +464,28 @@ end end + describe "#ping" do + before :each do + Synapse::ServiceWatcher::ZookeeperWatcher.class_variable_set(:@@zk_pool, {}) + end + + context 'after on_expired_session' do + it 'calls stop' do + allow(ZK).to receive(:new).and_return(mock_zk) + allow(mock_zk).to receive(:connecting?).and_return(true) + allow(mock_zk).to receive(:close!) + + expect(mock_zk).to receive(:on_expired_session) { |*args, &block| + block.call + } + expect(subject).to receive(:stop).exactly(:once).and_call_original + + expect { subject.start(mock_scheduler) }.not_to raise_error + expect(subject.ping?).to eq(false) + end + end + end + describe "discover" do let(:service_data) { { @@ -600,10 +637,10 @@ end describe '#start' do - it 'starts a thread' do - expect(Thread).to receive(:new) + it 'queues onto the scheduler' do + expect(mock_scheduler).to receive(:post).exactly(:once) allow(ZK).to receive(:new).and_return(mock_zk) - subject.start + subject.start(mock_scheduler) end it 'connects to zookeeper' do @@ -614,7 +651,7 @@ .with('somehost', :timeout => 5, :receive_timeout_msec => 18000, :thread => :per_callback) .and_return(mock_zk) - subject.start + subject.start(mock_scheduler) end it 'does not call create' do @@ -622,24 +659,22 @@ allow(ZK).to receive(:new).and_return(mock_zk) expect(mock_zk).not_to receive(:create) - subject.start + subject.start(mock_scheduler) end end describe '#stop' do context 'when connected to zookeeper' do before :each do - subject.instance_variable_set(:@thread, mock_thread.as_null_object) allow(mock_zk).to receive(:connecting?).and_return(false) allow(mock_zk).to receive(:connected?).and_return(true) end it 'disconnects' do allow(ZK).to receive(:new).and_return(mock_zk) - allow(Thread).to receive(:new) - expect(mock_zk).to receive(:close!).exactly(:once) - subject.start + + subject.start(mock_scheduler) subject.stop end end @@ -649,25 +684,20 @@ subject.stop end end - - context 'when thread is not running' do - before :each do - subject.instance_variable_set(:@zk, mock_zk.as_null_object) - end - - it 'continues silently' do - expect { subject.stop }.not_to raise_error - end - end end describe '#ping' do - before :each do - subject.instance_variable_set(:@zk, mock_zk) - allow(mock_zk).to receive(:connecting?).and_return(false) - allow(mock_zk).to receive(:associating?).and_return(false) - allow(mock_zk).to receive(:connected?).and_return(false) - end + let(:mock_zk) { + zk = double(ZK) + Synapse::ServiceWatcher::ZookeeperPollWatcher.class_variable_set(:@@zk_pool, {}) + Synapse::ServiceWatcher::ZookeeperPollWatcher.class_variable_get(:@@zk_should_exit).make_false + + subject.instance_variable_set(:@zk, zk) + allow(zk).to receive(:connecting?).and_return(false) + allow(zk).to receive(:associating?).and_return(false) + allow(zk).to receive(:connected?).and_return(false) + zk + } it 'checks zookeeper' do expect(mock_zk).to receive(:connecting?) @@ -851,7 +881,7 @@ expect(mock_dns).to receive(:start).exactly(:once) expect(Thread).to receive(:new).exactly(:once) - subject.start + subject.start(mock_scheduler) end end @@ -899,7 +929,7 @@ expect(mock_dns).to receive(:start).exactly(:once) expect(Thread).to receive(:new).exactly(:once) - subject.start + subject.start(mock_scheduler) end end diff --git a/synapse.gemspec b/synapse.gemspec index 94f791f4..05fb62bf 100644 --- a/synapse.gemspec +++ b/synapse.gemspec @@ -27,6 +27,8 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "logging", "~> 1.8" gem.add_runtime_dependency "hashdiff", "~> 0.2.3" gem.add_runtime_dependency "dogstatsd-ruby", "~> 3.3.0" + gem.add_runtime_dependency "nokogiri", "~> 1.6.8.1" + gem.add_runtime_dependency "concurrent-ruby", "~> 1.1.6" gem.add_development_dependency "rake", "~> 11" gem.add_development_dependency "rspec", "~> 3.1.0"