diff --git a/scope.go b/scope.go index 1caf3b59..80983bcf 100644 --- a/scope.go +++ b/scope.go @@ -66,7 +66,7 @@ type scope struct { defaultBuckets Buckets registry *scopeRegistry - quit chan struct{} + status scopeStatus cm sync.RWMutex gm sync.RWMutex @@ -79,6 +79,12 @@ type scope struct { histograms map[string]*histogram } +type scopeStatus struct { + sync.RWMutex + closed bool + quit chan struct{} +} + type scopeRegistry struct { sync.RWMutex subscopes map[string]*scope @@ -147,7 +153,10 @@ func newRootScope(opts ScopeOptions, interval time.Duration) *scope { registry: &scopeRegistry{ subscopes: make(map[string]*scope), }, - quit: make(chan struct{}, 1), + status: scopeStatus{ + closed: false, + quit: make(chan struct{}, 1), + }, counters: make(map[string]*counter), gauges: make(map[string]*gauge), @@ -217,27 +226,42 @@ func (s *scope) cachedReport(c CachedStatsReporter) { // reportLoop is used by the root scope for periodic reporting func (s *scope) reportLoop(interval time.Duration) { ticker := time.NewTicker(interval) + defer ticker.Stop() + for { select { case <-ticker.C: - s.registry.RLock() - if s.reporter != nil { - for _, ss := range s.registry.subscopes { - ss.report(s.reporter) - } - } else if s.cachedReporter != nil { - for _, ss := range s.registry.subscopes { - ss.cachedReport(s.cachedReporter) - } - } - - s.registry.RUnlock() - case <-s.quit: + s.reportLoopRun() + case <-s.status.quit: return } } } +func (s *scope) reportLoopRun() { + // Need to hold a status lock to ensure not to report + // and flush after a close + s.status.RLock() + if s.status.closed { + s.status.RUnlock() + return + } + + s.registry.RLock() + if s.reporter != nil { + for _, ss := range s.registry.subscopes { + ss.report(s.reporter) + } + } else if s.cachedReporter != nil { + for _, ss := range s.registry.subscopes { + ss.cachedReport(s.cachedReporter) + } + } + s.registry.RUnlock() + + s.status.RUnlock() +} + func (s *scope) Counter(name string) Counter { s.cm.RLock() val, ok := s.counters[name] @@ -440,7 +464,11 @@ func (s *scope) Snapshot() Snapshot { } func (s *scope) Close() error { - close(s.quit) + s.status.Lock() + s.status.closed = true + close(s.status.quit) + s.status.Unlock() + if closer, ok := s.baseReporter.(io.Closer); ok { return closer.Close() } diff --git a/scope_test.go b/scope_test.go index 2199ad19..20c5d731 100644 --- a/scope_test.go +++ b/scope_test.go @@ -22,6 +22,7 @@ package tally import ( "sync" + "sync/atomic" "testing" "time" @@ -80,6 +81,8 @@ type testStatsReporter struct { gauges map[string]*testFloatValue timers map[string]*testIntValue histograms map[string]*testHistogramValue + + flushes int32 } // newTestStatsReporter returns a new TestStatsReporter @@ -248,11 +251,14 @@ func (r *testStatsReporter) Capabilities() Capabilities { return capabilitiesReportingNoTagging } -func (r *testStatsReporter) Flush() {} +func (r *testStatsReporter) Flush() { + atomic.AddInt32(&r.flushes, 1) +} func TestWriteTimerImmediately(t *testing.T) { r := newTestStatsReporter() - s, _ := NewRootScope(ScopeOptions{Reporter: r}, 0) + s, closer := NewRootScope(ScopeOptions{Reporter: r}, 0) + defer closer.Close() r.tg.Add(1) s.Timer("ticky").Record(time.Millisecond * 175) r.tg.Wait() @@ -260,7 +266,8 @@ func TestWriteTimerImmediately(t *testing.T) { func TestWriteTimerClosureImmediately(t *testing.T) { r := newTestStatsReporter() - s, _ := NewRootScope(ScopeOptions{Reporter: r}, 0) + s, closer := NewRootScope(ScopeOptions{Reporter: r}, 0) + defer closer.Close() r.tg.Add(1) tm := s.Timer("ticky") tm.Start().Stop() @@ -269,8 +276,8 @@ func TestWriteTimerClosureImmediately(t *testing.T) { func TestWriteReportLoop(t *testing.T) { r := newTestStatsReporter() - s, close := NewRootScope(ScopeOptions{Reporter: r}, 10) - defer close.Close() + s, closer := NewRootScope(ScopeOptions{Reporter: r}, 10) + defer closer.Close() r.cg.Add(1) s.Counter("bar").Inc(1) @@ -287,8 +294,8 @@ func TestWriteReportLoop(t *testing.T) { func TestCachedReportLoop(t *testing.T) { r := newTestStatsReporter() - s, close := NewRootScope(ScopeOptions{CachedReporter: r}, 10) - defer close.Close() + s, closer := NewRootScope(ScopeOptions{CachedReporter: r}, 10) + defer closer.Close() r.cg.Add(1) s.Counter("bar").Inc(1) @@ -306,7 +313,9 @@ func TestCachedReportLoop(t *testing.T) { func TestWriteOnce(t *testing.T) { r := newTestStatsReporter() - root, _ := NewRootScope(ScopeOptions{Reporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{Reporter: r}, 0) + defer closer.Close() + s := root.(*scope) r.cg.Add(1) @@ -338,7 +347,9 @@ func TestWriteOnce(t *testing.T) { func TestCachedReporter(t *testing.T) { r := newTestStatsReporter() - root, _ := NewRootScope(ScopeOptions{CachedReporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{CachedReporter: r}, 0) + defer closer.Close() + s := root.(*scope) r.cg.Add(1) @@ -366,7 +377,9 @@ func TestCachedReporter(t *testing.T) { func TestRootScopeWithoutPrefix(t *testing.T) { r := newTestStatsReporter() - root, _ := NewRootScope(ScopeOptions{Reporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{Reporter: r}, 0) + defer closer.Close() + s := root.(*scope) r.cg.Add(1) s.Counter("bar").Inc(1) @@ -391,7 +404,9 @@ func TestRootScopeWithoutPrefix(t *testing.T) { func TestRootScopeWithPrefix(t *testing.T) { r := newTestStatsReporter() - root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0) + defer closer.Close() + s := root.(*scope) r.cg.Add(1) s.Counter("bar").Inc(1) @@ -416,7 +431,9 @@ func TestRootScopeWithPrefix(t *testing.T) { func TestRootScopeWithDifferentSeparator(t *testing.T) { r := newTestStatsReporter() - root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Separator: "_", Reporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Separator: "_", Reporter: r}, 0) + defer closer.Close() + s := root.(*scope) r.cg.Add(1) s.Counter("bar").Inc(1) @@ -441,7 +458,9 @@ func TestRootScopeWithDifferentSeparator(t *testing.T) { func TestSubScope(t *testing.T) { r := newTestStatsReporter() - root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Reporter: r}, 0) + defer closer.Close() + s := root.SubScope("mork").(*scope) r.cg.Add(1) s.Counter("bar").Inc(1) @@ -467,7 +486,9 @@ func TestTaggedSubScope(t *testing.T) { r := newTestStatsReporter() ts := map[string]string{"env": "test"} - root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Tags: ts, Reporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Tags: ts, Reporter: r}, 0) + defer closer.Close() + s := root.(*scope) tscope := root.Tagged(map[string]string{"service": "test"}).(*scope) @@ -514,7 +535,8 @@ func TestTaggedExistingReturnsSameScope(t *testing.T) { nil, {"env": "test"}, } { - root, _ := NewRootScope(ScopeOptions{Prefix: "foo", Tags: initialTags, Reporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{Prefix: "foo", Tags: initialTags, Reporter: r}, 0) + defer closer.Close() rootScope := root.(*scope) fooScope := root.Tagged(map[string]string{"foo": "bar"}).(*scope) @@ -565,13 +587,15 @@ func TestSnapshot(t *testing.T) { func TestCapabilities(t *testing.T) { r := newTestStatsReporter() - s, _ := NewRootScope(ScopeOptions{Reporter: r}, 0) + s, closer := NewRootScope(ScopeOptions{Reporter: r}, 0) + defer closer.Close() assert.True(t, s.Capabilities().Reporting()) assert.False(t, s.Capabilities().Tagging()) } func TestCapabilitiesNoReporter(t *testing.T) { - s, _ := NewRootScope(ScopeOptions{}, 0) + s, closer := NewRootScope(ScopeOptions{}, 0) + defer closer.Close() assert.False(t, s.Capabilities().Reporting()) assert.False(t, s.Capabilities().Tagging()) } @@ -583,7 +607,7 @@ func TestNilTagMerge(t *testing.T) { func TestScopeDefaultBuckets(t *testing.T) { r := newTestStatsReporter() - root, _ := NewRootScope(ScopeOptions{ + root, closer := NewRootScope(ScopeOptions{ DefaultBuckets: DurationBuckets{ 0 * time.Millisecond, 30 * time.Millisecond, @@ -593,6 +617,8 @@ func TestScopeDefaultBuckets(t *testing.T) { }, Reporter: r, }, 0) + defer closer.Close() + s := root.(*scope) r.hg.Add(2) s.Histogram("baz", DefaultBuckets).RecordDuration(42 * time.Millisecond) @@ -619,7 +645,9 @@ func newTestMets(scope Scope) testMets { func TestReturnByValue(t *testing.T) { r := newTestStatsReporter() - root, _ := NewRootScope(ScopeOptions{Reporter: r}, 0) + root, closer := NewRootScope(ScopeOptions{Reporter: r}, 0) + defer closer.Close() + s := root.(*scope) mets := newTestMets(s) @@ -630,3 +658,18 @@ func TestReturnByValue(t *testing.T) { assert.EqualValues(t, 3, r.counters["honk"].val) } + +func TestScopeAvoidReportLoopRunOnClose(t *testing.T) { + r := newTestStatsReporter() + root, closer := NewRootScope(ScopeOptions{Reporter: r}, 0) + + s := root.(*scope) + s.reportLoopRun() + + assert.Equal(t, int32(1), atomic.LoadInt32(&r.flushes)) + + assert.NoError(t, closer.Close()) + + s.reportLoopRun() + assert.Equal(t, int32(1), atomic.LoadInt32(&r.flushes)) +}