Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Detect fields based on per-tenant configuration and put them into structured metadata at ingest time #15188

Merged
merged 11 commits into from
Jan 7, 2025
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3400,6 +3400,12 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v
# CLI flag: -validation.increment-duplicate-timestamps
[increment_duplicate_timestamp: <boolean> | default = false]

# Experimental: Detect fields from stream labels, structured metadata, or
# json/logfmt formatted log line and put them into structured metadata of the
# log entry.
discover_generic_fields:
[fields: <map of string to list of strings>]

# If no service_name label exists, Loki maps a single label from the configured
# list to service_name. If none of the configured labels exist in the stream,
# label is set to unknown_service. Empty list disables setting the label.
Expand Down
26 changes: 20 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net/http"
"runtime/pprof"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -460,8 +461,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log

now := time.Now()
validationContext := d.validator.getValidationContextForTime(now, tenantID)
levelDetector := newLevelDetector(validationContext)
shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels()
fieldDetector := newFieldDetector(validationContext)
shouldDiscoverLevels := fieldDetector.shouldDiscoverLogLevels()
shouldDiscoverGenericFields := fieldDetector.shouldDiscoverGenericFields()

shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID)
maybeShardByRate := func(stream logproto.Stream, pushSize int) {
Expand Down Expand Up @@ -547,10 +549,22 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}
if shouldDiscoverLevels {
logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel)
}
pprof.Do(ctx, pprof.Labels("action", "discover_log_level"), func(_ context.Context) {
logLevel, ok := fieldDetector.extractLogLevel(lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel)
}
})
}
if shouldDiscoverGenericFields {
pprof.Do(ctx, pprof.Labels("action", "discover_generic_fields"), func(_ context.Context) {
for field, hints := range fieldDetector.validationContext.discoverGenericFields {
extracted, ok := fieldDetector.extractGenericField(field, hints, lbs, structuredMetadata, entry)
if ok {
entry.StructuredMetadata = append(entry.StructuredMetadata, extracted)
}
}
})
}
stream.Entries[n] = entry

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/log/jsonexpr"
"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
"github.com/grafana/loki/v3/pkg/util/constants"
)
Expand All @@ -31,46 +33,43 @@ var (
errorAbbrv = []byte("err")
critical = []byte("critical")
fatal = []byte("fatal")

defaultAllowedLevelFields = []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}
)

func allowedLabelsForLevel(allowedFields []string) map[string]struct{} {
func allowedLabelsForLevel(allowedFields []string) []string {
if len(allowedFields) == 0 {
return map[string]struct{}{
"level": {}, "LEVEL": {}, "Level": {},
"severity": {}, "SEVERITY": {}, "Severity": {},
"lvl": {}, "LVL": {}, "Lvl": {},
}
}
allowedFieldsMap := make(map[string]struct{}, len(allowedFields))
for _, field := range allowedFields {
allowedFieldsMap[field] = struct{}{}
return defaultAllowedLevelFields
}
return allowedFieldsMap
return allowedFields
}

type LevelDetector struct {
validationContext validationContext
allowedLabels map[string]struct{}
type FieldDetector struct {
validationContext validationContext
allowedLevelLabels []string
}

func newLevelDetector(validationContext validationContext) *LevelDetector {
logLevelFields := validationContext.logLevelFields
return &LevelDetector{
validationContext: validationContext,
allowedLabels: allowedLabelsForLevel(logLevelFields),
func newFieldDetector(validationContext validationContext) *FieldDetector {
return &FieldDetector{
validationContext: validationContext,
allowedLevelLabels: allowedLabelsForLevel(validationContext.logLevelFields),
}
}

func (l *LevelDetector) shouldDiscoverLogLevels() bool {
func (l *FieldDetector) shouldDiscoverLogLevels() bool {
return l.validationContext.allowStructuredMetadata && l.validationContext.discoverLogLevels
}

func (l *LevelDetector) extractLogLevel(labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) {
levelFromLabel, hasLevelLabel := l.hasAnyLevelLabels(labels)
func (l *FieldDetector) shouldDiscoverGenericFields() bool {
return l.validationContext.allowStructuredMetadata && len(l.validationContext.discoverGenericFields) > 0
}

func (l *FieldDetector) extractLogLevel(labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) {
levelFromLabel, hasLevelLabel := labelsContainAny(labels, l.allowedLevelLabels)
var logLevel string
if hasLevelLabel {
logLevel = levelFromLabel
} else if levelFromMetadata, ok := l.hasAnyLevelLabels(structuredMetadata); ok {
} else if levelFromMetadata, ok := labelsContainAny(structuredMetadata, l.allowedLevelLabels); ok {
logLevel = levelFromMetadata
} else {
logLevel = l.detectLogLevelFromLogEntry(entry, structuredMetadata)
Expand All @@ -85,16 +84,33 @@ func (l *LevelDetector) extractLogLevel(labels labels.Labels, structuredMetadata
}, true
}

func (l *LevelDetector) hasAnyLevelLabels(labels labels.Labels) (string, bool) {
for lbl := range l.allowedLabels {
if labels.Has(lbl) {
return labels.Get(lbl), true
func (l *FieldDetector) extractGenericField(name string, hints []string, labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) {

var value string
if v, ok := labelsContainAny(labels, hints); ok {
value = v
} else if v, ok := labelsContainAny(structuredMetadata, hints); ok {
value = v
} else {
value = l.detectGenericFieldFromLogEntry(entry, hints)
}

if value == "" {
return logproto.LabelAdapter{}, false
}
return logproto.LabelAdapter{Name: name, Value: value}, true
}

func labelsContainAny(labels labels.Labels, names []string) (string, bool) {
for _, name := range names {
if labels.Has(name) {
return labels.Get(name), true
}
}
return "", false
}

func (l *LevelDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string {
func (l *FieldDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string {
// otlp logs have a severity number, using which we are defining the log levels.
// Significance of severity number is explained in otel docs here https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber
if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" {
Expand Down Expand Up @@ -123,13 +139,24 @@ func (l *LevelDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structu
return l.extractLogLevelFromLogLine(entry.Line)
}

func (l *LevelDetector) extractLogLevelFromLogLine(log string) string {
logSlice := unsafe.Slice(unsafe.StringData(log), len(log))
func (l *FieldDetector) detectGenericFieldFromLogEntry(entry logproto.Entry, hints []string) string {
lineBytes := unsafe.Slice(unsafe.StringData(entry.Line), len(entry.Line))
var v []byte
if isJSON(entry.Line) {
v = getValueUsingJSONParser(lineBytes, hints)
} else if isLogFmt(lineBytes) {
v = getValueUsingLogfmtParser(lineBytes, hints)
}
return string(v)
}

func (l *FieldDetector) extractLogLevelFromLogLine(log string) string {
JoaoBraveCoding marked this conversation as resolved.
Show resolved Hide resolved
lineBytes := unsafe.Slice(unsafe.StringData(log), len(log))
var v []byte
if isJSON(log) {
v = l.getValueUsingJSONParser(logSlice)
} else if isLogFmt(logSlice) {
v = l.getValueUsingLogfmtParser(logSlice)
v = getValueUsingJSONParser(lineBytes, l.allowedLevelLabels)
} else if isLogFmt(lineBytes) {
v = getValueUsingLogfmtParser(lineBytes, l.allowedLevelLabels)
} else {
return detectLevelFromLogLine(log)
}
Expand All @@ -154,24 +181,42 @@ func (l *LevelDetector) extractLogLevelFromLogLine(log string) string {
}
}

func (l *LevelDetector) getValueUsingLogfmtParser(line []byte) []byte {
func getValueUsingLogfmtParser(line []byte, hints []string) []byte {
d := logfmt.NewDecoder(line)
// In order to have the same behaviour as the JSON field extraction,
// the full line needs to be parsed to extract all possible matching fields.
pos := len(hints) // the index of the hint that matches
var res []byte
for !d.EOL() && d.ScanKeyval() {
if _, ok := l.allowedLabels[string(d.Key())]; ok {
return d.Value()
k := unsafe.String(unsafe.SliceData(d.Key()), len(d.Key()))
for x, hint := range hints {
if strings.EqualFold(k, hint) && x < pos {
res, pos = d.Value(), x
// If there is only a single hint, or the matching hint is the first one,
// we can stop parsing the rest of the line and return early.
if x == 0 {
return res
}
}
}
}
return nil
return res
}

func (l *LevelDetector) getValueUsingJSONParser(log []byte) []byte {
for allowedLabel := range l.allowedLabels {
l, _, _, err := jsonparser.Get(log, allowedLabel)
if err == nil {
return l
func getValueUsingJSONParser(line []byte, hints []string) []byte {
var res []byte
for _, allowedLabel := range hints {
parsed, err := jsonexpr.Parse(allowedLabel, false)
if err != nil {
continue
}
l, _, _, err := jsonparser.Get(line, log.JSONPathToStrings(parsed)...)
if err != nil {
continue
}
return l
}
return nil
return res
}

func isLogFmt(line []byte) bool {
Expand Down
Loading
Loading