domain: move deltaUpdateTicker/gcstats into a new goroutine (#58926) #59163

Open
wants to merge 10 commits into base: release-8.5
148 changes: 93 additions & 55 deletions pkg/domain/domain.go
@@ -2365,40 +2365,31 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
do.indexUsageWorker()
}, "indexUsageWorker")
if do.statsLease <= 0 {
// For statsLease > 0, `updateStatsWorker` handles the quit of stats owner.
// For statsLease > 0, `gcStatsWorker` handles the quit of stats owner.
do.wg.Run(func() { quitStatsOwner(do, do.statsOwner) }, "quitStatsOwner")
return nil
}
waitStartTask := func(do *Domain, fn func()) {
select {
case <-do.StatsHandle().InitStatsDone:
case <-do.exit: // It may happen that TiDB receives Ctrl+C before initStatsDone.
return
}
fn()
}
do.SetStatsUpdating(true)
// The asyncLoadHistogram/dumpColStatsUsageWorker/deltaUpdateTickerWorker workers don't require the stats initialization to be completed.
// This is because those workers' primary responsibilities are to update the change delta and handle DDL operations.
// These tasks need to be working as soon as possible to avoid missing change deltas or DDL events.
do.wg.Run(do.asyncLoadHistogram, "asyncLoadHistogram")
// The stats updated worker doesn't require the stats initialization to be completed.
// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
// These tasks do not interfere with or depend on the initialization process.
do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker")
do.wg.Run(do.deltaUpdateTickerWorker, "deltaUpdateTickerWorker")
do.wg.Run(do.dumpColStatsUsageWorker, "dumpColStatsUsageWorker")
do.wg.Run(func() { waitStartTask(do, do.gcStatsWorker) }, "gcStatsWorker")

// Wait for the stats worker to finish the initialization.
// Otherwise, we may start the auto analyze worker before the stats cache is initialized.
do.wg.Run(
func() {
select {
case <-do.StatsHandle().InitStatsDone:
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
do.autoAnalyzeWorker()
},
"autoAnalyzeWorker",
)
do.wg.Run(
func() {
select {
case <-do.StatsHandle().InitStatsDone:
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
do.analyzeJobsCleanupWorker()
},
"analyzeJobsCleanupWorker",
)
do.wg.Run(func() { waitStartTask(do, do.autoAnalyzeWorker) }, "autoAnalyzeWorker")
do.wg.Run(func() { waitStartTask(do, do.analyzeJobsCleanupWorker) }, "analyzeJobsCleanupWorker")
do.wg.Run(
func() {
// The initStatsCtx is used to store the internal session for initializing stats,
@@ -2579,60 +2570,66 @@ func (do *Domain) indexUsageWorker() {
}
}

func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) {
func (do *Domain) gcStatsWorkerExitPreprocessing() {
ch := make(chan struct{}, 1)
timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
logutil.BgLogger().Info("updateStatsWorker is going to exit, start to flush stats")
statsHandle.FlushStats()
logutil.BgLogger().Info("updateStatsWorker ready to release owner")
logutil.BgLogger().Info("gcStatsWorker ready to release owner")
do.statsOwner.Close()
ch <- struct{}{}
}()
select {
case <-ch:
logutil.BgLogger().Info("updateStatsWorker exit preprocessing finished")
logutil.BgLogger().Info("gcStatsWorker exit preprocessing finished")
return
case <-timeout.Done():
logutil.BgLogger().Warn("updateStatsWorker exit preprocessing timeout, force exiting")
logutil.BgLogger().Warn("gcStatsWorker exit preprocessing timeout, force exiting")
return
}
}

func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
logutil.BgLogger().Info("updateStatsWorker started.")
func (*Domain) deltaUpdateTickerWorkerExitPreprocessing(statsHandle *handle.Handle) {
ch := make(chan struct{}, 1)
timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
logutil.BgLogger().Info("deltaUpdateTicker is going to exit, start to flush stats")
statsHandle.FlushStats()
ch <- struct{}{}
}()
select {
case <-ch:
logutil.BgLogger().Info("deltaUpdateTicker exit preprocessing finished")
return
case <-timeout.Done():
logutil.BgLogger().Warn("deltaUpdateTicker exit preprocessing timeout, force exiting")
return
}
}
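
The two exit-preprocessing helpers above share one shape: run the blocking cleanup in its own goroutine and bound the wait with a 10-second context timeout so shutdown can never hang on a stuck flush. Below is a minimal, self-contained sketch of that pattern; the name runWithTimeout and the log messages are illustrative only, not part of this PR.

package main

import (
	"context"
	"log"
	"time"
)

// runWithTimeout runs cleanup in its own goroutine and waits at most d for it
// to finish, mirroring the exit-preprocessing helpers in this diff.
func runWithTimeout(d time.Duration, cleanup func()) {
	done := make(chan struct{}, 1)
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	go func() {
		cleanup()
		done <- struct{}{}
	}()
	select {
	case <-done:
		log.Println("exit preprocessing finished")
	case <-ctx.Done():
		log.Println("exit preprocessing timed out, force exiting")
	}
}

func main() {
	runWithTimeout(10*time.Second, func() {
		// Stand-in for statsHandle.FlushStats() and owner release.
		time.Sleep(100 * time.Millisecond)
	})
}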

func (do *Domain) gcStatsWorker() {
defer util.Recover(metrics.LabelDomain, "gcStatsWorker", nil, false)
logutil.BgLogger().Info("gcStatsWorker started.")
lease := do.statsLease
// We need to have different nodes trigger tasks at different times to avoid the herd effect.
randDuration := time.Duration(rand.Int63n(int64(time.Minute)))
deltaUpdateTicker := time.NewTicker(20*lease + randDuration)
gcStatsTicker := time.NewTicker(100 * lease)
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
updateStatsHealthyTicker := time.NewTicker(20 * lease)
readMemTicker := time.NewTicker(memory.ReadMemInterval)
statsHandle := do.StatsHandle()
defer func() {
dumpColStatsUsageTicker.Stop()
gcStatsTicker.Stop()
deltaUpdateTicker.Stop()
readMemTicker.Stop()
updateStatsHealthyTicker.Stop()
do.SetStatsUpdating(false)
logutil.BgLogger().Info("updateStatsWorker exited.")
logutil.BgLogger().Info("gcStatsWorker exited.")
}()
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
defer util.Recover(metrics.LabelDomain, "gcStatsWorker", nil, false)

for {
select {
case <-do.exit:
do.updateStatsWorkerExitPreprocessing(statsHandle)
do.gcStatsWorkerExitPreprocessing()
return
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV(false)
if err != nil {
logutil.BgLogger().Warn("dump stats delta failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !do.statsOwner.IsOwner() {
continue
@@ -2642,11 +2639,6 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
logutil.BgLogger().Warn("GC stats failed", zap.Error(err))
}
do.CheckAutoAnalyzeWindows()
case <-dumpColStatsUsageTicker.C:
err := statsHandle.DumpColStatsUsageToKV()
if err != nil {
logutil.BgLogger().Warn("dump column stats usage failed", zap.Error(err))
}
case <-readMemTicker.C:
memory.ForceReadMemStats()
do.StatsHandle().StatsCache.TriggerEvict()
@@ -2656,6 +2648,52 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
}
}

func (do *Domain) dumpColStatsUsageWorker() {
logutil.BgLogger().Info("dumpColStatsUsageWorker started.")
lease := do.statsLease
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
statsHandle := do.StatsHandle()
defer func() {
dumpColStatsUsageTicker.Stop()
logutil.BgLogger().Info("dumpColStatsUsageWorker exited.")
}()
defer util.Recover(metrics.LabelDomain, "dumpColStatsUsageWorker", nil, false)

for {
select {
case <-do.exit:
return
case <-dumpColStatsUsageTicker.C:
err := statsHandle.DumpColStatsUsageToKV()
if err != nil {
logutil.BgLogger().Warn("dump column stats usage failed", zap.Error(err))
}
}
}
}

func (do *Domain) deltaUpdateTickerWorker() {
defer util.Recover(metrics.LabelDomain, "deltaUpdateTickerWorker", nil, false)
logutil.BgLogger().Info("deltaUpdateTickerWorker started.")
lease := do.statsLease
// We need to have different nodes trigger tasks at different times to avoid the herd effect.
randDuration := time.Duration(rand.Int63n(int64(time.Minute)))
deltaUpdateTicker := time.NewTicker(20*lease + randDuration)
statsHandle := do.StatsHandle()
for {
select {
case <-do.exit:
do.deltaUpdateTickerWorkerExitPreprocessing(statsHandle)
return
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV(false)
if err != nil {
logutil.BgLogger().Warn("dump stats delta failed", zap.Error(err))
}
}
}
}
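
The delta-update ticker adds a random offset of up to one minute to its 20*lease base interval so that different TiDB nodes dump their stats deltas at different times instead of stampeding storage together. A small illustrative sketch of that jitter calculation follows; the lease value and variable names are assumptions chosen for the example, not TiDB defaults.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	lease := 3 * time.Second // example stats lease, chosen only for illustration
	// Base interval plus up to one minute of per-node jitter, as in deltaUpdateTickerWorker.
	jitter := time.Duration(rand.Int63n(int64(time.Minute)))
	interval := 20*lease + jitter
	fmt.Printf("delta update ticker interval: %s (base %s + jitter %s)\n", interval, 20*lease, jitter)
}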

func (do *Domain) autoAnalyzeWorker() {
defer util.Recover(metrics.LabelDomain, "autoAnalyzeWorker", nil, false)
statsHandle := do.StatsHandle()
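
For reference, the waitStartTask helper introduced near the top of this diff gates a worker on stats initialization while still honoring shutdown: it returns without running the worker if the exit channel fires first. A standalone sketch of that gating pattern, with placeholder channel and worker names rather than the TiDB ones:

package main

import (
	"fmt"
	"time"
)

// waitThenRun blocks until init completes or quit fires, then runs fn only in
// the first case — the same shape as waitStartTask in this diff.
func waitThenRun(initDone, quit <-chan struct{}, fn func()) {
	select {
	case <-initDone:
	case <-quit: // e.g. the process received Ctrl+C before init finished
		return
	}
	fn()
}

func main() {
	initDone := make(chan struct{})
	quit := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(initDone)
	}()
	waitThenRun(initDone, quit, func() { fmt.Println("worker started after init") })
}
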
2 changes: 1 addition & 1 deletion pkg/domain/domain_test.go
@@ -181,7 +181,7 @@ func TestStatWorkRecoverFromPanic(t *testing.T) {
metrics.PanicCounter.Reset()
// Since the stats lease is 0 now, creating a new ticker will panic.
// Test that they can recover from panic correctly.
dom.updateStatsWorker(mock.NewContext())
dom.gcStatsWorker()
dom.autoAnalyzeWorker()
counter := metrics.PanicCounter.WithLabelValues(metrics.LabelDomain)
pb := &dto.Metric{}
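
This test relies on the deferred util.Recover guard at the top of each worker: time.NewTicker panics on a non-positive interval, and the guard turns that panic into a counted metric instead of a crash. A hedged sketch of the underlying recover pattern, using plain recover and fmt rather than TiDB's util and metrics packages:

package main

import (
	"fmt"
	"time"
)

// recoverable wraps a worker body so a panic is logged instead of killing the
// process — the role util.Recover plays for gcStatsWorker and autoAnalyzeWorker.
func recoverable(name string, fn func()) {
	defer func() {
		if r := recover(); r != nil {
			fmt.Printf("%s recovered from panic: %v\n", name, r)
		}
	}()
	fn()
}

func main() {
	recoverable("gcStatsWorker", func() {
		// With a zero stats lease the interval is non-positive, so NewTicker panics.
		time.NewTicker(0)
	})
	fmt.Println("process still alive after worker panic")
}
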
2 changes: 2 additions & 0 deletions pkg/server/main_test.go
@@ -72,6 +72,8 @@ func TestMain(m *testing.M) {
goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"),
goleak.IgnoreTopFunction("go.etcd.io/etcd/client/pkg/v3/logutil.(*MergeLogger).outputLoop"),
goleak.IgnoreTopFunction("github.com/go-sql-driver/mysql.(*mysqlConn).startWatcher.func1"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/util/timeutil.Sleep"),
goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/ddl/notifier.(*DDLNotifier).start"),
}

callback := func(i int) int {
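
The two new IgnoreTopFunction entries tell goleak to ignore goroutines parked in timeutil.Sleep and the DDL notifier's start loop when checking for leaks after the test binary finishes. A minimal sketch of how such options are typically wired up with goleak; this is standard goleak usage, not TiDB's exact TestMain, which carries additional setup and a callback.

package server_test

import (
	"testing"

	"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
	opts := []goleak.Option{
		goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/util/timeutil.Sleep"),
		goleak.IgnoreTopFunction("github.com/pingcap/tidb/pkg/ddl/notifier.(*DDLNotifier).start"),
	}
	// VerifyTestMain runs the tests and fails the run if unexpected goroutines remain.
	goleak.VerifyTestMain(m, opts...)
}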