block-builder: Add cortex_blockbuilder_blocks_produced_total metric #10538

Open · wants to merge 2 commits into base: main
53 changes: 34 additions & 19 deletions pkg/blockbuilder/blockbuilder.go
@@ -18,6 +18,7 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
@@ -74,10 +75,6 @@ func newWithSchedulerClient(
limits *validation.Overrides,
schedulerClient schedulerpb.SchedulerClient,
) (*BlockBuilder, error) {
if cfg.NoPartiallyConsumedRegion {
// We should not have a large buffer if we are putting all the records into a block.
cfg.ConsumeIntervalBuffer = 5 * time.Minute
}
Comment on lines -77 to -80 (Member, Author):
By removing this override, we can now try different buffer times when NoPartiallyConsumedRegion is set to true.


b := &BlockBuilder{
cfg: cfg,
@@ -489,7 +486,7 @@ func (b *BlockBuilder) consumePartitionSection(
return state, nil
}

var numBlocks int
var blockMetas []tsdb.BlockMeta
defer func(t time.Time, startState PartitionState) {
// No need to log or track time of the unfinished section. Just bail out.
if errors.Is(retErr, context.Canceled) {
@@ -507,7 +504,7 @@
level.Info(logger).Log("msg", "done consuming", "duration", dur,
"last_block_end", startState.LastBlockEnd, "curr_block_end", blockEnd,
"last_seen_offset", startState.LastSeenOffset, "curr_seen_offset", retState.LastSeenOffset,
"num_blocks", numBlocks)
"num_blocks", len(blockMetas))
}(time.Now(), state)

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
@@ -610,11 +607,16 @@ consumerLoop:
}

var err error
numBlocks, err = builder.CompactAndUpload(ctx, b.uploadBlocks)
blockMetas, err = builder.CompactAndUpload(ctx, b.uploadBlocks)
if err != nil {
return state, err
}

prev, curr, next := getBlockCategoryCount(sectionEndTime, blockMetas)
b.blockBuilderMetrics.blockCounts.WithLabelValues("previous").Add(float64(prev))
b.blockBuilderMetrics.blockCounts.WithLabelValues("current").Add(float64(curr))
b.blockBuilderMetrics.blockCounts.WithLabelValues("next").Add(float64(next))
Comment on lines +615 to +618 (Contributor):
Have you considered putting these metrics inside TSDBBuilder.CompactAndUpload (i.e. making them part of the tsdbMetrics collection)? It feels like this would require fewer changes to the overall code and would keep "magic knowledge", like the 2h block ranges, scoped closer together.


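Not part of the diff — a rough sketch of what that suggestion could look like if the counting lived next to block production. The struct name `tsdbBuilderMetrics`, the `observeBlocks` helper, and passing `sectionEndTime` into `CompactAndUpload` are all assumptions made for illustration; the PR as written keeps the metric in `blockBuilderMetrics`:

```go
// Hypothetical sketch only — not code from this PR.
package blockbuilder

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/prometheus/tsdb"
)

// tsdbBuilderMetrics would be owned by TSDBBuilder instead of BlockBuilder.
type tsdbBuilderMetrics struct {
	blockCounts *prometheus.CounterVec
}

func newTSDBBuilderMetrics(reg prometheus.Registerer) tsdbBuilderMetrics {
	return tsdbBuilderMetrics{
		blockCounts: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_blockbuilder_blocks_produced_total",
			Help: "Total number of blocks produced for specific block ranges (next, current, previous).",
		}, []string{"block_time"}),
	}
}

// observeBlocks would be called from CompactAndUpload once the block metas are
// known, keeping the 2h block-range categorization close to block production.
// It reuses getBlockCategoryCount added in this PR, and assumes CompactAndUpload
// is given the section end time.
func (m tsdbBuilderMetrics) observeBlocks(sectionEndTime time.Time, metas []tsdb.BlockMeta) {
	prev, curr, next := getBlockCategoryCount(sectionEndTime, metas)
	m.blockCounts.WithLabelValues("previous").Add(float64(prev))
	m.blockCounts.WithLabelValues("current").Add(float64(curr))
	m.blockCounts.WithLabelValues("next").Add(float64(next))
}
```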
// We should take the max of last seen offsets. If the partition was lagging due to some record not being processed
// because of a future sample, we might be coming back to the same consume cycle again.
lastSeenOffset := max(lastRec.Offset, state.LastSeenOffset)
@@ -643,6 +645,22 @@ consumerLoop:
return newState, nil
}

func getBlockCategoryCount(sectionEndTime time.Time, blockMetas []tsdb.BlockMeta) (prev, curr, next int) {
// Doing -30m will take care
Comment (Contributor):
(nit) What does this comment say?

currHour := sectionEndTime.Add(-30 * time.Minute).Truncate(2 * time.Hour).Hour()
for _, m := range blockMetas {
hour := time.UnixMilli(m.MinTime/2 + m.MaxTime/2).Truncate(2 * time.Hour).Hour()
if hour < currHour {
prev++
} else if hour > currHour {
next++
} else {
curr++
}
}
return
}

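For context on the question above: the -30m subtraction appears to handle the case where sectionEndTime lands exactly on, or shortly after, a 2h boundary, so that the section is attributed to the 2h block range it just filled rather than the range that is about to start. A standalone sketch of the arithmetic, with illustrative times that are not taken from the PR:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Example only: a section that ends exactly on the 10:00 UTC 2h boundary.
	sectionEndTime := time.Date(2024, 1, 1, 10, 0, 0, 0, time.UTC)

	// Without the adjustment, 10:00 truncates to the 10:00-12:00 range,
	// i.e. the range that is only just starting.
	fmt.Println(sectionEndTime.Truncate(2 * time.Hour).Hour()) // 10

	// With the -30m adjustment, 09:30 truncates to 08:00, i.e. the 08:00-10:00
	// range the section actually filled; blocks in that range count as "current".
	fmt.Println(sectionEndTime.Add(-30 * time.Minute).Truncate(2 * time.Hour).Hour()) // 8
}
```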
type stateCommitter interface {
commitState(context.Context, *BlockBuilder, log.Logger, string, PartitionState) error
}
@@ -684,21 +702,18 @@ func (c *noOpCommitter) commitState(_ context.Context, _ *BlockBuilder, _ log.Lo

var _ stateCommitter = &noOpCommitter{}

func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, blockIDs []string) error {
func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error {
buc := bucket.NewUserBucketClient(tenantID, b.bucket, b.limits)
for _, bid := range blockIDs {
blockDir := path.Join(dbDir, bid)
meta, err := block.ReadMetaFromDir(blockDir)
if err != nil {
return fmt.Errorf("read block meta for block %s (tenant %s): %w", bid, tenantID, err)
}

if meta.Stats.NumSamples == 0 {
for _, m := range metas {
if m.Stats.NumSamples == 0 {
// No need to upload empty block.
level.Info(b.logger).Log("msg", "skip uploading empty block", "tenant", tenantID, "block", bid)
level.Info(b.logger).Log("msg", "skip uploading empty block", "tenant", tenantID, "block", m.ULID.String())
return nil
}

meta := &block.Meta{BlockMeta: m}
blockDir := path.Join(dbDir, meta.ULID.String())

meta.Thanos.Source = block.BlockBuilderSource
meta.Thanos.SegmentFiles = block.GetSegmentFiles(blockDir)

@@ -717,11 +732,11 @@ func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string,
if err == nil {
break
}
level.Warn(b.logger).Log("msg", "failed to upload block; will retry", "err", err, "block", bid, "tenant", tenantID)
level.Warn(b.logger).Log("msg", "failed to upload block; will retry", "err", err, "block", meta.ULID.String(), "tenant", tenantID)
boff.Wait()
}
if err := boff.ErrCause(); err != nil {
return fmt.Errorf("upload block %s (tenant %s): %w", bid, tenantID, err)
return fmt.Errorf("upload block %s (tenant %s): %w", meta.ULID.String(), tenantID, err)
}
}
return nil
48 changes: 45 additions & 3 deletions pkg/blockbuilder/blockbuilder_test.go
@@ -1144,6 +1144,7 @@ func TestNoPartiallyConsumedRegions(t *testing.T) {

cfg, overrides := blockBuilderConfig(t, kafkaAddr)
cfg.NoPartiallyConsumedRegion = true
cfg.ConsumeIntervalBuffer = 5 * time.Minute

// Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle.
kafkaCommits := atomic.NewInt32(0)
@@ -1155,9 +1156,6 @@
bb, err := New(cfg, test.NewTestingLogger(t), prometheus.NewPedanticRegistry(), overrides)
require.NoError(t, err)

// NoPartiallyConsumedRegion changes the buffer to 5 mins.
require.Equal(t, 5*time.Minute, bb.cfg.ConsumeIntervalBuffer)

require.NoError(t, bb.starting(ctx))
t.Cleanup(func() {
require.NoError(t, bb.stoppingStandaloneMode(nil))
@@ -1212,6 +1210,50 @@
require.Equal(t, len(producedSamples)-1, int(state.LastSeenOffset))
}

// This is a temporary test for quick iteration on the new way of consuming partition.
// TODO: integrate it with other tests.
func TestGetBlockCategoryCount(t *testing.T) {
evenHrBounary := time.UnixMilli(10 * time.Hour.Milliseconds())
oddHrBounary := time.UnixMilli(9 * time.Hour.Milliseconds())
cases := []struct {
sectionEndTime time.Time
metas []tsdb.BlockMeta
prev, curr, next int
}{
{
sectionEndTime: evenHrBounary,
metas: []tsdb.BlockMeta{
{MinTime: evenHrBounary.Add(-4 * time.Hour).UnixMilli(), MaxTime: evenHrBounary.Add(-2 * time.Hour).UnixMilli()},
{MinTime: evenHrBounary.Add(-2 * time.Hour).UnixMilli(), MaxTime: evenHrBounary.UnixMilli()},
{MinTime: evenHrBounary.UnixMilli(), MaxTime: evenHrBounary.Add(5 * time.Minute).UnixMilli()},
},
prev: 1, curr: 1, next: 1,
},
{
sectionEndTime: oddHrBounary,
metas: []tsdb.BlockMeta{
{MinTime: oddHrBounary.Add(-3 * time.Hour).UnixMilli(), MaxTime: oddHrBounary.Add(-1 * time.Hour).UnixMilli()},
{MinTime: oddHrBounary.Add(-1 * time.Hour).UnixMilli(), MaxTime: oddHrBounary.Add(time.Hour).UnixMilli()},
// Although this is after the sectionEndTime, it is still the same 2h block.
{MinTime: oddHrBounary.UnixMilli(), MaxTime: oddHrBounary.Add(time.Hour).UnixMilli()},
{MinTime: oddHrBounary.Add(time.Hour).UnixMilli(), MaxTime: oddHrBounary.Add(2 * time.Hour).UnixMilli()},
},
prev: 1, curr: 2, next: 1,
},
}
for i, c := range cases {
// Buffer to add to sectionEndTime.
for _, buffer := range []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute} {
t.Run(fmt.Sprintf("%d,buffer=%s", i, buffer.String()), func(t *testing.T) {
prev, curr, next := getBlockCategoryCount(c.sectionEndTime.Add(buffer), c.metas)
require.Equal(t, c.prev, prev)
require.Equal(t, c.curr, curr)
require.Equal(t, c.next, next)
})
}
}
}

func blockBuilderPullModeConfig(t *testing.T, addr string) (Config, *validation.Overrides) {
cfg, overrides := blockBuilderConfig(t, addr)
cfg.SchedulerConfig = SchedulerConfig{
9 changes: 9 additions & 0 deletions pkg/blockbuilder/metrics.go
@@ -12,6 +12,7 @@ type blockBuilderMetrics struct {
processPartitionDuration *prometheus.HistogramVec
fetchErrors *prometheus.CounterVec
consumerLagRecords *prometheus.GaugeVec
blockCounts *prometheus.CounterVec
}

func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics {
@@ -40,6 +41,14 @@ func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics {
Help: "The per-topic-partition number of records, instance needs to work through each cycle.",
}, []string{"partition"})

// block_time can be "next", "current" or "previous".
// If the block belongs to the current 2h block range, it goes in "current".
// "next" or "previous" are used for the blocks that are not in the current 2h block range.
m.blockCounts = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_blockbuilder_blocks_produced_total",
Help: "Total number of blocks produced for specific block ranges (next, current, previous).",
}, []string{"block_time"})

return m
}

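Not part of the diff — a small sketch of how the new series could be exercised and asserted with prometheus/testutil, using a stand-in registration that mirrors the one above (the simulated counts are made up):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()

	// Stand-in for blockBuilderMetrics.blockCounts as registered in metrics.go.
	blockCounts := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_blockbuilder_blocks_produced_total",
		Help: "Total number of blocks produced for specific block ranges (next, current, previous).",
	}, []string{"block_time"})

	// Simulate a consume cycle that produced one "previous" and two "current" blocks.
	blockCounts.WithLabelValues("previous").Add(1)
	blockCounts.WithLabelValues("current").Add(2)

	expected := `
# HELP cortex_blockbuilder_blocks_produced_total Total number of blocks produced for specific block ranges (next, current, previous).
# TYPE cortex_blockbuilder_blocks_produced_total counter
cortex_blockbuilder_blocks_produced_total{block_time="current"} 2
cortex_blockbuilder_blocks_produced_total{block_time="previous"} 1
`
	err := testutil.GatherAndCompare(reg, strings.NewReader(expected), "cortex_blockbuilder_blocks_produced_total")
	fmt.Println("metrics match:", err == nil)
}
```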
19 changes: 8 additions & 11 deletions pkg/blockbuilder/tsdb.go
@@ -22,7 +22,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/mimirpb"
@@ -311,12 +310,12 @@ func (b *TSDBBuilder) newTSDB(tenant tsdbTenant) (*userTSDB, error) {
}

// Function to upload the blocks.
type blockUploader func(_ context.Context, tenantID, dbDir string, blockIDs []string) error
type blockUploader func(_ context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error

// CompactAndUpload compacts the blocks of all the TSDBs and uploads them.
// uploadBlocks is a function that uploads the blocks to the required storage.
// All the DBs are closed and directories cleared irrespective of success or failure of this function.
func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (_ int, err error) {
func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (metas []tsdb.BlockMeta, err error) {
var (
closedDBsMu sync.Mutex
closedDBs = make(map[*userTSDB]bool)
@@ -343,11 +342,9 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
level.Info(b.logger).Log("msg", "compacting and uploading blocks", "num_tsdb", len(b.tsdbs))

if len(b.tsdbs) == 0 {
return 0, nil
return nil, nil
}

numBlocks := atomic.NewInt64(0)

eg, ctx := errgroup.WithContext(ctx)
if b.blocksStorageCfg.TSDB.ShipConcurrency > 0 {
eg.SetLimit(b.blocksStorageCfg.TSDB.ShipConcurrency)
@@ -371,12 +368,12 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
return err
}

var blockIDs []string
dbDir := db.Dir()
var localMetas []tsdb.BlockMeta
for _, b := range db.Blocks() {
blockIDs = append(blockIDs, b.Meta().ULID.String())
localMetas = append(localMetas, b.Meta())
}
numBlocks.Add(int64(len(blockIDs)))
metas = append(metas, localMetas...)

if err := db.Close(); err != nil {
return err
@@ -386,7 +383,7 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
closedDBs[db] = true
closedDBsMu.Unlock()

if err := uploadBlocks(ctx, tenant.tenantID, dbDir, blockIDs); err != nil {
if err := uploadBlocks(ctx, tenant.tenantID, dbDir, localMetas); err != nil {
return err
}

@@ -395,7 +392,7 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
})
}
err = eg.Wait()
return int(numBlocks.Load()), err
return metas, err
}

// Close closes all DBs and deletes their data directories.
8 changes: 4 additions & 4 deletions pkg/blockbuilder/tsdb_test.go
@@ -299,7 +299,7 @@ func TestTSDBBuilder_CompactAndUpload_fail(t *testing.T) {
require.NoError(t, err)

errUploadFailed := fmt.Errorf("upload failed")
_, err = builder.CompactAndUpload(context.Background(), func(_ context.Context, _, _ string, _ []string) error {
_, err = builder.CompactAndUpload(context.Background(), func(_ context.Context, _, _ string, _ []tsdb.BlockMeta) error {
return errUploadFailed
})
require.ErrorIs(t, err, errUploadFailed)
@@ -364,9 +364,9 @@ func compareQuery(t *testing.T, db *tsdb.DB, expSamples []mimirpb.Sample, expHis
}

func mockUploaderFunc(t *testing.T, destDir string) blockUploader {
return func(_ context.Context, _, dbDir string, blockIDs []string) error {
for _, bid := range blockIDs {
blockDir := path.Join(dbDir, bid)
return func(_ context.Context, _, dbDir string, metas []tsdb.BlockMeta) error {
for _, meta := range metas {
blockDir := path.Join(dbDir, meta.ULID.String())
err := os.Rename(blockDir, path.Join(destDir, path.Base(blockDir)))
require.NoError(t, err)
}