Wire up ingest pipeline into distributor push handler
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum committed Jan 13, 2025
1 parent 068f81d commit 4cfe178
Showing 21 changed files with 1,311 additions and 93 deletions.
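Note: the diff below uses a new pkg/distributor/model package (KeyedStream, NewStreamsBuilder) that is added in other files of this commit and not reproduced here. Inferred purely from the call sites in this file, the package presumably looks roughly like the sketch below; only the names (KeyedStream, HashKey, Stream, ParsedLabels, NewStreamsBuilder, AddStream, Build) come from the diff, everything else is an assumption rather than the commit's actual implementation.

// Sketch of pkg/distributor/model, inferred from its usage in the diff below.
// Field and method names are taken from the call sites; the builder's internals
// and the ParsedLabels type are assumptions.
package model

import "github.com/grafana/loki/v3/pkg/logproto"

// KeyedStream pairs a stream with the hash key used to place it on the ring
// and, once validated, its parsed labels.
type KeyedStream struct {
	HashKey      uint32
	ParsedLabels []logproto.LabelAdapter
	Stream       logproto.Stream
}

// StreamsBuilder accumulates KeyedStreams while the push request is validated
// and sharded, and returns them as a slice via Build.
type StreamsBuilder struct {
	streams []KeyedStream
}

func NewStreamsBuilder(capacity int) *StreamsBuilder {
	return &StreamsBuilder{streams: make([]KeyedStream, 0, capacity)}
}

func (b *StreamsBuilder) AddStream(s KeyedStream) {
	b.streams = append(b.streams, s)
}

func (b *StreamsBuilder) Build() []KeyedStream {
	return b.streams
}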
131 changes: 81 additions & 50 deletions pkg/distributor/distributor.go
@@ -42,6 +42,7 @@ import (
 	"github.com/grafana/loki/v3/pkg/analytics"
 	"github.com/grafana/loki/v3/pkg/compactor/retention"
 	"github.com/grafana/loki/v3/pkg/distributor/clientpool"
+	"github.com/grafana/loki/v3/pkg/distributor/model"
 	"github.com/grafana/loki/v3/pkg/distributor/shardstreams"
 	"github.com/grafana/loki/v3/pkg/distributor/writefailures"
 	"github.com/grafana/loki/v3/pkg/ingester"
@@ -160,7 +161,7 @@ type Distributor struct {
 	subservicesWatcher *services.FailureWatcher
 	// Per-user rate limiter.
 	ingestionRateLimiter *limiter.RateLimiter
-	labelCache *lru.Cache[string, labelData]
+	labelCache *lru.Cache[string, labelsWithHash]
 
 	// Push failures rate limiter.
 	writeFailuresManager *writefailures.Manager
@@ -230,7 +231,7 @@ func New(
 	var servs []services.Service
 
 	rateLimitStrat := validation.LocalIngestionRateStrategy
-	labelCache, err := lru.New[string, labelData](maxLabelCacheSize)
+	labelCache, err := lru.New[string, labelsWithHash](maxLabelCacheSize)
 	if err != nil {
 		return nil, err
 	}
@@ -400,14 +401,9 @@ func (d *Distributor) stopping(_ error) error {
 	return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
 }
 
-type KeyedStream struct {
-	HashKey uint32
-	Stream logproto.Stream
-}
-
 // TODO taken from Cortex, see if we can refactor out an usable interface.
 type streamTracker struct {
-	KeyedStream
+	stream model.KeyedStream
 	minSuccess int
 	maxFailures int
 	succeeded atomic.Int32
@@ -450,28 +446,31 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 		return &logproto.PushResponse{}, httpgrpc.Errorf(http.StatusUnprocessableEntity, validation.MissingStreamsErrorMsg)
 	}
 
+	output := model.NewStreamsBuilder(len(req.Streams))
+
 	// First we flatten out the request into a list of samples.
 	// We use the heuristic of 1 sample per TS to size the array.
 	// We also work out the hash value at the same time.
-	streams := make([]KeyedStream, 0, len(req.Streams))
 	validatedLineSize := 0
 	validatedLineCount := 0
 
 	var validationErrors util.GroupedErrors
 
 	now := time.Now()
-	validationContext := d.validator.getValidationContextForTime(now, tenantID)
-	fieldDetector := newFieldDetector(validationContext)
+	vContext := d.validator.getValidationContextForTime(now, tenantID)
+	fieldDetector := newFieldDetector(vContext)
 	shouldDiscoverLevels := fieldDetector.shouldDiscoverLogLevels()
 	shouldDiscoverGenericFields := fieldDetector.shouldDiscoverGenericFields()
 
 	shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
 	maybeShardByRate := func(stream logproto.Stream, pushSize int) {
 		if shardStreamsCfg.Enabled {
-			streams = append(streams, d.shardStream(stream, pushSize, tenantID)...)
+			for _, s := range d.shardStream(stream, pushSize, tenantID) {
+				output.AddStream(s)
+			}
 			return
 		}
-		streams = append(streams, KeyedStream{
+		output.AddStream(model.KeyedStream{
 			HashKey: lokiring.TokenFor(tenantID, stream.Labels),
 			Stream: stream,
 		})
@@ -495,12 +494,12 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 		}
 	}
 
-	func() {
+	pprof.Do(ctx, pprof.Labels("event", "validate_request"), func(ctx context.Context) {
 		sp := opentracing.SpanFromContext(ctx)
 		if sp != nil {
-			sp.LogKV("event", "start to validate request")
+			sp.LogKV("event", "start validating request")
 			defer func() {
-				sp.LogKV("event", "finished to validate request")
+				sp.LogKV("event", "finished validating request")
 			}()
 		}
 
@@ -511,10 +510,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 			}
 
 			// Truncate first so subsequent steps have consistent line lengths
-			d.truncateLines(validationContext, &stream)
+			d.truncateLines(vContext, &stream)
 
 			var lbs labels.Labels
-			lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(validationContext, stream.Labels, stream)
+			lbs, stream.Labels, stream.Hash, err = d.parseStreamLabels(vContext, stream)
 			if err != nil {
 				d.writeFailuresManager.Log(tenantID, err)
 				validationErrors.Add(err)
@@ -525,11 +524,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 			}
 
 			n := 0
-			pushSize := 0
 			prevTs := stream.Entries[0].Timestamp
 
 			for _, entry := range stream.Entries {
-				if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil {
+				if err := d.validator.ValidateEntry(ctx, vContext, lbs, entry); err != nil {
					d.writeFailuresManager.Log(tenantID, err)
					validationErrors.Add(err)
					continue
@@ -572,7 +570,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 				// since Loki will accept out of order writes it doesn't account for separate
 				// pushes with overlapping time ranges having entries with duplicate timestamps
 
-				if validationContext.incrementDuplicateTimestamps && n != 0 {
+				if vContext.incrementDuplicateTimestamps && n != 0 {
 					// Traditional logic for Loki is that 2 lines with the same timestamp and
 					// exact same content will be de-duplicated, (i.e. only one will be stored, others dropped)
 					// To maintain this behavior, only increment the timestamp if the log content is different
@@ -586,30 +584,63 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 				n++
 				validatedLineSize += util.EntryTotalSize(&entry)
 				validatedLineCount++
-				pushSize += len(entry.Line)
 			}
 			stream.Entries = stream.Entries[:n]
-			if len(stream.Entries) == 0 {
-				// Empty stream after validating all the entries
+
+			output.AddStream(model.KeyedStream{
+				Stream: stream,
+				ParsedLabels: logproto.FromLabelsToLabelAdapters(lbs),
+				HashKey: lokiring.TokenFor(tenantID, stream.Labels),
+			})
+		}
+	})
+
+	// TODO(chaudum) Integrate pipeline processing
+	// pprof.Do(ctx, pprof.Labels("event", "preprocess_streams"), func(ctx context.Context) {
+	// sp := opentracing.SpanFromContext(ctx)
+	// if sp != nil {
+	// sp.LogKV("event", "start preprocessing streams")
+	// defer func() {
+	// sp.LogKV("event", "finished preprocessing streams")
+	// }()
+	// }
+	// })
+
+	pprof.Do(ctx, pprof.Labels("event", "shard_streams"), func(ctx context.Context) {
+		sp := opentracing.SpanFromContext(ctx)
+		if sp != nil {
+			sp.LogKV("event", "start sharding streams")
+			defer func() {
+				sp.LogKV("event", "finished sharding streams")
+			}()
+		}
+
+		for _, stream := range output.Build() {
+			lbs, _, _, err := d.parseStreamLabels(vContext, stream.Stream)
+			if err != nil {
				continue
			}
-
-			maybeShardStreams(stream, lbs, pushSize)
+			var pushSize int
+			for _, entry := range stream.Entries {
+				pushSize += len(entry.Line)
+			}
+			maybeShardStreams(stream.Stream, lbs, pushSize)
 		}
-	}()
+	})
 
 	var validationErr error
 	if validationErrors.Err() != nil {
 		validationErr = httpgrpc.Errorf(http.StatusBadRequest, "%s", validationErrors.Error())
 	}
 
 	// Return early if none of the streams contained entries
+	streams := output.Build()
 	if len(streams) == 0 {
 		return &logproto.PushResponse{}, validationErr
 	}
 
-	if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
-		d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)
+	if block, until, retStatusCode := d.validator.ShouldBlockIngestion(vContext, now); block {
+		d.trackDiscardedData(ctx, req, vContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)
 
 		err = fmt.Errorf(validation.BlockedIngestionErrorMsg, tenantID, until.Format(time.RFC3339), retStatusCode)
 		d.writeFailuresManager.Log(tenantID, err)
@@ -624,7 +655,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 	}
 
 	if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
-		d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited)
+		d.trackDiscardedData(ctx, req, vContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited)
 
 		err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validatedLineCount, validatedLineSize)
 		d.writeFailuresManager.Log(tenantID, err)
@@ -685,7 +716,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
 		}
 
 		streamTrackers[i] = streamTracker{
-			KeyedStream: stream,
+			stream: stream,
 			minSuccess: len(replicationSet.Instances) - replicationSet.MaxErrors,
 			maxFailures: replicationSet.MaxErrors,
 		}
@@ -748,7 +779,7 @@ func (d *Distributor) trackDiscardedData(
 
 	if d.usageTracker != nil {
 		for _, stream := range req.Streams {
-			lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
+			lbs, _, _, err := d.parseStreamLabels(validationContext, stream)
 			if err != nil {
 				continue
 			}
@@ -851,13 +882,13 @@ func shardStreamByTime(stream logproto.Stream, lbls labels.Labels, timeShardLen
 // streams and their associated keys for hashing to ingesters.
 //
 // The number of shards is limited by the number of entries.
-func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string) []KeyedStream {
+func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID string) []model.KeyedStream {
 	shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
 	logger := log.With(util_log.WithUserID(tenantID, d.logger), "stream", stream.Labels)
 	shardCount := d.shardCountFor(logger, &stream, pushSize, tenantID, shardStreamsCfg)
 
 	if shardCount <= 1 {
-		return []KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), Stream: stream}}
+		return []model.KeyedStream{{HashKey: lokiring.TokenFor(tenantID, stream.Labels), Stream: stream}}
 	}
 
 	d.streamShardCount.Inc()
@@ -868,7 +899,7 @@ func (d *Distributor) shardStream(stream logproto.Stream, pushSize int, tenantID
 	return d.divideEntriesBetweenShards(tenantID, shardCount, shardStreamsCfg, stream)
 }
 
-func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream) []KeyedStream {
+func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards int, shardStreamsCfg shardstreams.Config, stream logproto.Stream) []model.KeyedStream {
 	derivedStreams := d.createShards(stream, totalShards, tenantID, shardStreamsCfg)
 
 	for i := 0; i < len(stream.Entries); i++ {
@@ -880,11 +911,11 @@ func (d *Distributor) divideEntriesBetweenShards(tenantID string, totalShards in
 	return derivedStreams
 }
 
-func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config) []KeyedStream {
+func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tenantID string, shardStreamsCfg shardstreams.Config) []model.KeyedStream {
 	var (
 		streamLabels = labelTemplate(stream.Labels, d.logger)
 		streamPattern = streamLabels.String()
-		derivedStreams = make([]KeyedStream, 0, totalShards)
+		derivedStreams = make([]model.KeyedStream, 0, totalShards)
 
 		streamCount = streamCount(totalShards, stream)
 	)
@@ -900,7 +931,7 @@ func (d *Distributor) createShards(stream logproto.Stream, totalShards int, tena
 		shardNum := (startShard + i) % totalShards
 		shard := d.createShard(streamLabels, streamPattern, shardNum, entriesPerShard)
 
-		derivedStreams = append(derivedStreams, KeyedStream{
+		derivedStreams = append(derivedStreams, model.KeyedStream{
 			HashKey: lokiring.TokenFor(tenantID, shard.Labels),
 			Stream: shard,
 		})
@@ -1043,7 +1074,7 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
 		Streams: make([]logproto.Stream, len(streams)),
 	}
 	for i, s := range streams {
-		req.Streams[i] = s.Stream
+		req.Streams[i] = s.stream.Stream
 	}
 
 	_, err = c.(logproto.PusherClient).Push(ctx, req)
@@ -1059,9 +1090,9 @@ func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.Instance
 	return err
 }
 
-func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStream, tenant string, tracker *pushTracker, subring *ring.PartitionRing) {
+func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []model.KeyedStream, tenant string, tracker *pushTracker, subring *ring.PartitionRing) {
 	for _, s := range streams {
-		go func(s KeyedStream) {
+		go func(s model.KeyedStream) {
 			err := d.sendStreamToKafka(ctx, s, tenant, subring)
 			if err != nil {
 				err = fmt.Errorf("failed to write stream to kafka: %w", err)
@@ -1071,7 +1102,7 @@ func (d *Distributor) sendStreamsToKafka(ctx context.Context, streams []KeyedStr
 	}
 }
 
-func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream, tenant string, subring *ring.PartitionRing) error {
+func (d *Distributor) sendStreamToKafka(ctx context.Context, stream model.KeyedStream, tenant string, subring *ring.PartitionRing) error {
 	if len(stream.Stream.Entries) == 0 {
 		return nil
 	}
@@ -1122,19 +1153,19 @@ func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes
 	return
 }
 
-type labelData struct {
-	ls labels.Labels
+type labelsWithHash struct {
+	lbs labels.Labels
 	hash uint64
 }
 
-func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream) (labels.Labels, string, uint64, error) {
-	if val, ok := d.labelCache.Get(key); ok {
-		return val.ls, val.ls.String(), val.hash, nil
+func (d *Distributor) parseStreamLabels(vContext validationContext, stream logproto.Stream) (labels.Labels, string, uint64, error) {
+	if val, ok := d.labelCache.Get(stream.Labels); ok {
+		return val.lbs, val.lbs.String(), val.hash, nil
 	}
 
-	ls, err := syntax.ParseLabels(key)
+	ls, err := syntax.ParseLabels(stream.Labels)
 	if err != nil {
-		return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
+		return nil, "", 0, fmt.Errorf(validation.InvalidLabelsErrorMsg, stream.Labels, err)
 	}
 
 	if err := d.validator.ValidateLabels(vContext, ls, stream); err != nil {
@@ -1143,7 +1174,7 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string,
 
 	lsHash := ls.Hash()
 
-	d.labelCache.Add(key, labelData{ls, lsHash})
+	d.labelCache.Add(stream.Labels, labelsWithHash{ls, lsHash})
 	return ls, ls.String(), lsHash, nil
 }

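For context on the parseStreamLabels change above: the label cache is now keyed directly on the raw stream.Labels string, so the separately passed key argument was dropped, and the cached value type was renamed from labelData to labelsWithHash (field ls becomes lbs). Below is a minimal standalone sketch of that caching pattern using the same hashicorp/golang-lru/v2 generic API the distributor uses; the cache size and example labels are illustrative, and it parses with Prometheus labels.FromStrings instead of Loki's syntax.ParseLabels to stay self-contained.

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/prometheus/prometheus/model/labels"
)

// labelsWithHash mirrors the cached value in the diff: the parsed label set
// plus its precomputed hash, so repeated pushes of the same stream skip both
// parsing and hashing.
type labelsWithHash struct {
	lbs  labels.Labels
	hash uint64
}

func main() {
	// Generic LRU keyed on the raw label string, as in lru.New[string, labelsWithHash].
	cache, err := lru.New[string, labelsWithHash](1024)
	if err != nil {
		panic(err)
	}

	raw := `{app="nginx", env="prod"}`

	// Cache miss: parse once, compute the hash once, store both under the raw key.
	if _, ok := cache.Get(raw); !ok {
		lbs := labels.FromStrings("app", "nginx", "env", "prod")
		cache.Add(raw, labelsWithHash{lbs: lbs, hash: lbs.Hash()})
	}

	// Cache hit: streams with an identical label string reuse the parsed labels
	// and hash without re-parsing.
	if val, ok := cache.Get(raw); ok {
		fmt.Println(val.lbs.String(), val.hash)
	}
}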