Skip to content

Commit

Permalink
chore(block-builder): update block builder to use kafka clients direc…
Browse files Browse the repository at this point in the history
…tly (grafana#15433)

Co-authored-by: Christian Haudum <[email protected]>
Co-authored-by: Owen Diehl <[email protected]>
  • Loading branch information
3 people authored and mveitas committed Jan 6, 2025
1 parent e39260b commit 0541f73
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 61 deletions.
120 changes: 70 additions & 50 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kgo"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
Expand Down Expand Up @@ -112,13 +114,14 @@ type BlockBuilder struct {

id string
cfg Config
kafkaCfg kafka.Config
periodicConfigs []config.PeriodConfig
metrics *builderMetrics
logger log.Logger

decoder *kafka.Decoder
readerFactory func(partition int32) (partition.Reader, error)
metrics *builderMetrics
logger log.Logger
registerer prometheus.Registerer

decoder *kafka.Decoder
store stores.ChunkWriter
objStore *MultiStore

Expand All @@ -129,34 +132,36 @@ type BlockBuilder struct {
func NewBlockBuilder(
id string,
cfg Config,
kafkaCfg kafka.Config,
periodicConfigs []config.PeriodConfig,
readerFactory func(partition int32) (partition.Reader, error),
store stores.ChunkWriter,
objStore *MultiStore,
logger log.Logger,
reg prometheus.Registerer,
registerer prometheus.Registerer,
) (*BlockBuilder,
error) {
decoder, err := kafka.NewDecoder()
if err != nil {
return nil, err
}

t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, reg)
t, err := types.NewGRPCTransportFromAddress(cfg.SchedulerAddress, cfg.SchedulerGRPCClientConfig, registerer)
if err != nil {
return nil, fmt.Errorf("create grpc transport: %w", err)
}

i := &BlockBuilder{
id: id,
cfg: cfg,
kafkaCfg: kafkaCfg,
periodicConfigs: periodicConfigs,
metrics: newBuilderMetrics(reg),
metrics: newBuilderMetrics(registerer),
logger: logger,
registerer: registerer,
decoder: decoder,
readerFactory: readerFactory,
store: store,
objStore: objStore,
inflightJobs: make(map[string]*types.Job),
BuilderTransport: t,
}

Expand All @@ -165,20 +170,26 @@ func NewBlockBuilder(
}

func (i *BlockBuilder) running(ctx context.Context) error {
wg := sync.WaitGroup{}

errgrp, ctx := errgroup.WithContext(ctx)
for j := 0; j < i.cfg.WorkerParallelism; j++ {
wg.Add(1)
go func(id string) {
defer wg.Done()
workerID := fmt.Sprintf("block-builder-worker-%d", j)
errgrp.Go(func() error {
c, err := client.NewReaderClient(
i.kafkaCfg,
client.NewReaderClientMetrics(workerID, i.registerer),
log.With(i.logger, "component", workerID),
)
if err != nil {
return err
}

var waitFor time.Duration
for {
select {
case <-ctx.Done():
return
return nil
case <-time.After(waitFor):
gotJob, err := i.runOne(ctx, id)
gotJob, err := i.runOne(ctx, c, workerID)
if err != nil {
level.Error(i.logger).Log("msg", "block builder run failed", "err", err)
}
Expand All @@ -191,30 +202,27 @@ func (i *BlockBuilder) running(ctx context.Context) error {
}
}
}
}(fmt.Sprintf("worker-%d", j))
}

wg.Add(1)
go func() {
defer wg.Done()
})
}

errgrp.Go(func() error {
ticker := time.NewTicker(i.cfg.SyncInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
if err := i.syncJobs(ctx); err != nil {
level.Error(i.logger).Log("msg", "failed to sync jobs", "err", err)
}
}
}
}()
})

wg.Wait()
return nil
return errgrp.Wait()
}

func (i *BlockBuilder) syncJobs(ctx context.Context) error {
Expand All @@ -233,7 +241,7 @@ func (i *BlockBuilder) syncJobs(ctx context.Context) error {
return nil
}

func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error) {
func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID string) (bool, error) {
// assuming GetJob blocks/polls until a job is available
resp, err := i.SendGetJobRequest(ctx, &types.GetJobRequest{
BuilderID: workerID,
Expand Down Expand Up @@ -266,7 +274,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
Job: job,
Success: true,
}
if _, err = i.processJob(ctx, job, logger); err != nil {
if _, err = i.processJob(ctx, c, job, logger); err != nil {
level.Error(i.logger).Log("msg", "failed to process job", "err", err)
completion.Success = false
}
Expand All @@ -292,7 +300,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
return true, err
}

func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
level.Debug(logger).Log("msg", "beginning job")

indexer := newTsdbCreator()
Expand All @@ -316,7 +324,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
"load records",
1,
func(ctx context.Context) error {
lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh)
lastOffset, err = i.loadRecords(ctx, c, job.Partition(), job.Offsets(), inputCh)
return err
},
func(ctx context.Context) error {
Expand Down Expand Up @@ -502,47 +510,59 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
return lastOffset, nil
}

func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
f, err := i.readerFactory(partitionID)
if err != nil {
return 0, err
}

f.SetOffsetForConsumption(offsets.Min)
func (i *BlockBuilder) loadRecords(ctx context.Context, c *kgo.Client, partitionID int32, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) {
// Use NoResetOffset to avoid resetting the offset to the beginning of the partition when the requested offset is out of range.
// This could happen if the requested records are already outside of retention period. We should fail the job is such cases leaving the scheduler to make a decision.
c.AddConsumePartitions(map[string]map[int32]kgo.Offset{
i.kafkaCfg.Topic: {partitionID: kgo.NoResetOffset().At(offsets.Min)},
})
defer c.RemoveConsumePartitions(map[string][]int32{
i.kafkaCfg.Topic: {partitionID},
})

var (
lastOffset = offsets.Min - 1
boff = backoff.New(ctx, i.cfg.Backoff)
lastConsumedOffset = offsets.Min - 1
lastSeenOffset = offsets.Min - 1
boff = backoff.New(ctx, i.cfg.Backoff)
)

for lastOffset < offsets.Max && boff.Ongoing() {
var records []partition.Record
records, err = f.Poll(ctx, int(offsets.Max-lastOffset))
if err != nil {
for lastSeenOffset < offsets.Max && boff.Ongoing() {
if err := context.Cause(ctx); err != nil {
return 0, err
}

fs := c.PollRecords(ctx, int(offsets.Max-lastConsumedOffset))
// TODO: better error handling for non-retrybale errors
// we don't have to iterate over all errors since we only fetch a single partition
if err := fs.Err(); err != nil {
level.Error(i.logger).Log("msg", "failed to poll records", "err", err)
boff.Wait()
continue
}

if len(records) == 0 {
if fs.Empty() {
// No more records available
break
}

// Reset backoff on successful poll
boff.Reset()

converted := make([]AppendInput, 0, len(records))
for _, record := range records {
converted := make([]AppendInput, 0, fs.NumRecords())
for iter := fs.RecordIter(); !iter.Done(); {
record := iter.Next()
lastSeenOffset = record.Offset
if record.Offset >= offsets.Max {
level.Debug(i.logger).Log("msg", "record offset exceeds job max offset. stop processing", "record offset", record.Offset, "max offset", offsets.Max)
break
}
lastOffset = record.Offset

stream, labels, err := i.decoder.Decode(record.Content)
stream, labels, err := i.decoder.Decode(record.Value)
if err != nil {
return 0, fmt.Errorf("failed to decode record: %w", err)
}

lastConsumedOffset = record.Offset
if len(stream.Entries) == 0 {
continue
}
Expand All @@ -552,7 +572,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
copy(entries, stream.Entries)

converted = append(converted, AppendInput{
tenant: record.TenantID,
tenant: string(record.Key),
labels: labels,
labelsStr: stream.Labels,
entries: entries,
Expand All @@ -568,7 +588,7 @@ func (i *BlockBuilder) loadRecords(ctx context.Context, partitionID int32, offse
}
}

return lastOffset, boff.Err()
return lastConsumedOffset, boff.Err()
}

func withBackoff[T any](
Expand Down
12 changes: 1 addition & 11 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1836,21 +1836,11 @@ func (t *Loki) initBlockBuilder() (services.Service, error) {
return nil, err
}

readerMetrics := partition.NewReaderMetrics(prometheus.DefaultRegisterer)
readerFactory := func(partitionID int32) (partition.Reader, error) {
return partition.NewKafkaReader(
t.Cfg.KafkaConfig,
partitionID,
logger,
readerMetrics,
)
}

bb, err := blockbuilder.NewBlockBuilder(
id,
t.Cfg.BlockBuilder,
t.Cfg.KafkaConfig,
t.Cfg.SchemaConfig.Configs,
readerFactory,
t.Store,
objectStore,
logger,
Expand Down

0 comments on commit 0541f73

Please sign in to comment.