
feat(ingest-limits): Add GRPC service to read per tenant stream limits from kafka #15668

Merged
merged 12 commits on Jan 10, 2025
9 changes: 9 additions & 0 deletions docs/sources/shared/configuration.md
@@ -899,6 +899,15 @@ kafka_config:
# CLI flag: -kafka.max-consumer-lag-at-startup
[max_consumer_lag_at_startup: <duration> | default = 15s]

+ingest_limits:
+  # Enable the ingest limits.
+  # CLI flag: -kafka.ingest-limits.enabled
+  [enabled: <boolean> | default = false]
+
+  # The window size to use for the limiter.
+  # CLI flag: -kafka.ingest-limits.window-size
+  [window_size: <duration> | default = 1m]
+
# Configuration for 'runtime config' module, responsible for reloading runtime
# configuration file.
[runtime_config: <runtime_config>]
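For reference, the same settings can be passed as CLI flags. A sketch of a typical invocation; the flag names come from the diff above, while the binary name and `-target` value are illustrative assumptions:

```sh
loki -target=all \
  -kafka.ingest-limits.enabled=true \
  -kafka.ingest-limits.window-size=1m
```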
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
@@ -1090,7 +1090,7 @@ func (d *Distributor) sendStreamToKafka(ctx context.Context, stream KeyedStream,
}

// Add metadata record
-metadataRecord := kafka.EncodeStreamMetadata(partitionID, d.cfg.KafkaConfig.Topic, tenant, stream.Stream, startTime)
+metadataRecord := kafka.EncodeStreamMetadata(partitionID, d.cfg.KafkaConfig.Topic, tenant, stream.Stream)
records = append(records, metadataRecord)

d.kafkaRecordsPerRequest.Observe(float64(len(records)))
15 changes: 15 additions & 0 deletions pkg/kafka/config.go
@@ -33,6 +33,18 @@ var (
ErrInvalidProducerMaxRecordSizeBytes = fmt.Errorf("the configured producer max record size bytes must be a value between %d and %d", minProducerRecordDataBytesLimit, maxProducerRecordDataBytesLimit)
)

+// IngestLimitsConfig holds configuration for the IngestLimits service.
+type IngestLimitsConfig struct {
+Enabled bool `yaml:"enabled"`
+WindowSize time.Duration `yaml:"window_size"`
+}
+
+// RegisterFlagsWithPrefix registers the configuration flags with the given prefix.
+func (cfg *IngestLimitsConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
+fs.BoolVar(&cfg.Enabled, prefix+".ingest-limits.enabled", false, "Enable the ingest limits.")
+fs.DurationVar(&cfg.WindowSize, prefix+".ingest-limits.window-size", 1*time.Minute, "The window size to use for the limiter.")
+}
+
// Config holds the generic config for the Kafka backend.
type Config struct {
Address string `yaml:"address"`
@@ -56,10 +68,13 @@ type Config struct {
ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`

MaxConsumerLagAtStartup time.Duration `yaml:"max_consumer_lag_at_startup"`

+IngestLimits IngestLimitsConfig `yaml:"ingest_limits,omitempty"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("kafka", f)
+cfg.IngestLimits.RegisterFlagsWithPrefix("kafka", f)
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
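To make the flag wiring above concrete, here is a minimal, self-contained sketch of how the new options resolve at parse time. The `IngestLimitsConfig` mirrors the struct added in this PR; the standalone `main` and the sample flag values are illustrative only:

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// IngestLimitsConfig mirrors the struct added in pkg/kafka/config.go.
type IngestLimitsConfig struct {
	Enabled    bool          `yaml:"enabled"`
	WindowSize time.Duration `yaml:"window_size"`
}

// RegisterFlagsWithPrefix nests the flags under the given prefix, as in the diff.
func (cfg *IngestLimitsConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
	fs.BoolVar(&cfg.Enabled, prefix+".ingest-limits.enabled", false, "Enable the ingest limits.")
	fs.DurationVar(&cfg.WindowSize, prefix+".ingest-limits.window-size", 1*time.Minute, "The window size to use for the limiter.")
}

func main() {
	var cfg IngestLimitsConfig
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("kafka", fs)

	// Parse flags the way the binary would receive them.
	_ = fs.Parse([]string{
		"-kafka.ingest-limits.enabled=true",
		"-kafka.ingest-limits.window-size=30s",
	})
	fmt.Println(cfg.Enabled, cfg.WindowSize) // true 30s
}
```

Because `Config.RegisterFlags` calls `cfg.IngestLimits.RegisterFlagsWithPrefix("kafka", f)`, the resulting flag names match the documented `-kafka.ingest-limits.*` flags.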
11 changes: 7 additions & 4 deletions pkg/kafka/encoding.go
@@ -6,7 +6,6 @@ import (
"fmt"
math_bits "math/bits"
"sync"
"time"

"github.com/twmb/franz-go/pkg/kgo"

@@ -198,7 +197,7 @@ func sovPush(x uint64) (n int) {

// EncodeStreamMetadata encodes the stream metadata into a Kafka record
// using the tenantID as the key and partition as the target partition
-func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream logproto.Stream, lastSeenAt time.Time) *kgo.Record {
+func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream logproto.Stream) *kgo.Record {
// Validate stream
if stream.Labels == "" || stream.Hash == 0 {
return nil
@@ -210,7 +209,6 @@ func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream

// Transform stream into metadata
metadata.StreamHash = stream.Hash
-metadata.LastSeenAt = lastSeenAt.UnixNano()

// Encode the metadata into a byte slice
value, err := metadata.Marshal()
@@ -224,7 +222,7 @@ func EncodeStreamMetadata(partition int32, topic string, tenantID string, stream
Key: []byte(tenantID),
Value: value,
Partition: partition,
-Topic: topic + metadataTopicSuffix,
+Topic: MetadataTopicFor(topic),
}
}

@@ -247,3 +245,8 @@ func DecodeStreamMetadata(record *kgo.Record) (*logproto.StreamMetadata, error)

return metadata, nil
}

+// MetadataTopicFor returns the metadata topic name for the given topic.
+func MetadataTopicFor(topic string) string {
+return topic + metadataTopicSuffix
+}
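A hedged round-trip sketch of the updated encode/decode API, written as if it lived in this `kafka` package so it can reach the unexported `metadataPool` and `metadataTopicSuffix`; the `logproto` import path and the sample stream values are assumptions, not part of this PR:

```go
package kafka

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/logproto" // path assumed from the repo layout
)

func ExampleEncodeStreamMetadata() {
	stream := logproto.Stream{Labels: `{app="test"}`, Hash: 12345}

	// Encode: the tenant ID becomes the record key and the record is routed
	// to the metadata topic derived from the data topic.
	record := EncodeStreamMetadata(1, "logs", "tenant-1", stream)
	fmt.Println(record.Topic == MetadataTopicFor("logs")) // true

	// Decode back into a StreamMetadata. Note that, as of this PR, the
	// encoder no longer stamps LastSeenAt into the payload.
	metadata, err := DecodeStreamMetadata(record)
	if err != nil {
		panic(err)
	}
	fmt.Println(metadata.StreamHash) // 12345

	// Return the pooled object when done, as the tests below do.
	metadataPool.Put(metadata)
}
```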
52 changes: 23 additions & 29 deletions pkg/kafka/encoding_test.go
@@ -153,68 +153,63 @@ func generateRandomString(length int) string {

func TestEncodeDecodeStreamMetadata(t *testing.T) {
tests := []struct {
-name string
-stream logproto.Stream
-partition int32
-topic string
-tenantID string
-lastSeenAt time.Time
-expectErr bool
+name string
+stream logproto.Stream
+partition int32
+topic string
+tenantID string
+expectErr bool
}{
{
name: "Valid metadata",
stream: logproto.Stream{
Labels: `{app="test"}`,
Hash: 12345,
},
-partition: 1,
-topic: "logs",
-tenantID: "tenant-1",
-lastSeenAt: time.Now().Truncate(time.Millisecond),
-expectErr: false,
+partition: 1,
+topic: "logs",
+tenantID: "tenant-1",
+expectErr: false,
},
{
name: "Empty labels - should error",
stream: logproto.Stream{
Labels: "",
Hash: 67890,
},
-partition: 2,
-topic: "metrics",
-tenantID: "tenant-2",
-lastSeenAt: time.Now().Truncate(time.Millisecond),
-expectErr: true,
+partition: 2,
+topic: "metrics",
+tenantID: "tenant-2",
+expectErr: true,
},
{
name: "Zero hash - should error",
stream: logproto.Stream{
Labels: `{app="test"}`,
Hash: 0,
},
-partition: 3,
-topic: "traces",
-tenantID: "tenant-3",
-lastSeenAt: time.Now().Truncate(time.Millisecond),
-expectErr: true,
+partition: 3,
+topic: "traces",
+tenantID: "tenant-3",
+expectErr: true,
},
{
name: "Empty labels and zero hash - should error",
stream: logproto.Stream{
Labels: "",
Hash: 0,
},
-partition: 4,
-topic: "traces",
-tenantID: "tenant-4",
-lastSeenAt: time.Now().Truncate(time.Millisecond),
-expectErr: true,
+partition: 4,
+topic: "traces",
+tenantID: "tenant-4",
+expectErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Encode metadata
-record := EncodeStreamMetadata(tt.partition, tt.topic, tt.tenantID, tt.stream, tt.lastSeenAt)
+record := EncodeStreamMetadata(tt.partition, tt.topic, tt.tenantID, tt.stream)
if tt.expectErr {
require.Nil(t, record)
return
@@ -233,7 +228,6 @@ func TestEncodeDecodeStreamMetadata(t *testing.T) {

// Verify decoded values
require.Equal(t, tt.stream.Hash, metadata.StreamHash)
-require.Equal(t, tt.lastSeenAt.UnixNano(), metadata.LastSeenAt)

// Return metadata to pool
metadataPool.Put(metadata)
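To exercise the updated round trip locally, running just this test should suffice (package path taken from the diff):

```sh
go test ./pkg/kafka -run TestEncodeDecodeStreamMetadata -v
```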