From 141fe2aa38f42a8d00a8d19d215770918cdc36eb Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 29 Jan 2025 16:08:07 -0500 Subject: [PATCH 1/2] block-builder: Add cortex_blockbuilder_blocks_produced_total metric Signed-off-by: Ganesh Vernekar --- pkg/blockbuilder/blockbuilder.go | 53 +++++++++++++++++---------- pkg/blockbuilder/blockbuilder_test.go | 44 ++++++++++++++++++++++ pkg/blockbuilder/metrics.go | 9 +++++ pkg/blockbuilder/tsdb.go | 19 ++++------ pkg/blockbuilder/tsdb_test.go | 8 ++-- 5 files changed, 99 insertions(+), 34 deletions(-) diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go index 34b10711950..e47520a108f 100644 --- a/pkg/blockbuilder/blockbuilder.go +++ b/pkg/blockbuilder/blockbuilder.go @@ -18,6 +18,7 @@ import ( otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" @@ -74,10 +75,6 @@ func newWithSchedulerClient( limits *validation.Overrides, schedulerClient schedulerpb.SchedulerClient, ) (*BlockBuilder, error) { - if cfg.NoPartiallyConsumedRegion { - // We should not have a large buffer if we are putting all the records into a block. - cfg.ConsumeIntervalBuffer = 5 * time.Minute - } b := &BlockBuilder{ cfg: cfg, @@ -489,7 +486,7 @@ func (b *BlockBuilder) consumePartitionSection( return state, nil } - var numBlocks int + var blockMetas []tsdb.BlockMeta defer func(t time.Time, startState PartitionState) { // No need to log or track time of the unfinished section. Just bail out. if errors.Is(retErr, context.Canceled) { @@ -507,7 +504,7 @@ func (b *BlockBuilder) consumePartitionSection( level.Info(logger).Log("msg", "done consuming", "duration", dur, "last_block_end", startState.LastBlockEnd, "curr_block_end", blockEnd, "last_seen_offset", startState.LastSeenOffset, "curr_seen_offset", retState.LastSeenOffset, - "num_blocks", numBlocks) + "num_blocks", len(blockMetas)) }(time.Now(), state) // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). @@ -610,11 +607,16 @@ consumerLoop: } var err error - numBlocks, err = builder.CompactAndUpload(ctx, b.uploadBlocks) + blockMetas, err = builder.CompactAndUpload(ctx, b.uploadBlocks) if err != nil { return state, err } + prev, curr, next := getBlockCategoryCount(sectionEndTime, blockMetas) + b.blockBuilderMetrics.blockCounts.WithLabelValues("previous").Add(float64(prev)) + b.blockBuilderMetrics.blockCounts.WithLabelValues("current").Add(float64(curr)) + b.blockBuilderMetrics.blockCounts.WithLabelValues("next").Add(float64(next)) + // We should take the max of last seen offsets. If the partition was lagging due to some record not being processed // because of a future sample, we might be coming back to the same consume cycle again. 
 	lastSeenOffset := max(lastRec.Offset, state.LastSeenOffset)
@@ -643,6 +645,22 @@ consumerLoop:
 	return newState, nil
 }
 
+func getBlockCategoryCount(sectionEndTime time.Time, blockMetas []tsdb.BlockMeta) (prev, curr, next int) {
+	// Doing -30m takes care of the consume buffer and of section ends landing exactly on a 2h boundary, mapping sectionEndTime back into the 2h range this section actually covered.
+	currHour := sectionEndTime.Add(-30 * time.Minute).Truncate(2 * time.Hour).Hour()
+	for _, m := range blockMetas {
+		hour := time.UnixMilli(m.MinTime/2 + m.MaxTime/2).Truncate(2 * time.Hour).Hour()
+		if hour < currHour {
+			prev++
+		} else if hour > currHour {
+			next++
+		} else {
+			curr++
+		}
+	}
+	return
+}
+
 type stateCommitter interface {
 	commitState(context.Context, *BlockBuilder, log.Logger, string, PartitionState) error
 }
@@ -684,21 +702,18 @@ func (c *noOpCommitter) commitState(_ context.Context, _ *BlockBuilder, _ log.Lo
 
 var _ stateCommitter = &noOpCommitter{}
 
-func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, blockIDs []string) error {
+func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error {
 	buc := bucket.NewUserBucketClient(tenantID, b.bucket, b.limits)
-	for _, bid := range blockIDs {
-		blockDir := path.Join(dbDir, bid)
-		meta, err := block.ReadMetaFromDir(blockDir)
-		if err != nil {
-			return fmt.Errorf("read block meta for block %s (tenant %s): %w", bid, tenantID, err)
-		}
-
-		if meta.Stats.NumSamples == 0 {
+	for _, m := range metas {
+		if m.Stats.NumSamples == 0 {
 			// No need to upload empty block.
-			level.Info(b.logger).Log("msg", "skip uploading empty block", "tenant", tenantID, "block", bid)
+			level.Info(b.logger).Log("msg", "skip uploading empty block", "tenant", tenantID, "block", m.ULID.String())
 			return nil
 		}
 
+		meta := &block.Meta{BlockMeta: m}
+		blockDir := path.Join(dbDir, meta.ULID.String())
+
 		meta.Thanos.Source = block.BlockBuilderSource
 		meta.Thanos.SegmentFiles = block.GetSegmentFiles(blockDir)
 
@@ -717,11 +732,11 @@ func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string,
 			if err == nil {
 				break
 			}
-			level.Warn(b.logger).Log("msg", "failed to upload block; will retry", "err", err, "block", bid, "tenant", tenantID)
+			level.Warn(b.logger).Log("msg", "failed to upload block; will retry", "err", err, "block", meta.ULID.String(), "tenant", tenantID)
 			boff.Wait()
 		}
 		if err := boff.ErrCause(); err != nil {
-			return fmt.Errorf("upload block %s (tenant %s): %w", bid, tenantID, err)
+			return fmt.Errorf("upload block %s (tenant %s): %w", meta.ULID.String(), tenantID, err)
 		}
 	}
 	return nil
diff --git a/pkg/blockbuilder/blockbuilder_test.go b/pkg/blockbuilder/blockbuilder_test.go
index f2026fd2f5b..9684d10baa8 100644
--- a/pkg/blockbuilder/blockbuilder_test.go
+++ b/pkg/blockbuilder/blockbuilder_test.go
@@ -1212,6 +1212,50 @@ func TestNoPartiallyConsumedRegions(t *testing.T) {
 	require.Equal(t, len(producedSamples)-1, int(state.LastSeenOffset))
 }
 
+// This is a temporary test for quick iteration on the new way of consuming partition.
+// TODO: integrate it with other tests.
+func TestGetBlockCategoryCount(t *testing.T) {
+	evenHrBoundary := time.UnixMilli(10 * time.Hour.Milliseconds())
+	oddHrBoundary := time.UnixMilli(9 * time.Hour.Milliseconds())
+	cases := []struct {
+		sectionEndTime   time.Time
+		metas            []tsdb.BlockMeta
+		prev, curr, next int
+	}{
+		{
+			sectionEndTime: evenHrBoundary,
+			metas: []tsdb.BlockMeta{
+				{MinTime: evenHrBoundary.Add(-4 * time.Hour).UnixMilli(), MaxTime: evenHrBoundary.Add(-2 * time.Hour).UnixMilli()},
+				{MinTime: evenHrBoundary.Add(-2 * time.Hour).UnixMilli(), MaxTime: evenHrBoundary.UnixMilli()},
+				{MinTime: evenHrBoundary.UnixMilli(), MaxTime: evenHrBoundary.Add(5 * time.Minute).UnixMilli()},
+			},
+			prev: 1, curr: 1, next: 1,
+		},
+		{
+			sectionEndTime: oddHrBoundary,
+			metas: []tsdb.BlockMeta{
+				{MinTime: oddHrBoundary.Add(-3 * time.Hour).UnixMilli(), MaxTime: oddHrBoundary.Add(-1 * time.Hour).UnixMilli()},
+				{MinTime: oddHrBoundary.Add(-1 * time.Hour).UnixMilli(), MaxTime: oddHrBoundary.Add(time.Hour).UnixMilli()},
+				// Although this is after the sectionEndTime, it is still the same 2h block.
+				{MinTime: oddHrBoundary.UnixMilli(), MaxTime: oddHrBoundary.Add(time.Hour).UnixMilli()},
+				{MinTime: oddHrBoundary.Add(time.Hour).UnixMilli(), MaxTime: oddHrBoundary.Add(2 * time.Hour).UnixMilli()},
+			},
+			prev: 1, curr: 2, next: 1,
+		},
+	}
+	for i, c := range cases {
+		// Buffer to add to sectionEndTime.
+		for _, buffer := range []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute} {
+			t.Run(fmt.Sprintf("%d,buffer=%s", i, buffer.String()), func(t *testing.T) {
+				prev, curr, next := getBlockCategoryCount(c.sectionEndTime.Add(buffer), c.metas)
+				require.Equal(t, c.prev, prev)
+				require.Equal(t, c.curr, curr)
+				require.Equal(t, c.next, next)
+			})
+		}
+	}
+}
+
 func blockBuilderPullModeConfig(t *testing.T, addr string) (Config, *validation.Overrides) {
 	cfg, overrides := blockBuilderConfig(t, addr)
 	cfg.SchedulerConfig = SchedulerConfig{
diff --git a/pkg/blockbuilder/metrics.go b/pkg/blockbuilder/metrics.go
index d9db46c6a4f..53299261d8f 100644
--- a/pkg/blockbuilder/metrics.go
+++ b/pkg/blockbuilder/metrics.go
@@ -12,6 +12,7 @@ type blockBuilderMetrics struct {
 	processPartitionDuration *prometheus.HistogramVec
 	fetchErrors              *prometheus.CounterVec
 	consumerLagRecords       *prometheus.GaugeVec
+	blockCounts              *prometheus.CounterVec
 }
 
 func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics {
@@ -40,6 +41,14 @@ func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics {
 		Help: "The per-topic-partition number of records, instance needs to work through each cycle.",
 	}, []string{"partition"})
 
+	// block_time can be "previous", "current" or "next".
+	// A block that belongs to the current 2h block range goes in "current".
+	// "previous" and "next" are for blocks that fall in an earlier or later 2h block range respectively.
+ m.blockCounts = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_blockbuilder_blocks_produced_total", + Help: "Total number of blocks produced for specific block ranges (next, current, previous).", + }, []string{"block_time"}) + return m } diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go index c0bbbf84066..03937af48f3 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/tsdb.go @@ -22,7 +22,6 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/twmb/franz-go/pkg/kgo" - "go.uber.org/atomic" "golang.org/x/sync/errgroup" "github.com/grafana/mimir/pkg/mimirpb" @@ -311,12 +310,12 @@ func (b *TSDBBuilder) newTSDB(tenant tsdbTenant) (*userTSDB, error) { } // Function to upload the blocks. -type blockUploader func(_ context.Context, tenantID, dbDir string, blockIDs []string) error +type blockUploader func(_ context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error // CompactAndUpload compacts the blocks of all the TSDBs and uploads them. // uploadBlocks is a function that uploads the blocks to the required storage. // All the DBs are closed and directories cleared irrespective of success or failure of this function. -func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (_ int, err error) { +func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (metas []tsdb.BlockMeta, err error) { var ( closedDBsMu sync.Mutex closedDBs = make(map[*userTSDB]bool) @@ -343,11 +342,9 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp level.Info(b.logger).Log("msg", "compacting and uploading blocks", "num_tsdb", len(b.tsdbs)) if len(b.tsdbs) == 0 { - return 0, nil + return nil, nil } - numBlocks := atomic.NewInt64(0) - eg, ctx := errgroup.WithContext(ctx) if b.blocksStorageCfg.TSDB.ShipConcurrency > 0 { eg.SetLimit(b.blocksStorageCfg.TSDB.ShipConcurrency) @@ -371,12 +368,12 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp return err } - var blockIDs []string dbDir := db.Dir() + var localMetas []tsdb.BlockMeta for _, b := range db.Blocks() { - blockIDs = append(blockIDs, b.Meta().ULID.String()) + localMetas = append(localMetas, b.Meta()) } - numBlocks.Add(int64(len(blockIDs))) + metas = append(metas, localMetas...) if err := db.Close(); err != nil { return err @@ -386,7 +383,7 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp closedDBs[db] = true closedDBsMu.Unlock() - if err := uploadBlocks(ctx, tenant.tenantID, dbDir, blockIDs); err != nil { + if err := uploadBlocks(ctx, tenant.tenantID, dbDir, localMetas); err != nil { return err } @@ -395,7 +392,7 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp }) } err = eg.Wait() - return int(numBlocks.Load()), err + return metas, err } // Close closes all DBs and deletes their data directories. 
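
Note: the following worked example is not part of the patch. It is a self-contained sketch that restates the categorization logic of getBlockCategoryCount above for a single block, to show how the "previous"/"current"/"next" labels are derived. The categorize helper, the example timestamps, and the explicit .UTC() calls (added only to keep the output independent of the runner's local time zone) are illustrative assumptions.

package main

import (
	"fmt"
	"time"
)

// categorize mirrors getBlockCategoryCount for a single block: the block's
// midpoint and the buffer-adjusted section end are both snapped to their 2h
// range, and the two ranges are compared.
func categorize(sectionEndTime time.Time, minT, maxT int64) string {
	// -30m compensates for the consume buffer pushing sectionEndTime past the 2h boundary.
	currHour := sectionEndTime.UTC().Add(-30 * time.Minute).Truncate(2 * time.Hour).Hour()
	hour := time.UnixMilli(minT/2 + maxT/2).UTC().Truncate(2 * time.Hour).Hour()
	switch {
	case hour < currHour:
		return "previous"
	case hour > currHour:
		return "next"
	default:
		return "current"
	}
}

func main() {
	// A section that nominally ends at 10:00 UTC but carries a 5m consume buffer.
	sectionEnd := time.Date(2025, 1, 29, 10, 5, 0, 0, time.UTC)
	ts := func(h int) int64 { return time.Date(2025, 1, 29, h, 0, 0, 0, time.UTC).UnixMilli() }

	fmt.Println(categorize(sectionEnd, ts(8), ts(10)))  // "current": midpoint 09:00 is in the 08:00-10:00 range
	fmt.Println(categorize(sectionEnd, ts(6), ts(8)))   // "previous": midpoint 07:00 is in the 06:00-08:00 range
	fmt.Println(categorize(sectionEnd, ts(10), ts(12))) // "next": midpoint 11:00 is in the 10:00-12:00 range
}

Even with the 5-minute buffer pushing the section end to 10:05, the -30m adjustment still resolves it to the 08:00-10:00 range, so the 08:00-10:00 block is counted as "current".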
diff --git a/pkg/blockbuilder/tsdb_test.go b/pkg/blockbuilder/tsdb_test.go index 870daa90c15..f6d67cf43e1 100644 --- a/pkg/blockbuilder/tsdb_test.go +++ b/pkg/blockbuilder/tsdb_test.go @@ -299,7 +299,7 @@ func TestTSDBBuilder_CompactAndUpload_fail(t *testing.T) { require.NoError(t, err) errUploadFailed := fmt.Errorf("upload failed") - _, err = builder.CompactAndUpload(context.Background(), func(_ context.Context, _, _ string, _ []string) error { + _, err = builder.CompactAndUpload(context.Background(), func(_ context.Context, _, _ string, _ []tsdb.BlockMeta) error { return errUploadFailed }) require.ErrorIs(t, err, errUploadFailed) @@ -364,9 +364,9 @@ func compareQuery(t *testing.T, db *tsdb.DB, expSamples []mimirpb.Sample, expHis } func mockUploaderFunc(t *testing.T, destDir string) blockUploader { - return func(_ context.Context, _, dbDir string, blockIDs []string) error { - for _, bid := range blockIDs { - blockDir := path.Join(dbDir, bid) + return func(_ context.Context, _, dbDir string, metas []tsdb.BlockMeta) error { + for _, meta := range metas { + blockDir := path.Join(dbDir, meta.ULID.String()) err := os.Rename(blockDir, path.Join(destDir, path.Base(blockDir))) require.NoError(t, err) } From 0fd58db3d65de9de3720f41e4b06f56e3cb2d085 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 30 Jan 2025 13:20:17 -0500 Subject: [PATCH 2/2] Fix tests Signed-off-by: Ganesh Vernekar --- pkg/blockbuilder/blockbuilder_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/blockbuilder/blockbuilder_test.go b/pkg/blockbuilder/blockbuilder_test.go index 9684d10baa8..760efced4ac 100644 --- a/pkg/blockbuilder/blockbuilder_test.go +++ b/pkg/blockbuilder/blockbuilder_test.go @@ -1144,6 +1144,7 @@ func TestNoPartiallyConsumedRegions(t *testing.T) { cfg, overrides := blockBuilderConfig(t, kafkaAddr) cfg.NoPartiallyConsumedRegion = true + cfg.ConsumeIntervalBuffer = 5 * time.Minute // Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle. kafkaCommits := atomic.NewInt32(0) @@ -1155,9 +1156,6 @@ func TestNoPartiallyConsumedRegions(t *testing.T) { bb, err := New(cfg, test.NewTestingLogger(t), prometheus.NewPedanticRegistry(), overrides) require.NoError(t, err) - // NoPartiallyConsumedRegion changes the buffer to 5 mins. - require.Equal(t, 5*time.Minute, bb.cfg.ConsumeIntervalBuffer) - require.NoError(t, bb.starting(ctx)) t.Cleanup(func() { require.NoError(t, bb.stoppingStandaloneMode(nil))
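
Closing note, not part of the patch: below is a minimal sketch of how the new cortex_blockbuilder_blocks_produced_total counter could be asserted with prometheus/client_golang's testutil helpers, assuming the CounterVec is registered exactly as in metrics.go above. The package name, test name, and hard-coded increments are hypothetical; in the real code path the increments happen in consumePartitionSection based on getBlockCategoryCount.

package blockbuilder_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

func TestBlocksProducedTotalSketch(t *testing.T) {
	reg := prometheus.NewPedanticRegistry()
	blockCounts := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_blockbuilder_blocks_produced_total",
		Help: "Total number of blocks produced for specific block ranges (next, current, previous).",
	}, []string{"block_time"})

	// Pretend one consume cycle produced 1 "previous" and 2 "current" blocks.
	blockCounts.WithLabelValues("previous").Add(1)
	blockCounts.WithLabelValues("current").Add(2)

	require.Equal(t, 1.0, testutil.ToFloat64(blockCounts.WithLabelValues("previous")))
	require.Equal(t, 2.0, testutil.ToFloat64(blockCounts.WithLabelValues("current")))
	require.Equal(t, 0.0, testutil.ToFloat64(blockCounts.WithLabelValues("next")))
}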