feat(ingest-limits): Add GRPC service to read per tenant stream limits from kafka (#15668)
periklis authored and grobinson-grafana committed Jan 21, 2025
1 parent 814a8ac commit 1efc507
Showing 13 changed files with 1,460 additions and 373 deletions.
9 changes: 9 additions & 0 deletions docs/sources/shared/configuration.md
@@ -895,6 +895,15 @@ kafka_config:
   # CLI flag: -kafka.max-consumer-lag-at-startup
   [max_consumer_lag_at_startup: <duration> | default = 15s]
 
+  ingest_limits:
+    # Enable the ingest limits.
+    # CLI flag: -kafka.ingest-limits.enabled
+    [enabled: <boolean> | default = false]
+
+    # The window size to use for the limiter.
+    # CLI flag: -kafka.ingest-limits.window-size
+    [window_size: <duration> | default = 1m]
+
 # Configuration for 'runtime config' module, responsible for reloading runtime
 # configuration file.
 [runtime_config: <runtime_config>]
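For illustration, a minimal sketch of what the new settings look like in a Loki configuration file, using the defaults documented above; only the `ingest_limits` keys come from this change, and the nesting under `kafka_config` follows the hunk header:

```yaml
kafka_config:
  # ... existing Kafka settings ...
  ingest_limits:
    # Enable the per-tenant stream limiter.
    enabled: true
    # Sliding window used by the limiter.
    window_size: 1m
```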
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
@@ -1120,7 +1120,7 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
 	}
 
 	// Add metadata record
-	metadataRecord := kafka.EncodeStreamMetadata(partitionID, d.cfg.KafkaConfig.Topic, tenant, stream.Stream, startTime)
+	metadataRecord := kafka.EncodeStreamMetadata(partitionID, d.cfg.KafkaConfig.Topic, tenant, stream.Stream)
 	records = append(records, metadataRecord)
 
 	d.kafkaRecordsPerRequest.Observe(float64(len(records)))
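One caveat worth noting: `EncodeStreamMetadata` returns `nil` for streams with empty labels or a zero hash (see the validation in `pkg/kafka/encoding.go` below), so a defensive caller could guard before appending. A hypothetical helper, not part of this patch:

```go
package distributor

import "github.com/twmb/franz-go/pkg/kgo"

// appendMetadataRecord skips nil records so an invalid stream never
// produces a nil Kafka record. Illustrative only; not in this commit.
func appendMetadataRecord(records []*kgo.Record, rec *kgo.Record) []*kgo.Record {
	if rec == nil {
		return records
	}
	return append(records, rec)
}
```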
15 changes: 15 additions & 0 deletions pkg/kafka/config.go
@@ -33,6 +33,18 @@ var (
 	ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
 )
 
+// IngestLimitsConfig holds configuration for the IngestLimits service.
+type IngestLimitsConfig struct {
+	Enabled    bool          `yaml:"enabled"`
+	WindowSize time.Duration `yaml:"window_size"`
+}
+
+// RegisterFlagsWithPrefix registers the configuration flags with the given prefix.
+func (cfg *IngestLimitsConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
+	fs.BoolVar(&cfg.Enabled, prefix+".ingest-limits.enabled", false, "Enable the ingest limits.")
+	fs.DurationVar(&cfg.WindowSize, prefix+".ingest-limits.window-size", 1*time.Minute, "The window size to use for the limiter.")
+}
+
 // Config holds the generic config for the Kafka backend.
 type Config struct {
 	Address string `yaml:"address"`
@@ -56,10 +68,13 @@ type Config struct {
 	ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`
 
 	MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`
+
+	IngestLimits IngestLimitsConfig `yaml:"ingest_limits,omitempty"`
 }
 
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.RegisterFlagsWithPrefix("kafka", f)
+	cfg.IngestLimits.RegisterFlagsWithPrefix("kafka", f)
 }
 
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
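To see how the prefix composes into the documented flag names, here is a self-contained sketch; the `IngestLimitsConfig` type and its method are copied from this change, while `main` is illustrative:

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// IngestLimitsConfig mirrors the struct added in this commit.
type IngestLimitsConfig struct {
	Enabled    bool          `yaml:"enabled"`
	WindowSize time.Duration `yaml:"window_size"`
}

func (cfg *IngestLimitsConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
	fs.BoolVar(&cfg.Enabled, prefix+".ingest-limits.enabled", false, "Enable the ingest limits.")
	fs.DurationVar(&cfg.WindowSize, prefix+".ingest-limits.window-size", 1*time.Minute, "The window size to use for the limiter.")
}

func main() {
	var cfg IngestLimitsConfig
	fs := flag.NewFlagSet("loki", flag.ExitOnError)
	// Registering with the "kafka" prefix yields the documented flags:
	// -kafka.ingest-limits.enabled and -kafka.ingest-limits.window-size.
	cfg.RegisterFlagsWithPrefix("kafka", fs)
	_ = fs.Parse([]string{"-kafka.ingest-limits.enabled=true", "-kafka.ingest-limits.window-size=2m"})
	fmt.Printf("enabled=%t window=%s\n", cfg.Enabled, cfg.WindowSize)
	// Output: enabled=true window=2m0s
}
```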
11 changes: 7 additions & 4 deletions pkg/kafka/encoding.go
@@ -6,7 +6,6 @@ import (
 	"fmt"
 	math_bits "math/bits"
 	"sync"
-	"time"
 
 	"github.com/twmb/franz-go/pkg/kgo"
 
@@ -198,7 +197,7 @@ func sovPush(x uint64) (n int) {
 
 // EncodeStreamMetadata encodes the stream metadata into a Kafka record
 // using the tenantID as the key and partition as the target partition
-func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream logproto.Stream, lastSeenAt time.Time) *kgo.Record {
+func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream logproto.Stream) *kgo.Record {
 	// Validate stream
 	if stream.Labels == "" || stream.Hash == 0 {
 		return nil
@@ -210,7 +209,6 @@ func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream
 
 	// Transform stream into metadata
 	metadata.StreamHash = stream.Hash
-	metadata.LastSeenAt = lastSeenAt.UnixNano()
 
 	// Encode the metadata into a byte slice
 	value, err := metadata.Marshal()
@@ -224,7 +222,7 @@ func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream
 		Key:       []byte(tenantID),
 		Value:     value,
 		Partition: partition,
-		Topic:     topic + metadataTopicSuffix,
+		Topic:     MetadataTopicFor(topic),
 	}
 }
 
@@ -247,3 +245,8 @@ func DecodeStreamMetadata(record *kgo.Record) (*logproto.StreamMetadata, error)
 
 	return metadata, nil
 }
+
+// MetadataTopicFor returns the metadata topic name for the given topic.
+func MetadataTopicFor(topic string) string {
+	return topic + metadataTopicSuffix
+}
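A minimal encode/decode round trip using the new signature; this sketch assumes Loki's `github.com/grafana/loki/v3` module path and is not part of the commit:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/kafka"
	"github.com/grafana/loki/v3/pkg/logproto"
)

func main() {
	stream := logproto.Stream{Labels: `{app="test"}`, Hash: 12345}

	// Encode: keyed by tenant, routed to the metadata variant of the topic.
	rec := kafka.EncodeStreamMetadata(3, "logs", "tenant-1", stream)
	if rec == nil {
		panic("invalid stream: empty labels or zero hash")
	}
	fmt.Println(rec.Topic == kafka.MetadataTopicFor("logs")) // true

	// Decode on the consumer side; LastSeenAt is no longer populated
	// by the encoder.
	md, err := kafka.DecodeStreamMetadata(rec)
	if err != nil {
		panic(err)
	}
	fmt.Println(md.StreamHash) // 12345
}
```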
52 changes: 23 additions & 29 deletions pkg/kafka/encoding_test.go
@@ -153,68 +153,63 @@ func generateRandomString(length int) string {
 
 func TestEncodeDecodeStreamMetadata(t *testing.T) {
 	tests := []struct {
-		name       string
-		stream     logproto.Stream
-		partition  int32
-		topic      string
-		tenantID   string
-		lastSeenAt time.Time
-		expectErr  bool
+		name      string
+		stream    logproto.Stream
+		partition int32
+		topic     string
+		tenantID  string
+		expectErr bool
 	}{
 		{
 			name: "Valid metadata",
 			stream: logproto.Stream{
 				Labels: `{app="test"}`,
 				Hash:   12345,
 			},
-			partition:  1,
-			topic:      "logs",
-			tenantID:   "tenant-1",
-			lastSeenAt: time.Now().Truncate(time.Millisecond),
-			expectErr:  false,
+			partition: 1,
+			topic:     "logs",
+			tenantID:  "tenant-1",
+			expectErr: false,
 		},
 		{
 			name: "Empty labels - should error",
 			stream: logproto.Stream{
 				Labels: "",
 				Hash:   67890,
 			},
-			partition:  2,
-			topic:      "metrics",
-			tenantID:   "tenant-2",
-			lastSeenAt: time.Now().Truncate(time.Millisecond),
-			expectErr:  true,
+			partition: 2,
+			topic:     "metrics",
+			tenantID:  "tenant-2",
+			expectErr: true,
 		},
 		{
 			name: "Zero hash - should error",
 			stream: logproto.Stream{
 				Labels: `{app="test"}`,
 				Hash:   0,
 			},
-			partition:  3,
-			topic:      "traces",
-			tenantID:   "tenant-3",
-			lastSeenAt: time.Now().Truncate(time.Millisecond),
-			expectErr:  true,
+			partition: 3,
+			topic:     "traces",
+			tenantID:  "tenant-3",
+			expectErr: true,
 		},
 		{
 			name: "Empty labels and zero hash - should error",
 			stream: logproto.Stream{
 				Labels: "",
 				Hash:   0,
 			},
-			partition:  4,
-			topic:      "traces",
-			tenantID:   "tenant-4",
-			lastSeenAt: time.Now().Truncate(time.Millisecond),
-			expectErr:  true,
+			partition: 4,
+			topic:     "traces",
+			tenantID:  "tenant-4",
+			expectErr: true,
 		},
 	}
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			// Encode metadata
-			record := EncodeStreamMetadata(tt.partition, tt.topic, tt.tenantID, tt.stream, tt.lastSeenAt)
+			record := EncodeStreamMetadata(tt.partition, tt.topic, tt.tenantID, tt.stream)
 			if tt.expectErr {
 				require.Nil(t, record)
 				return
@@ -233,7 +228,6 @@ func TestEncodeDecodeStreamMetadata(t *testing.T) {
 
 			// Verify decoded values
 			require.Equal(t, tt.stream.Hash, metadata.StreamHash)
-			require.Equal(t, tt.lastSeenAt.UnixNano(), metadata.LastSeenAt)
 
 			// Return metadata to pool
 			metadataPool.Put(metadata)