feat(block-scheduler): introduce job lease and requeue expired jobs (#…
ashwanthgoli authored Dec 30, 2024
1 parent 61df085 commit 558080c
Showing 6 changed files with 220 additions and 23 deletions.
11 changes: 11 additions & 0 deletions docs/sources/shared/configuration.md
@@ -236,6 +236,17 @@ block_scheduler:
# CLI flag: -block-scheduler.max-jobs-planned-per-interval
[max_jobs_planned_per_interval: <int> | default = 100]

job_queue:
# Interval to check for expired job leases
# CLI flag: -jobqueue.lease-expiry-check-interval
[lease_expiry_check_interval: <duration> | default = 1m]

# Duration after which a job lease is considered expired if the scheduler
# receives no updates from builders about the job. Expired jobs are
# re-enqueued
# CLI flag: -jobqueue.lease-duration
[lease_duration: <duration> | default = 10m]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
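
A minimal, hand-written sketch (not part of the commit) of how the new settings nest in a Loki configuration file, using the defaults documented above; job_queue sits under block_scheduler per the yaml tags in the scheduler Config further down:

block_scheduler:
  job_queue:
    # Check for expired job leases once per minute (default).
    lease_expiry_check_interval: 1m
    # Requeue a job if the scheduler hears nothing from builders for 10 minutes (default).
    lease_duration: 10m
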
91 changes: 83 additions & 8 deletions pkg/blockbuilder/scheduler/queue.go
@@ -1,6 +1,9 @@
package scheduler

import (
"context"
"errors"
"flag"
"fmt"
"sync"
"time"
@@ -21,6 +24,7 @@ const (
// JobWithMetadata wraps a job with additional metadata for tracking its lifecycle
type JobWithMetadata struct {
*types.Job

Priority int
Status types.JobStatus
StartTime time.Time
@@ -60,21 +64,36 @@ func newJobQueueMetrics(r prometheus.Registerer) *jobQueueMetrics {
}
}

type JobQueueConfig struct {
LeaseExpiryCheckInterval time.Duration `yaml:"lease_expiry_check_interval"`
LeaseDuration time.Duration `yaml:"lease_duration"`
}

func (cfg *JobQueueConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.LeaseExpiryCheckInterval, "jobqueue.lease-expiry-check-interval", 1*time.Minute, "Interval to check for expired job leases")
f.DurationVar(&cfg.LeaseDuration, "jobqueue.lease-duration", 10*time.Minute, "Duration after which a job lease is considered expired if the scheduler receives no updates from builders about the job. Expired jobs are re-enqueued")
}
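
A hedged sketch of how these flags resolve, written as if it lived in package scheduler next to the code above; the function name and the 15m override are illustrative only:

func exampleJobQueueFlags() {
	// Register the job queue flags on a throwaway FlagSet and override only
	// the lease duration; the expiry check interval keeps its 1m default.
	fs := flag.NewFlagSet("jobqueue-example", flag.ContinueOnError)
	var cfg JobQueueConfig
	cfg.RegisterFlags(fs)
	if err := fs.Parse([]string{"-jobqueue.lease-duration=15m"}); err != nil {
		panic(err)
	}
	fmt.Println(cfg.LeaseExpiryCheckInterval, cfg.LeaseDuration) // 1m0s 15m0s
}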

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
logger log.Logger
cfg JobQueueConfig

mu sync.RWMutex
pending *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*JobWithMetadata // Jobs currently being processed
completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
metrics *jobQueueMetrics
mu sync.RWMutex

logger log.Logger
metrics *jobQueueMetrics
}

// NewJobQueue creates a new job queue instance
func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue {
func NewJobQueue(cfg JobQueueConfig, logger log.Logger, reg prometheus.Registerer) *JobQueue {
return &JobQueue{
cfg: cfg,
logger: logger,

pending: NewPriorityQueue(
func(a, b *JobWithMetadata) bool {
return a.Priority > b.Priority // Higher priority first
@@ -88,6 +107,49 @@ func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue {
}
}

func (q *JobQueue) RunLeaseExpiryChecker(ctx context.Context) {
ticker := time.NewTicker(q.cfg.LeaseExpiryCheckInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
level.Debug(q.logger).Log("msg", "checking for expired job leases")
if err := q.requeueExpiredJobs(); err != nil {
level.Error(q.logger).Log("msg", "failed to requeue expired jobs", "err", err)
}
case <-ctx.Done():
return
}
}
}
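
Callers are expected to run this checker for the lifetime of the scheduler and stop it via context cancellation (the commit does exactly that from BlockScheduler.running below). A hedged usage sketch; the nop logger and fresh registry are assumptions for illustration:

	cfg := JobQueueConfig{
		LeaseExpiryCheckInterval: time.Minute,
		LeaseDuration:            10 * time.Minute,
	}
	queue := NewJobQueue(cfg, log.NewNopLogger(), prometheus.NewRegistry())
	ctx, cancel := context.WithCancel(context.Background())
	go queue.RunLeaseExpiryChecker(ctx)
	// ... Enqueue / SyncJob / MarkComplete as usual ...
	cancel() // ctx.Done() fires and RunLeaseExpiryChecker returns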

func (q *JobQueue) requeueExpiredJobs() error {
q.mu.Lock()
defer q.mu.Unlock()

var multiErr error
for id, job := range q.inProgress {
if time.Since(job.UpdateTime) > q.cfg.LeaseDuration {
level.Warn(q.logger).Log("msg", "job lease expired. requeuing", "job", id, "update_time", job.UpdateTime, "now", time.Now())

// complete the job with expired status and re-enqueue
delete(q.inProgress, id)
q.metrics.inProgress.Dec()

job.Status = types.JobStatusExpired
q.addToCompletedBuffer(job)

if err := q.enqueueLockLess(job.Job, job.Priority); err != nil {
level.Error(q.logger).Log("msg", "failed to requeue expired job", "job", id, "err", err)
multiErr = errors.Join(multiErr, err)
}
}
}

return multiErr
}

// Exists checks if a job exists in any state and returns its status
func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
@@ -126,8 +188,12 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
q.mu.Lock()
defer q.mu.Unlock()

return q.enqueueLockLess(job, priority)
}

func (q *JobQueue) enqueueLockLess(job *types.Job, priority int) error {
// Check if job already exists
if status, exists := q.statusMap[job.ID()]; exists {
if status, exists := q.statusMap[job.ID()]; exists && status != types.JobStatusExpired {
return fmt.Errorf("job %s already exists with status %v", job.ID(), status)
}

@@ -203,20 +269,29 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id)
}
q.metrics.pending.Dec()
case types.JobStatusComplete:
level.Info(q.logger).Log("msg", "job is already complete, ignoring", "job", id)
return
default:
level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status)
return
}

jobMeta.Status = status
jobMeta.UpdateTime = time.Now()

// add it to the completed buffer, removing any evicted job from the statusMap
q.addToCompletedBuffer(jobMeta)
}

// add it to the completed buffer, removing any evicted job from the statusMap
func (q *JobQueue) addToCompletedBuffer(jobMeta *JobWithMetadata) {
removal, evicted := q.completed.Push(jobMeta)
if evicted {
delete(q.statusMap, removal.ID())
}
q.statusMap[id] = status
q.metrics.completed.WithLabelValues(status.String()).Inc()

q.statusMap[jobMeta.ID()] = jobMeta.Status
q.metrics.completed.WithLabelValues(jobMeta.Status.String()).Inc()
}

// SyncJob registers a job as in-progress or updates its UpdateTime if already in progress
121 changes: 114 additions & 7 deletions pkg/blockbuilder/scheduler/queue_test.go
@@ -10,9 +10,11 @@ import (
"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

var testQueueCfg = JobQueueConfig{}

func TestJobQueue_SyncJob(t *testing.T) {
t.Run("non-existent to in-progress", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
jobID := job.ID()

@@ -29,7 +31,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
})

t.Run("pending to in-progress", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with pending job
@@ -52,7 +54,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
})

t.Run("already in-progress", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// First sync to put in in-progress
@@ -73,7 +75,7 @@

func TestJobQueue_MarkComplete(t *testing.T) {
t.Run("in-progress to complete", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with in-progress job
@@ -103,7 +105,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("pending to complete", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with pending job
@@ -130,7 +132,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("non-existent job", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
logger := &testLogger{t: t}
q.logger = logger

Expand All @@ -139,7 +141,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("already completed job", func(t *testing.T) {
q := NewJobQueue(log.NewNopLogger(), nil)
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
logger := &testLogger{t: t}
q.logger = logger

@@ -153,6 +155,111 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})
}

func TestJobQueue_Enqueue(t *testing.T) {
q := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)

job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

beforeComplete := time.Now()
err := q.Enqueue(job, 1)
afterComplete := time.Now()
require.NoError(t, err)

status, ok := q.Exists(job)
require.True(t, ok, "job should exist")
require.Equal(t, types.JobStatusPending, status)

// Verify job in pending queue
foundJob, ok := q.pending.Lookup(job.ID())
require.True(t, ok, "job should be in pending queue")
require.Equal(t, job, foundJob.Job)
require.Equal(t, 1, foundJob.Priority)
require.True(t, foundJob.StartTime.IsZero())
require.True(t, foundJob.UpdateTime.After(beforeComplete) || foundJob.UpdateTime.Equal(beforeComplete))
require.True(t, foundJob.UpdateTime.Before(afterComplete) || foundJob.UpdateTime.Equal(afterComplete))

// allow enqueueing of job with same ID if expired
job2 := types.NewJob(2, types.Offsets{Min: 100, Max: 200})
q.statusMap[job2.ID()] = types.JobStatusExpired

err = q.Enqueue(job2, 2)
require.NoError(t, err)

status, ok = q.Exists(job2)
require.True(t, ok, "job should exist")
require.Equal(t, types.JobStatusPending, status)

// Verify job2 in pending queue
foundJob, ok = q.pending.Lookup(job2.ID())
require.True(t, ok, "job2 should be in pending queue")
require.Equal(t, job2, foundJob.Job)
require.Equal(t, 2, foundJob.Priority)

// do not allow enqueueing of job with same ID if not expired
job3 := types.NewJob(3, types.Offsets{Min: 120, Max: 230})
q.statusMap[job3.ID()] = types.JobStatusInProgress

err = q.Enqueue(job3, DefaultPriority)
require.Error(t, err)
}

func TestJobQueue_RequeueExpiredJobs(t *testing.T) {
q := NewJobQueue(JobQueueConfig{
LeaseDuration: 5 * time.Minute,
}, log.NewNopLogger(), nil)

job1 := &JobWithMetadata{
Job: types.NewJob(1, types.Offsets{Min: 100, Max: 200}),
Priority: 1,
Status: types.JobStatusInProgress,
StartTime: time.Now().Add(-time.Hour),
UpdateTime: time.Now().Add(-time.Minute),
}
// expired job
job2 := &JobWithMetadata{
Job: types.NewJob(2, types.Offsets{Min: 300, Max: 400}),
Priority: 2,
Status: types.JobStatusInProgress,
StartTime: time.Now().Add(-time.Hour),
UpdateTime: time.Now().Add(-6 * time.Minute),
}

q.inProgress[job1.ID()] = job1
q.inProgress[job2.ID()] = job2
q.statusMap[job1.ID()] = types.JobStatusInProgress
q.statusMap[job2.ID()] = types.JobStatusInProgress

beforeRequeue := time.Now()
err := q.requeueExpiredJobs()
require.NoError(t, err)

status, ok := q.statusMap[job1.ID()]
require.True(t, ok)
require.Equal(t, types.JobStatusInProgress, status)

got, ok := q.inProgress[job1.ID()]
require.True(t, ok)
require.Equal(t, job1, got)

status, ok = q.statusMap[job2.ID()]
require.True(t, ok)
require.Equal(t, types.JobStatusPending, status)

got, ok = q.pending.Lookup(job2.ID())
require.True(t, ok)
require.Equal(t, job2.Job, got.Job)
require.Equal(t, types.JobStatusPending, got.Status)
require.Equal(t, job2.Priority, got.Priority)
require.True(t, got.StartTime.IsZero())
require.True(t, got.UpdateTime.After(beforeRequeue) || got.UpdateTime.Equal(beforeRequeue))

require.Equal(t, 1, q.completed.Len())
got, ok = q.completed.Pop()
require.True(t, ok)
job2.Status = types.JobStatusExpired
require.Equal(t, job2, got)
}

// testLogger implements log.Logger for testing
type testLogger struct {
t *testing.T
16 changes: 10 additions & 6 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -25,12 +25,13 @@
)

type Config struct {
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
TargetRecordCount int64 `yaml:"target_record_count"`
MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
TargetRecordCount int64 `yaml:"target_record_count"`
MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
JobQueueConfig JobQueueConfig `yaml:"job_queue"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@@ -61,6 +62,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
100,
"Maximum number of jobs that the planner can return.",
)
cfg.JobQueueConfig.RegisterFlags(f)
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -128,6 +130,8 @@ func (s *BlockScheduler) running(ctx context.Context) error {
level.Error(s.logger).Log("msg", "failed to schedule jobs", "err", err)
}

go s.queue.RunLeaseExpiryChecker(ctx)

ticker := time.NewTicker(s.cfg.Interval)
for {
select {
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/scheduler_test.go
@@ -41,7 +41,7 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error {
}

func newTestEnv(builderID string) (*testEnv, error) {
queue := NewJobQueue(log.NewNopLogger(), nil)
queue := NewJobQueue(testQueueCfg, log.NewNopLogger(), nil)
mockOffsetMgr := &mockOffsetManager{
topic: "test-topic",
consumerGroup: "test-group",