From 558080cf78d64bfbd8b7542bb1ba6bee4c399a59 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Tue, 31 Dec 2024 00:07:19 +0530 Subject: [PATCH] feat(block-scheduler): introduce job lease and requeue expired jobs (#15560) --- docs/sources/shared/configuration.md | 11 ++ pkg/blockbuilder/scheduler/queue.go | 91 ++++++++++++-- pkg/blockbuilder/scheduler/queue_test.go | 121 +++++++++++++++++-- pkg/blockbuilder/scheduler/scheduler.go | 16 ++- pkg/blockbuilder/scheduler/scheduler_test.go | 2 +- pkg/loki/modules.go | 2 +- 6 files changed, 220 insertions(+), 23 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index bd9fe20c490a5..763a7ad905b42 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -236,6 +236,17 @@ block_scheduler: # CLI flag: -block-scheduler.max-jobs-planned-per-interval [max_jobs_planned_per_interval: | default = 100] + job_queue: + # Interval to check for expired job leases + # CLI flag: -jobqueue.lease-expiry-check-interval + [lease_expiry_check_interval: | default = 1m] + + # Duration after which a job lease is considered expired if the scheduler + # receives no updates from builders about the job. Expired jobs are + # re-enqueued + # CLI flag: -jobqueue.lease-duration + [lease_duration: | default = 10m] + pattern_ingester: # Whether the pattern ingester is enabled. # CLI flag: -pattern-ingester.enabled diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 4793794576075..5169d0ffc1400 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -1,6 +1,9 @@ package scheduler import ( + "context" + "errors" + "flag" "fmt" "sync" "time" @@ -21,6 +24,7 @@ const ( // JobWithMetadata wraps a job with additional metadata for tracking its lifecycle type JobWithMetadata struct { *types.Job + Priority int Status types.JobStatus StartTime time.Time @@ -60,21 +64,36 @@ func newJobQueueMetrics(r prometheus.Registerer) *jobQueueMetrics { } } +type JobQueueConfig struct { + LeaseExpiryCheckInterval time.Duration `yaml:"lease_expiry_check_interval"` + LeaseDuration time.Duration `yaml:"lease_duration"` +} + +func (cfg *JobQueueConfig) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.LeaseExpiryCheckInterval, "jobqueue.lease-expiry-check-interval", 1*time.Minute, "Interval to check for expired job leases") + f.DurationVar(&cfg.LeaseDuration, "jobqueue.lease-duration", 10*time.Minute, "Duration after which a job lease is considered expired if the scheduler receives no updates from builders about the job. Expired jobs are re-enqueued") +} + // JobQueue manages the queue of pending jobs and tracks their state. 
type JobQueue struct { - logger log.Logger + cfg JobQueueConfig + + mu sync.RWMutex pending *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority inProgress map[string]*JobWithMetadata // Jobs currently being processed completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs statusMap map[string]types.JobStatus // Maps job ID to its current status - metrics *jobQueueMetrics - mu sync.RWMutex + + logger log.Logger + metrics *jobQueueMetrics } // NewJobQueue creates a new job queue instance -func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue { +func NewJobQueue(cfg JobQueueConfig, logger log.Logger, reg prometheus.Registerer) *JobQueue { return &JobQueue{ + cfg: cfg, logger: logger, + pending: NewPriorityQueue( func(a, b *JobWithMetadata) bool { return a.Priority > b.Priority // Higher priority first @@ -88,6 +107,49 @@ func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue { } } +func (q *JobQueue) RunLeaseExpiryChecker(ctx context.Context) { + ticker := time.NewTicker(q.cfg.LeaseExpiryCheckInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + level.Debug(q.logger).Log("msg", "checking for expired job leases") + if err := q.requeueExpiredJobs(); err != nil { + level.Error(q.logger).Log("msg", "failed to requeue expired jobs", "err", err) + } + case <-ctx.Done(): + return + } + } +} + +func (q *JobQueue) requeueExpiredJobs() error { + q.mu.Lock() + defer q.mu.Unlock() + + var multiErr error + for id, job := range q.inProgress { + if time.Since(job.UpdateTime) > q.cfg.LeaseDuration { + level.Warn(q.logger).Log("msg", "job lease expired. requeuing", "job", id, "update_time", job.UpdateTime, "now", time.Now()) + + // complete the job with expired status and re-enqueue + delete(q.inProgress, id) + q.metrics.inProgress.Dec() + + job.Status = types.JobStatusExpired + q.addToCompletedBuffer(job) + + if err := q.enqueueLockLess(job.Job, job.Priority); err != nil { + level.Error(q.logger).Log("msg", "failed to requeue expired job", "job", id, "err", err) + multiErr = errors.Join(multiErr, err) + } + } + } + + return multiErr +} + // Exists checks if a job exists in any state and returns its status func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { q.mu.RLock() @@ -126,8 +188,12 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error { q.mu.Lock() defer q.mu.Unlock() + return q.enqueueLockLess(job, priority) +} + +func (q *JobQueue) enqueueLockLess(job *types.Job, priority int) error { // Check if job already exists - if status, exists := q.statusMap[job.ID()]; exists { + if status, exists := q.statusMap[job.ID()]; exists && status != types.JobStatusExpired { return fmt.Errorf("job %s already exists with status %v", job.ID(), status) } @@ -203,20 +269,29 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id) } q.metrics.pending.Dec() + case types.JobStatusComplete: + level.Info(q.logger).Log("msg", "job is already complete, ignoring", "job", id) + return default: level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status) + return } jobMeta.Status = status jobMeta.UpdateTime = time.Now() - // add it to the completed buffer, removing any evicted job from the statusMap + q.addToCompletedBuffer(jobMeta) +} + +// add it to the completed buffer, removing any evicted job from the statusMap +func (q *JobQueue) 
addToCompletedBuffer(jobMeta *JobWithMetadata) { removal, evicted := q.completed.Push(jobMeta) if evicted { delete(q.statusMap, removal.ID()) } - q.statusMap[id] = status - q.metrics.completed.WithLabelValues(status.String()).Inc() + + q.statusMap[jobMeta.ID()] = jobMeta.Status + q.metrics.completed.WithLabelValues(jobMeta.Status.String()).Inc() } // SyncJob registers a job as in-progress or updates its UpdateTime if already in progress diff --git a/pkg/blockbuilder/scheduler/queue_test.go b/pkg/blockbuilder/scheduler/queue_test.go index cf86ec29f8941..7a0ec1f059bda 100644 --- a/pkg/blockbuilder/scheduler/queue_test.go +++ b/pkg/blockbuilder/scheduler/queue_test.go @@ -10,9 +10,11 @@ import ( "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) +var testQueueCfg = JobQueueConfig{} + func TestJobQueue_SyncJob(t *testing.T) { t.Run("non-existent to in-progress", func(t *testing.T) { - q := NewJobQueue(log.NewNopLogger(), nil) + q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) jobID := job.ID() @@ -29,7 +31,7 @@ func TestJobQueue_SyncJob(t *testing.T) { }) t.Run("pending to in-progress", func(t *testing.T) { - q := NewJobQueue(log.NewNopLogger(), nil) + q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with pending job @@ -52,7 +54,7 @@ func TestJobQueue_SyncJob(t *testing.T) { }) t.Run("already in-progress", func(t *testing.T) { - q := NewJobQueue(log.NewNopLogger(), nil) + q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // First sync to put in in-progress @@ -73,7 +75,7 @@ func TestJobQueue_SyncJob(t *testing.T) { func TestJobQueue_MarkComplete(t *testing.T) { t.Run("in-progress to complete", func(t *testing.T) { - q := NewJobQueue(log.NewNopLogger(), nil) + q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with in-progress job @@ -103,7 +105,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("pending to complete", func(t *testing.T) { - q := NewJobQueue(log.NewNopLogger(), nil) + q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with pending job @@ -130,7 +132,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("non-existent job", func(t *testing.T) { - q := NewJobQueue(log.NewNopLogger(), nil) + q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) logger := &testLogger{t: t} q.logger = logger @@ -139,7 +141,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("already completed job", func(t *testing.T) { - q := NewJobQueue(log.NewNopLogger(), nil) + q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) logger := &testLogger{t: t} q.logger = logger @@ -153,6 +155,111 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) } +func TestJobQueue_Enqueue(t *testing.T) { + q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) + + job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + + beforeComplete := time.Now() + err := q.Enqueue(job, 1) + afterComplete := time.Now() + require.NoError(t, err) + + status, ok := q.Exists(job) + require.True(t, ok, "job should exist") + require.Equal(t, types.JobStatusPending, status) + + // Verify job in pending queue + foundJob, ok := q.pending.Lookup(job.ID()) + require.True(t, ok, "job should be in pending queue") + require.Equal(t, job, foundJob.Job) + require.Equal(t, 1, 
foundJob.Priority) + require.True(t, foundJob.StartTime.IsZero()) + require.True(t, foundJob.UpdateTime.After(beforeComplete) || foundJob.UpdateTime.Equal(beforeComplete)) + require.True(t, foundJob.UpdateTime.Before(afterComplete) || foundJob.UpdateTime.Equal(afterComplete)) + + // allow enqueueing of job with same ID if expired + job2 := types.NewJob(2, types.Offsets{Min: 100, Max: 200}) + q.statusMap[job2.ID()] = types.JobStatusExpired + + err = q.Enqueue(job2, 2) + require.NoError(t, err) + + status, ok = q.Exists(job2) + require.True(t, ok, "job should exist") + require.Equal(t, types.JobStatusPending, status) + + // Verify job2 in pending queue + foundJob, ok = q.pending.Lookup(job2.ID()) + require.True(t, ok, "job2 should be in pending queue") + require.Equal(t, job2, foundJob.Job) + require.Equal(t, 2, foundJob.Priority) + + // do not allow enqueueing of job with same ID if not expired + job3 := types.NewJob(3, types.Offsets{Min: 120, Max: 230}) + q.statusMap[job3.ID()] = types.JobStatusInProgress + + err = q.Enqueue(job3, DefaultPriority) + require.Error(t, err) +} + +func TestJobQueue_RequeueExpiredJobs(t *testing.T) { + q := NewJobQueue(JobQueueConfig{ + LeaseDuration: 5 * time.Minute, + }, log.NewNopLogger(), nil) + + job1 := &JobWithMetadata{ + Job: types.NewJob(1, types.Offsets{Min: 100, Max: 200}), + Priority: 1, + Status: types.JobStatusInProgress, + StartTime: time.Now().Add(-time.Hour), + UpdateTime: time.Now().Add(-time.Minute), + } + // expired job + job2 := &JobWithMetadata{ + Job: types.NewJob(2, types.Offsets{Min: 300, Max: 400}), + Priority: 2, + Status: types.JobStatusInProgress, + StartTime: time.Now().Add(-time.Hour), + UpdateTime: time.Now().Add(-6 * time.Minute), + } + + q.inProgress[job1.ID()] = job1 + q.inProgress[job2.ID()] = job2 + q.statusMap[job1.ID()] = types.JobStatusInProgress + q.statusMap[job2.ID()] = types.JobStatusInProgress + + beforeRequeue := time.Now() + err := q.requeueExpiredJobs() + require.NoError(t, err) + + status, ok := q.statusMap[job1.ID()] + require.True(t, ok) + require.Equal(t, types.JobStatusInProgress, status) + + got, ok := q.inProgress[job1.ID()] + require.True(t, ok) + require.Equal(t, job1, got) + + status, ok = q.statusMap[job2.ID()] + require.True(t, ok) + require.Equal(t, types.JobStatusPending, status) + + got, ok = q.pending.Lookup(job2.ID()) + require.True(t, ok) + require.Equal(t, job2.Job, got.Job) + require.Equal(t, types.JobStatusPending, got.Status) + require.Equal(t, job2.Priority, got.Priority) + require.True(t, got.StartTime.IsZero()) + require.True(t, got.UpdateTime.After(beforeRequeue) || got.UpdateTime.Equal(beforeRequeue)) + + require.Equal(t, 1, q.completed.Len()) + got, ok = q.completed.Pop() + require.True(t, ok) + job2.Status = types.JobStatusExpired + require.Equal(t, job2, got) +} + // testLogger implements log.Logger for testing type testLogger struct { t *testing.T diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index fb6130c3fce13..1ea28970f923f 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -25,12 +25,13 @@ var ( ) type Config struct { - ConsumerGroup string `yaml:"consumer_group"` - Interval time.Duration `yaml:"interval"` - LookbackPeriod time.Duration `yaml:"lookback_period"` - Strategy string `yaml:"strategy"` - TargetRecordCount int64 `yaml:"target_record_count"` - MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"` + ConsumerGroup string `yaml:"consumer_group"` + Interval 
time.Duration `yaml:"interval"` + LookbackPeriod time.Duration `yaml:"lookback_period"` + Strategy string `yaml:"strategy"` + TargetRecordCount int64 `yaml:"target_record_count"` + MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"` + JobQueueConfig JobQueueConfig `yaml:"job_queue"` } func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -61,6 +62,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { 100, "Maximum number of jobs that the planner can return.", ) + cfg.JobQueueConfig.RegisterFlags(f) } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -128,6 +130,8 @@ func (s *BlockScheduler) running(ctx context.Context) error { level.Error(s.logger).Log("msg", "failed to schedule jobs", "err", err) } + go s.queue.RunLeaseExpiryChecker(ctx) + ticker := time.NewTicker(s.cfg.Interval) for { select { diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index 1f5654312c144..55234fbd4d9f0 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -41,7 +41,7 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error { } func newTestEnv(builderID string) (*testEnv, error) { - queue := NewJobQueue(log.NewNopLogger(), nil) + queue := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil) mockOffsetMgr := &mockOffsetManager{ topic: "test-topic", consumerGroup: "test-group", diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index fa1b9c7ea3a8f..bdd498b32a2d4 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1869,7 +1869,7 @@ func (t *Loki) initBlockScheduler() (services.Service, error) { s, err := blockscheduler.NewScheduler( t.Cfg.BlockScheduler, - blockscheduler.NewJobQueue(logger, prometheus.DefaultRegisterer), + blockscheduler.NewJobQueue(t.Cfg.BlockScheduler.JobQueueConfig, logger, prometheus.DefaultRegisterer), offsetManager, logger, prometheus.DefaultRegisterer,
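For operators picking up this change, here is a minimal sketch of how the new settings could be expressed in a Loki configuration file, assuming the key names, nesting, and defaults shown in the `docs/sources/shared/configuration.md` hunk above (the values themselves are illustrative, not recommendations):

```yaml
block_scheduler:
  job_queue:
    # How often the scheduler scans in-progress jobs for expired leases.
    # CLI flag: -jobqueue.lease-expiry-check-interval (default 1m)
    lease_expiry_check_interval: 1m
    # How long a job may go without updates from builders before its lease
    # is considered expired and the job is re-enqueued.
    # CLI flag: -jobqueue.lease-duration (default 10m)
    lease_duration: 10m
```

Note the design choice visible in `requeueExpiredJobs` and `enqueueLockLess`: an expired job is first recorded in the completed buffer with `JobStatusExpired`, and that status is explicitly exempted from the "job already exists" check, which is what allows the same job ID to be enqueued again after its lease lapses.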