From 5b63f131fe30e23292f5b2a784ae4dad1d84de45 Mon Sep 17 00:00:00 2001 From: George Tsagkarelis Date: Thu, 16 Jan 2025 19:40:49 +0100 Subject: [PATCH] tapdb: always store result of querySyncStats Our universe stats could run into a case where if in an un-cached state the db query takes longer than the default client timeout (usually 30s) then we'd fail the query and return early. This would cause the next client to also receive an error instead of the cached result, making this codepath stuck in an endless failing query. With this change we parallelize the db query and we always wait for the result in order to store it in cache. We may fail the function earlier, but the result will always be retrieved & persisted. --- tapdb/universe_stats.go | 49 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/tapdb/universe_stats.go b/tapdb/universe_stats.go index fe110fa55..c07fb2413 100644 --- a/tapdb/universe_stats.go +++ b/tapdb/universe_stats.go @@ -575,14 +575,48 @@ func (u *UniverseStats) AggregateSyncStats( log.Debugf("Populating aggregate sync stats") - dbStats, err := u.querySyncStats(ctx) - if err != nil { + var ( + resChan = make(chan universe.AggregateStats, 1) + errChan = make(chan error, 1) + dbStats universe.AggregateStats + ) + + // We'll fire the db query in a separate go routine, with an undefined + // timeout. This way, we'll always retrieve the result regardless of + // what happens in the current context. This way we're always waiting + // for the call to complete and caching the result even if this + // function's result is an error. + go func() { + // Note: we have the statsMtx held, so even if a burst of + // requests took place in an un-cached state, this call would + // not be triggered multiple times. + dbStats, err := u.querySyncStats(context.Background()) + if err != nil { + errChan <- err + return + } + + log.Debugf("Retrieved aggregate sync stats: %+v", dbStats) + + // We'll store the DB stats so that it can be read from cache + // later. + u.statsSnapshot.Store(&dbStats) + + resChan <- dbStats + }() + + select { + case <-ctx.Done(): + log.Debugf("ctx timeout before retrieving aggregate sync stats") + return dbStats, ctx.Err() + + case err := <-errChan: + log.Debugf("error while querying aggregate sync stats: %v", err) return dbStats, err - } - // We'll store the DB stats then start our time after function to wipe - // the stats pointer so we'll refresh it after a period of time. - u.statsSnapshot.Store(&dbStats) + case res := <-resChan: + dbStats = res + } // Reset the timer so we'll refresh again after the cache duration. if u.statsRefresh != nil && !u.statsRefresh.Stop() { @@ -591,6 +625,9 @@ func (u *UniverseStats) AggregateSyncStats( default: } } + + log.Debugf("Refreshing sync stats cache in %v", u.opts.cacheDuration) + u.statsRefresh = time.AfterFunc( u.opts.cacheDuration, u.populateSyncStatsCache, )