diff --git a/tailer/bufferedTailer_test.go b/tailer/bufferedTailer_test.go index 38fb27af..2963a5b6 100644 --- a/tailer/bufferedTailer_test.go +++ b/tailer/bufferedTailer_test.go @@ -64,10 +64,11 @@ func TestLineBufferSequential(t *testing.T) { if stillOpen { t.Error("Source tailer was not closed.") } - if !metric.registerCalled { + registerCalled, unregisterCalled := metric.Get() + if !registerCalled { t.Error("metric.Register() not called.") } - if !metric.unregisterCalled { + if !unregisterCalled { t.Error("metric.Unregister() not called.") } // The peak load should be 9999 or 9998, depending on how quick @@ -119,10 +120,11 @@ func TestLineBufferParallel(t *testing.T) { if stillOpen { t.Error("Source tailer was not closed.") } - if !metric.registerCalled { + registerCalled, unregisterCalled := metric.Get() + if !registerCalled { t.Error("metric.Register() not called.") } - if !metric.unregisterCalled { + if !unregisterCalled { t.Error("metric.Unregister() not called.") } // Should be much less than 10000, because consumer and producer work in parallel. @@ -130,20 +132,33 @@ func TestLineBufferParallel(t *testing.T) { } type peakLoadMetric struct { + lock sync.Mutex registerCalled, unregisterCalled bool peakLoad float64 } func (m *peakLoadMetric) Register() { + m.lock.Lock() m.registerCalled = true + m.lock.Unlock() } func (m *peakLoadMetric) Observe(currentLoad float64) { + m.lock.Lock() if currentLoad > m.peakLoad { m.peakLoad = currentLoad } + m.lock.Unlock() } func (m *peakLoadMetric) Unregister() { + m.lock.Lock() m.unregisterCalled = true + m.lock.Unlock() +} + +func (m *peakLoadMetric) Get() (bool, bool) { + m.lock.Lock() + defer m.lock.Unlock() + return m.registerCalled, m.unregisterCalled } diff --git a/tailer/fswatcher_test.go b/tailer/fswatcher_test.go index 0f3acda7..9f53330f 100644 --- a/tailer/fswatcher_test.go +++ b/tailer/fswatcher_test.go @@ -29,6 +29,7 @@ import ( "runtime/pprof" "strconv" "strings" + "sync" "testing" "time" ) @@ -233,6 +234,7 @@ type context struct { log logrus.FieldLogger tailer fswatcher.FileTailer lines map[string]chan string + linesLock sync.Mutex } func exec(t *testing.T, ctx *context, cmd []string) { @@ -503,12 +505,14 @@ func startFileTailer(t *testing.T, ctx *context, params []string) { if !open { return // tailer closed } + ctx.linesLock.Lock() c, ok := ctx.lines[line.File] if !ok { c = make(chan string) ctx.log.Debugf("adding lines channel for %v", line.File) ctx.lines[line.File] = c } + ctx.linesLock.Unlock() c <- line.Line case err, open := <-tailer.Errors(): if !open { @@ -525,10 +529,12 @@ func startFileTailer(t *testing.T, ctx *context, params []string) { } func expect(t *testing.T, ctx *context, line string, file string) { - var ( - timeout = 5 * time.Second - c = ctx.lines[filepath.Join(ctx.basedir, file)] - ) + var timeout = 5 * time.Second + + ctx.linesLock.Lock() + c := ctx.lines[filepath.Join(ctx.basedir, file)] + ctx.linesLock.Unlock() + for c == nil { time.Sleep(100 * time.Millisecond) timeout = timeout - 10*time.Millisecond @@ -537,7 +543,9 @@ func expect(t *testing.T, ctx *context, line string, file string) { return } ctx.log.Debugf("waiting for lines channel for %v", filepath.Join(ctx.basedir, file)) + ctx.linesLock.Lock() c = ctx.lines[filepath.Join(ctx.basedir, file)] + ctx.linesLock.Unlock() } select { case l := <-c: