From fe50c72da04ad6f00106d5b6d08f07e5468c490d Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Tue, 17 Dec 2024 18:32:57 +0530 Subject: [PATCH 1/4] chore(block-scheduler): planner improvements to limit jobs and consider retention when planning (#15432) --- docs/sources/shared/configuration.md | 4 ++ pkg/blockbuilder/scheduler/queue.go | 45 +++++++++++++++++--- pkg/blockbuilder/scheduler/queue_test.go | 15 ++++--- pkg/blockbuilder/scheduler/scheduler.go | 19 ++++++--- pkg/blockbuilder/scheduler/scheduler_test.go | 2 +- pkg/blockbuilder/scheduler/strategy.go | 21 ++++++--- pkg/blockbuilder/scheduler/strategy_test.go | 3 +- pkg/loki/modules.go | 2 +- 8 files changed, 81 insertions(+), 30 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 9cf3b2a18ed0f..b64879703c960 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -230,6 +230,10 @@ block_scheduler: # CLI flag: -block-scheduler.target-record-count [target_record_count: | default = 1000] + # Maximum number of jobs that the planner can return. + # CLI flag: -block-scheduler.max-jobs-planned-per-interval + [max_jobs_planned_per_interval: | default = 100] + pattern_ingester: # Whether the pattern ingester is enabled. # CLI flag: -pattern-ingester.enabled diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 1aeb15e8395e5..9b4dd4292c109 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -7,6 +7,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) @@ -35,6 +37,29 @@ func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata { } } +type jobQueueMetrics struct { + pending prometheus.Gauge + inProgress prometheus.Gauge + completed *prometheus.CounterVec +} + +func newJobQueueMetrics(r prometheus.Registerer) *jobQueueMetrics { + return &jobQueueMetrics{ + pending: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_block_scheduler_pending_jobs", + Help: "Number of jobs in the block scheduler queue", + }), + inProgress: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_block_scheduler_in_progress_jobs", + Help: "Number of jobs currently being processed", + }), + completed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_block_scheduler_completed_jobs_total", + Help: "Total number of jobs completed by the block scheduler", + }, []string{"status"}), + } +} + // JobQueue manages the queue of pending jobs and tracks their state. type JobQueue struct { logger log.Logger @@ -42,10 +67,12 @@ type JobQueue struct { inProgress map[string]*JobWithMetadata // Jobs currently being processed completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs statusMap map[string]types.JobStatus // Maps job ID to its current status + metrics *jobQueueMetrics mu sync.RWMutex } -func NewJobQueueWithLogger(logger log.Logger) *JobQueue { +// NewJobQueue creates a new job queue instance +func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue { return &JobQueue{ logger: logger, pending: NewPriorityQueue( @@ -57,14 +84,10 @@ func NewJobQueueWithLogger(logger log.Logger) *JobQueue { inProgress: make(map[string]*JobWithMetadata), completed: NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity), statusMap: make(map[string]types.JobStatus), + metrics: newJobQueueMetrics(reg), } } -// NewJobQueue creates a new job queue instance -func NewJobQueue() *JobQueue { - return NewJobQueueWithLogger(log.NewNopLogger()) -} - // Exists checks if a job exists in any state and returns its status func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { q.mu.RLock() @@ -111,6 +134,7 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error { jobMeta := NewJobWithMetadata(job, priority) q.pending.Push(jobMeta) q.statusMap[job.ID()] = types.JobStatusPending + q.metrics.pending.Inc() return nil } @@ -123,6 +147,7 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) { if !ok { return nil, false } + q.metrics.pending.Dec() // Update metadata for in-progress state jobMeta.Status = types.JobStatusInProgress @@ -131,6 +156,7 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) { q.inProgress[jobMeta.ID()] = jobMeta q.statusMap[jobMeta.ID()] = types.JobStatusInProgress + q.metrics.inProgress.Inc() return jobMeta.Job, true } @@ -152,6 +178,7 @@ func (q *JobQueue) RemoveInProgress(id string) { defer q.mu.Unlock() delete(q.inProgress, id) + q.metrics.inProgress.Dec() } // MarkComplete moves a job from in-progress to completed with the given status @@ -169,11 +196,13 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { case types.JobStatusInProgress: // update & remove from in progress delete(q.inProgress, id) + q.metrics.inProgress.Dec() case types.JobStatusPending: _, ok := q.pending.Remove(id) if !ok { level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id) } + q.metrics.pending.Dec() default: level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status) } @@ -187,6 +216,7 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { delete(q.statusMap, removal.ID()) } q.statusMap[id] = status + q.metrics.completed.WithLabelValues(status.String()).Inc() } // SyncJob registers a job as in-progress or updates its UpdateTime if already in progress @@ -204,6 +234,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) { jobMeta.Status = types.JobStatusInProgress q.inProgress[jobID] = jobMeta q.statusMap[jobID] = types.JobStatusInProgress + q.metrics.inProgress.Inc() } jobMeta, ok := q.existsLockLess(jobID) @@ -221,6 +252,8 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) { level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", jobID) } jobMeta.Status = types.JobStatusInProgress + q.metrics.pending.Dec() + q.metrics.inProgress.Inc() case types.JobStatusInProgress: case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired: // Job already completed, re-enqueue a new one diff --git a/pkg/blockbuilder/scheduler/queue_test.go b/pkg/blockbuilder/scheduler/queue_test.go index dfbe07681c62a..cf86ec29f8941 100644 --- a/pkg/blockbuilder/scheduler/queue_test.go +++ b/pkg/blockbuilder/scheduler/queue_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/blockbuilder/types" @@ -11,7 +12,7 @@ import ( func TestJobQueue_SyncJob(t *testing.T) { t.Run("non-existent to in-progress", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) jobID := job.ID() @@ -28,7 +29,7 @@ func TestJobQueue_SyncJob(t *testing.T) { }) t.Run("pending to in-progress", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with pending job @@ -51,7 +52,7 @@ func TestJobQueue_SyncJob(t *testing.T) { }) t.Run("already in-progress", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // First sync to put in in-progress @@ -72,7 +73,7 @@ func TestJobQueue_SyncJob(t *testing.T) { func TestJobQueue_MarkComplete(t *testing.T) { t.Run("in-progress to complete", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with in-progress job @@ -102,7 +103,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("pending to complete", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with pending job @@ -129,7 +130,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("non-existent job", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) logger := &testLogger{t: t} q.logger = logger @@ -138,7 +139,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("already completed job", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) logger := &testLogger{t: t} q.logger = logger diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 55aa95459a1d9..8055fef5ddc95 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -24,11 +24,12 @@ var ( ) type Config struct { - ConsumerGroup string `yaml:"consumer_group"` - Interval time.Duration `yaml:"interval"` - LookbackPeriod time.Duration `yaml:"lookback_period"` - Strategy string `yaml:"strategy"` - TargetRecordCount int64 `yaml:"target_record_count"` + ConsumerGroup string `yaml:"consumer_group"` + Interval time.Duration `yaml:"interval"` + LookbackPeriod time.Duration `yaml:"lookback_period"` + Strategy string `yaml:"strategy"` + TargetRecordCount int64 `yaml:"target_record_count"` + MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"` } func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -53,6 +54,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { RecordCountStrategy, ), ) + f.IntVar( + &cfg.MaxJobsPlannedPerInterval, + prefix+"max-jobs-planned-per-interval", + 100, + "Maximum number of jobs that the planner can return.", + ) } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -143,7 +150,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { s.publishLagMetrics(lag) - jobs, err := s.planner.Plan(ctx) + jobs, err := s.planner.Plan(ctx, s.cfg.MaxJobsPlannedPerInterval) if err != nil { level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err) } diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index 48460f5e39856..1f5654312c144 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -41,7 +41,7 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error { } func newTestEnv(builderID string) (*testEnv, error) { - queue := NewJobQueue() + queue := NewJobQueue(log.NewNopLogger(), nil) mockOffsetMgr := &mockOffsetManager{ topic: "test-topic", consumerGroup: "test-group", diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index e710d14767275..78468bbea97ae 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -19,7 +19,7 @@ type OffsetReader interface { type Planner interface { Name() string - Plan(ctx context.Context) ([]*JobWithMetadata, error) + Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) } const ( @@ -51,7 +51,7 @@ func (p *RecordCountPlanner) Name() string { return RecordCountStrategy } -func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) { +func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) { offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod) if err != nil { level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) @@ -60,9 +60,10 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, erro jobs := make([]*JobWithMetadata, 0, len(offsets)) for _, partitionOffset := range offsets { - // kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. - // no additional validation is needed here - startOffset := partitionOffset.Commit.At + 1 + // 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. + // no additional validation is needed here + // 2. committed offset could be behind start offset if we are falling behind retention period. + startOffset := max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset) endOffset := partitionOffset.End.Offset // Skip if there's no lag @@ -70,10 +71,15 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, erro continue } + var jobCount int + currentStart := startOffset // Create jobs of size targetRecordCount until we reach endOffset - for currentStart := startOffset; currentStart < endOffset; { - currentEnd := min(currentStart+p.targetRecordCount, endOffset) + for currentStart < endOffset { + if maxJobsPerPartition > 0 && jobCount >= maxJobsPerPartition { + break + } + currentEnd := min(currentStart+p.targetRecordCount, endOffset) job := NewJobWithMetadata( types.NewJob(partitionOffset.Partition, types.Offsets{ Min: currentStart, @@ -84,6 +90,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, erro jobs = append(jobs, job) currentStart = currentEnd + jobCount++ } } diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go index 7f6dedcb4abe6..bba62df867c8e 100644 --- a/pkg/blockbuilder/scheduler/strategy_test.go +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -147,8 +147,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) { } require.NoError(t, cfg.Validate()) planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger()) - - jobs, err := planner.Plan(context.Background()) + jobs, err := planner.Plan(context.Background(), 0) require.NoError(t, err) require.Equal(t, len(tc.expectedJobs), len(jobs)) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 49a26498b8e31..c92f56dfcff50 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1879,7 +1879,7 @@ func (t *Loki) initBlockScheduler() (services.Service, error) { s, err := blockscheduler.NewScheduler( t.Cfg.BlockScheduler, - blockscheduler.NewJobQueueWithLogger(logger), + blockscheduler.NewJobQueue(logger, prometheus.DefaultRegisterer), offsetManager, logger, prometheus.DefaultRegisterer, From ce017048fd7b6eedf17770420615e7e606cd5c52 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 17 Dec 2024 09:22:11 -0500 Subject: [PATCH 2/4] chore(deps): update terraform google to v6.14.0 (#15443) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- tools/gcplog/main.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/gcplog/main.tf b/tools/gcplog/main.tf index 6be32cbb4c493..ddcb5f3ed68a3 100644 --- a/tools/gcplog/main.tf +++ b/tools/gcplog/main.tf @@ -2,7 +2,7 @@ terraform { required_providers { google = { source = "hashicorp/google" - version = "6.13.0" + version = "6.14.0" } } } From 0d005987d7b50f09170afe6f650cec9db305e000 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 17 Dec 2024 09:22:50 -0500 Subject: [PATCH 3/4] chore(deps): update terraform aws to ~> 5.81.0 (#15442) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- production/terraform/modules/s3/versions.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/production/terraform/modules/s3/versions.tf b/production/terraform/modules/s3/versions.tf index 0e8fff0ed099a..a2bda1ccd560d 100644 --- a/production/terraform/modules/s3/versions.tf +++ b/production/terraform/modules/s3/versions.tf @@ -2,7 +2,7 @@ terraform { required_providers { aws = { source = "hashicorp/aws" - version = "~> 5.80.0" + version = "~> 5.81.0" } random = { From ada86e2cd1c04847dcb68413c05c93fdd1124c37 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Tue, 17 Dec 2024 09:23:17 -0500 Subject: [PATCH 4/4] fix(deps): update module github.com/baidubce/bce-sdk-go to v0.9.208 (#15441) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- vendor/github.com/baidubce/bce-sdk-go/bce/config.go | 2 +- vendor/modules.txt | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index ed49760f3c000..6b13f8dd486e3 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/alicebob/miniredis/v2 v2.33.0 github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible github.com/aws/aws-sdk-go v1.55.5 - github.com/baidubce/bce-sdk-go v0.9.206 + github.com/baidubce/bce-sdk-go v0.9.208 github.com/bmatcuk/doublestar/v4 v4.7.1 github.com/c2h5oh/datasize v0.0.0-20231215233829-aa82cc1e6500 github.com/cespare/xxhash/v2 v2.3.0 diff --git a/go.sum b/go.sum index df31d6a0b9abf..acbb6382b65c1 100644 --- a/go.sum +++ b/go.sum @@ -1008,8 +1008,8 @@ github.com/aws/smithy-go v1.11.1 h1:IQ+lPZVkSM3FRtyaDox41R8YS6iwPMYIreejOgPW49g= github.com/aws/smithy-go v1.11.1/go.mod h1:3xHYmszWVx2c0kIwQeEVf9uSm4fYZt67FBJnwub1bgM= github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo= github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM= -github.com/baidubce/bce-sdk-go v0.9.206 h1:1nmKLHWCkPzpmVATiC15+4q/lYkx4PdXd2qKfYUzTes= -github.com/baidubce/bce-sdk-go v0.9.206/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg= +github.com/baidubce/bce-sdk-go v0.9.208 h1:tbtfU0Oawmd422UpUucv5HLNXmHxw9BcLFFbTtkXcDI= +github.com/baidubce/bce-sdk-go v0.9.208/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0= github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM= diff --git a/vendor/github.com/baidubce/bce-sdk-go/bce/config.go b/vendor/github.com/baidubce/bce-sdk-go/bce/config.go index 90cbb873f3992..a1c2f3a375f19 100644 --- a/vendor/github.com/baidubce/bce-sdk-go/bce/config.go +++ b/vendor/github.com/baidubce/bce-sdk-go/bce/config.go @@ -26,7 +26,7 @@ import ( // Constants and default values for the package bce const ( - SDK_VERSION = "0.9.206" + SDK_VERSION = "0.9.208" URI_PREFIX = "/" // now support uri without prefix "v1" so just set root path DEFAULT_DOMAIN = "baidubce.com" DEFAULT_PROTOCOL = "http" diff --git a/vendor/modules.txt b/vendor/modules.txt index 324644a5de3fb..d5e0977fc84f5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -465,7 +465,7 @@ github.com/aws/smithy-go/transport/http/internal/io # github.com/axiomhq/hyperloglog v0.2.0 ## explicit; go 1.21 github.com/axiomhq/hyperloglog -# github.com/baidubce/bce-sdk-go v0.9.206 +# github.com/baidubce/bce-sdk-go v0.9.208 ## explicit; go 1.11 github.com/baidubce/bce-sdk-go/auth github.com/baidubce/bce-sdk-go/bce