Skip to content

Commit

Permalink
fix(promtail): Fix bug with Promtail config reloading getting stuck i… (
Browse files Browse the repository at this point in the history
#12939)

Co-authored-by: Paulin Todev <[email protected]>
  • Loading branch information
MasslessParticle and ptodev committed Nov 22, 2024
1 parent 288c006 commit efd5f99
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 16 deletions.
112 changes: 96 additions & 16 deletions clients/pkg/promtail/targets/file/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"os"
"path/filepath"
"sync"
"time"

"github.com/bmatcuk/doublestar"
Expand All @@ -24,6 +25,8 @@ const (
FilenameLabel = "filename"
)

var errFileTargetStopped = errors.New("File target is stopped")

// Config describes behavior for Target
type Config struct {
SyncPeriod time.Duration `mapstructure:"sync_period" yaml:"sync_period"`
Expand Down Expand Up @@ -92,12 +95,14 @@ type FileTarget struct {
fileEventWatcher chan fsnotify.Event
targetEventHandler chan fileTargetEvent
watches map[string]struct{}
watchesMutex sync.Mutex
path string
pathExclude string
quit chan struct{}
done chan struct{}

readers map[string]Reader
readers map[string]Reader
readersMutex sync.Mutex

targetConfig *Config
watchConfig WatchConfig
Expand Down Expand Up @@ -150,7 +155,7 @@ func NewFileTarget(

// Ready if at least one file is being tailed
func (t *FileTarget) Ready() bool {
return len(t.readers) > 0
return t.getReadersLen() > 0
}

// Stop the target.
Expand Down Expand Up @@ -178,17 +183,21 @@ func (t *FileTarget) Labels() model.LabelSet {
// Details implements a Target
func (t *FileTarget) Details() interface{} {
files := map[string]int64{}
t.readersMutex.Lock()
for fileName := range t.readers {
files[fileName], _ = t.positions.Get(fileName)
}
t.readersMutex.Unlock()
return files
}

func (t *FileTarget) run() {
defer func() {
t.readersMutex.Lock()
for _, v := range t.readers {
v.Stop()
}
t.readersMutex.Unlock()
level.Info(t.logger).Log("msg", "filetarget: watcher closed, tailer stopped, positions saved", "path", t.path)
close(t.done)
}()
Expand Down Expand Up @@ -216,6 +225,11 @@ func (t *FileTarget) run() {
}
case <-ticker.C:
err := t.sync()
if errors.Is(err, errFileTargetStopped) {
// This file target has been stopped.
// This is normal and there is no need to log an error.
return
}
if err != nil {
level.Error(t.logger).Log("msg", "error running sync function", "error", err)
}
Expand Down Expand Up @@ -281,15 +295,28 @@ func (t *FileTarget) sync() error {
}

// Add any directories which are not already being watched.
t.watchesMutex.Lock()
toStartWatching := missing(t.watches, dirs)
t.startWatching(toStartWatching)
t.watchesMutex.Unlock()
err := t.startWatching(toStartWatching)
if errors.Is(err, errFileTargetStopped) {
return err
}

// Remove any directories which no longer need watching.
t.watchesMutex.Lock()
toStopWatching := missing(dirs, t.watches)
t.stopWatching(toStopWatching)
t.watchesMutex.Unlock()

err = t.stopWatching(toStopWatching)
if errors.Is(err, errFileTargetStopped) {
return err
}

// fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves.
t.watchesMutex.Lock()
t.watches = dirs
t.watchesMutex.Unlock()

// Check if any running tailers have stopped because of errors and remove them from the running list
// (They will be restarted in startTailing)
Expand All @@ -299,41 +326,55 @@ func (t *FileTarget) sync() error {
t.startTailing(matches)

// Stop tailing any files which no longer exist
t.readersMutex.Lock()
toStopTailing := toStopTailing(matches, t.readers)
t.readersMutex.Unlock()
t.stopTailingAndRemovePosition(toStopTailing)

return nil
}

func (t *FileTarget) startWatching(dirs map[string]struct{}) {
func (t *FileTarget) startWatching(dirs map[string]struct{}) error {
for dir := range dirs {
if _, ok := t.watches[dir]; ok {
if _, ok := t.getWatch(dir); ok {
continue
}

level.Info(t.logger).Log("msg", "watching new directory", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
select {
case <-t.quit:
return errFileTargetStopped
case t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStart,
}:
}
}
return nil
}

func (t *FileTarget) stopWatching(dirs map[string]struct{}) {
func (t *FileTarget) stopWatching(dirs map[string]struct{}) error {
for dir := range dirs {
if _, ok := t.watches[dir]; !ok {
if _, ok := t.getWatch(dir); !ok {
continue
}

level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
select {
case <-t.quit:
return errFileTargetStopped
case t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStop,
}:
}
}
return nil
}

func (t *FileTarget) startTailing(ps []string) {
for _, p := range ps {
if _, ok := t.readers[p]; ok {
if _, ok := t.getReader(p); ok {
continue
}

Expand Down Expand Up @@ -387,25 +428,26 @@ func (t *FileTarget) startTailing(ps []string) {
}
reader = tailer
}
t.readers[p] = reader
t.setReader(p, reader)
}
}

// stopTailingAndRemovePosition will stop the tailer and remove the positions entry.
// Call this when a file no longer exists and you want to remove all traces of it.
func (t *FileTarget) stopTailingAndRemovePosition(ps []string) {
for _, p := range ps {
if reader, ok := t.readers[p]; ok {
if reader, ok := t.getReader(p); ok {
reader.Stop()
t.positions.Remove(reader.Path())
delete(t.readers, p)
t.removeReader(p)
}
}
}

// pruneStoppedTailers removes any tailers which have stopped running from
// the list of active tailers. This allows them to be restarted if there were errors.
func (t *FileTarget) pruneStoppedTailers() {
t.readersMutex.Lock()
toRemove := make([]string, 0, len(t.readers))
for k, t := range t.readers {
if !t.IsRunning() {
Expand All @@ -415,6 +457,45 @@ func (t *FileTarget) pruneStoppedTailers() {
for _, tr := range toRemove {
delete(t.readers, tr)
}
t.readersMutex.Unlock()
}

func (t *FileTarget) getReadersLen() int {
t.readersMutex.Lock()
defer t.readersMutex.Unlock()
return len(t.readers)
}

func (t *FileTarget) getReader(val string) (Reader, bool) {
t.readersMutex.Lock()
defer t.readersMutex.Unlock()
reader, ok := t.readers[val]
return reader, ok
}

func (t *FileTarget) setReader(val string, reader Reader) {
t.readersMutex.Lock()
defer t.readersMutex.Unlock()
t.readers[val] = reader
}

func (t *FileTarget) getWatch(val string) (struct{}, bool) {
t.watchesMutex.Lock()
defer t.watchesMutex.Unlock()
fileTarget, ok := t.watches[val]
return fileTarget, ok
}

func (t *FileTarget) removeReader(val string) {
t.readersMutex.Lock()
defer t.readersMutex.Unlock()
delete(t.readers, val)
}

func (t *FileTarget) getWatchesLen() int {
t.watchesMutex.Lock()
defer t.watchesMutex.Unlock()
return len(t.watches)
}

func toStopTailing(nt []string, et map[string]Reader) []string {
Expand Down Expand Up @@ -442,7 +523,7 @@ func toStopTailing(nt []string, et map[string]Reader) []string {
func (t *FileTarget) reportSize(ms []string) {
for _, m := range ms {
// Ask the tailer to update the size if a tailer exists, this keeps position and size metrics in sync
if reader, ok := t.readers[m]; ok {
if reader, ok := t.getReader(m); ok {
err := reader.MarkPositionAndSize()
if err != nil {
level.Warn(t.logger).Log("msg", "failed to get file size from tailer, ", "file", m, "error", err)
Expand All @@ -459,7 +540,6 @@ func (t *FileTarget) reportSize(ms []string) {
}
t.metrics.totalBytes.WithLabelValues(m).Set(float64(fi.Size()))
}

}
}

Expand Down
87 changes: 87 additions & 0 deletions clients/pkg/promtail/targets/file/filetarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,93 @@ func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) {
ps.Stop()
}

// Make sure that Stop() doesn't hang if FileTarget is waiting on a channel send.
func TestFileTarget_StopAbruptly(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

dirName := newTestLogDirectories(t)
positionsFileName := filepath.Join(dirName, "positions.yml")
logDir1 := filepath.Join(dirName, "log1")
logDir2 := filepath.Join(dirName, "log2")
logDir3 := filepath.Join(dirName, "log3")

logfile1 := filepath.Join(logDir1, "test1.log")
logfile2 := filepath.Join(logDir2, "test1.log")
logfile3 := filepath.Join(logDir3, "test1.log")

ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Millisecond,
PositionsFile: positionsFileName,
})
require.NoError(t, err)

client := fake.New(func() {})
defer client.Stop()

// fakeHandler has to be a buffered channel so that we can call the len() function on it.
// We need to call len() to check if the channel is full.
fakeHandler := make(chan fileTargetEvent, 1)
pathToWatch := filepath.Join(dirName, "**", "*.log")
registry := prometheus.NewRegistry()
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
}, DefaultWatchConig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Create a directory, still nothing is watched.
err = os.MkdirAll(logDir1, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile1)
assert.NoError(t, err)

// There should be only one WatchStart event in the channel so far.
ftEvent := <-fakeHandler
require.Equal(t, fileTargetEventWatchStart, ftEvent.eventType)

requireEventually(t, func() bool {
return target.getReadersLen() == 1
}, "expected 1 tailer to be created")

require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 1
`), "promtail_files_active_total"))

// Create two directories - one more than the buffer of fakeHandler,
// so that the file target hands until we call Stop().
err = os.MkdirAll(logDir2, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile2)
assert.NoError(t, err)

err = os.MkdirAll(logDir3, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile3)
assert.NoError(t, err)

// Wait until the file target is waiting on a channel send due to a full channel buffer.
requireEventually(t, func() bool {
return len(fakeHandler) == 1
}, "expected an event in the fakeHandler channel")

// If FileHandler works well, then it will stop waiting for
// the blocked fakeHandler and stop cleanly.
// This is why this time we don't drain fakeHandler.
requireEventually(t, func() bool {
target.Stop()
ps.Stop()
return true
}, "expected FileTarget not to hang")

require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 0
`), "promtail_files_active_total"))
}

func TestFileTargetPathExclusion(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
Expand Down

0 comments on commit efd5f99

Please sign in to comment.