diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 91b560e106c87..82b5294daa191 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -210,10 +210,6 @@ block_builder: [scheduler_grpc_client_config: ] block_scheduler: - # Consumer group used by block scheduler to track the last consumed offset. - # CLI flag: -block-scheduler.consumer-group - [consumer_group: | default = "block-scheduler"] - # How often the scheduler should plan jobs. # CLI flag: -block-scheduler.interval [interval: | default = 15m] diff --git a/pkg/blockbuilder/builder/builder.go b/pkg/blockbuilder/builder/builder.go index 61ff6ffcbc782..018ccc1365441 100644 --- a/pkg/blockbuilder/builder/builder.go +++ b/pkg/blockbuilder/builder/builder.go @@ -279,6 +279,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin completion.Success = false } + // remove from inflight jobs to stop sending sync requests + i.jobsMtx.Lock() + delete(i.inflightJobs, job.ID()) + i.metrics.inflightJobs.Set(float64(len(i.inflightJobs))) + i.jobsMtx.Unlock() + if _, err := withBackoff( ctx, i.cfg.Backoff, @@ -292,16 +298,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin return true, err } - i.jobsMtx.Lock() - delete(i.inflightJobs, job.ID()) - i.metrics.inflightJobs.Set(float64(len(i.inflightJobs))) - i.jobsMtx.Unlock() - return true, err } func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) { level.Debug(logger).Log("msg", "beginning job") + start := time.Now() indexer := newTsdbCreator() appender := newAppender(i.id, @@ -505,6 +507,8 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types level.Info(logger).Log( "msg", "successfully processed job", "last_offset", lastOffset, + "duration", time.Since(start), + "records", lastOffset-job.Offsets().Min, ) return lastOffset, nil diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 019ca61a0c487..4758561fd6c2b 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -332,6 +332,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) { case types.JobStatusInProgress: case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired: // Job already completed, re-enqueue a new one + level.Warn(q.logger).Log("msg", "job already completed, re-enqueuing", "job", jobID, "status", jobMeta.Status) registerInProgress() return default: diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index d4e8c0935ce68..55ecc066dc582 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -26,7 +26,6 @@ var ( ) type Config struct { - ConsumerGroup string `yaml:"consumer_group"` Interval time.Duration `yaml:"interval"` LookbackPeriod time.Duration `yaml:"lookback_period"` Strategy string `yaml:"strategy"` @@ -36,7 +35,6 @@ type Config struct { func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.Interval, prefix+"interval", 15*time.Minute, "How often the scheduler should plan jobs.") - f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.") f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 0, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.") f.StringVar( &cfg.Strategy,