diff --git a/consul/watcher.go b/consul/watcher.go index 6211007..78de360 100644 --- a/consul/watcher.go +++ b/consul/watcher.go @@ -63,8 +63,9 @@ type Watcher struct { certCAPool *x509.CertPool leaf *certLeaf - update chan struct{} - log Logger + update chan struct{} + shutdownCh chan struct{} + log Logger } // New builds a new watcher @@ -73,10 +74,11 @@ func New(service string, consul *api.Client, log Logger) *Watcher { service: service, consul: consul, - C: make(chan Config), - upstreams: make(map[string]*upstream), - update: make(chan struct{}, 1), - log: log, + C: make(chan Config), + upstreams: make(map[string]*upstream), + update: make(chan struct{}, 1), + shutdownCh: make(chan struct{}), + log: log, } } @@ -189,9 +191,11 @@ func (w *Watcher) startUpstreamService(up api.Upstream, name string) { go func() { index := uint64(0) for { + w.lock.Lock() if u.done { return } + w.lock.Unlock() nodes, meta, err := w.consul.Health().Connect(up.DestinationName, "", true, &api.QueryOptions{ Datacenter: up.Datacenter, WaitTime: 10 * time.Minute, @@ -331,6 +335,9 @@ func (w *Watcher) watchLeaf() { w.ready.Done() first = false } + if w.isStopped() { + return + } } } @@ -361,6 +368,9 @@ func (w *Watcher) watchService(service string, handler func(first bool, srv *api } first = false + if w.isStopped() { + return + } } } @@ -405,6 +415,9 @@ func (w *Watcher) watchCA() { w.ready.Done() first = false } + if w.isStopped() { + return + } } } @@ -492,3 +505,16 @@ func (w *Watcher) notifyChanged() { default: } } + +func (w *Watcher) Stop() { + close(w.shutdownCh) +} + +func (w *Watcher) isStopped() bool { + select { + case <-w.shutdownCh: + return true + default: + return false + } +} diff --git a/utils_test.go b/utils_test.go index 0b7803f..5c2dc63 100644 --- a/utils_test.go +++ b/utils_test.go @@ -70,6 +70,7 @@ func startConnectService(t *testing.T, sd *lib.Shutdown, client *api.Client, reg errs <- err } }() + watcher.Stop() sourceHap := haproxy.New(client, watcher.C, haproxy.Options{ EnableIntentions: true,