Skip to content

Commit

Permalink
Merge branch 'main' into 2025.01.13_newClients
Browse files Browse the repository at this point in the history
  • Loading branch information
JStickler authored Jan 14, 2025
2 parents 19417f3 + bcec63d commit 6d932ff
Show file tree
Hide file tree
Showing 13 changed files with 1,965 additions and 16 deletions.
14 changes: 11 additions & 3 deletions docs/sources/configure/bp-configure.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ One issue many people have with Loki is their client receiving errors for out of

There are a few things to dissect from that statement. The first is this restriction is per stream. Let’s look at an example:

```
```bash
{job="syslog"} 00:00:00 i'm a syslog!
{job="syslog"} 00:00:01 i'm a syslog!
```

If Loki received these two lines which are for the same stream, everything would be fine. But what about this case:

```
```bash
{job="syslog"} 00:00:00 i'm a syslog!
{job="syslog"} 00:00:02 i'm a syslog!
{job="syslog"} 00:00:01 i'm a syslog! <- Rejected out of order!
```
What can you do about this? What if this was because the sources of these logs were different systems? You can solve this with an additional label which is unique per system:
```
```bash
{job="syslog", instance="host1"} 00:00:00 i'm a syslog!
{job="syslog", instance="host1"} 00:00:02 i'm a syslog!
{job="syslog", instance="host2"} 00:00:01 i'm a syslog! <- Accepted, this is a new stream!
Expand All @@ -50,6 +50,14 @@ But what if the application itself generated logs that were out of order? Well,

It's also worth noting that the batching nature of the Loki push API can lead to some instances of out of order errors being received which are really false positives. (Perhaps a batch partially succeeded and was present; or anything that previously succeeded would return an out of order entry; or anything new would be accepted.)

## Use `snappy` compression algorithm

`Snappy` is currently the Loki compression algorithm of choice. It performs much better than `gzip` for speed, but it is not as efficient in storage. This was an acceptable tradeoff for us.

Grafana Labs has found that `gzip` was very good for compression but was very slow, and this was causing slow query responses.

`LZ4` is a good compromise of speed and compression performance. While compression is slightly slower than `snappy`, the compression ratio is higher, resulting in smaller chunks in object storage.

## Use `chunk_target_size`

Using `chunk_target_size` instructs Loki to try to fill all chunks to a target _compressed_ size of 1.5MB. These larger chunks are more efficient for Loki to process.
Expand Down
152 changes: 152 additions & 0 deletions pkg/dataobj/internal/encoding/dataset_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package encoding

import (
"context"
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// LogsDataset implements returns a [dataset.Dataset] from a [LogsDecoder] for
// the given section.
func LogsDataset(dec LogsDecoder, sec *filemd.SectionInfo) dataset.Dataset {
return &logsDataset{dec: dec, sec: sec}
}

type logsDataset struct {
dec LogsDecoder
sec *filemd.SectionInfo
}

func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] {
return result.Iter(func(yield func(dataset.Column) bool) error {
columns, err := ds.dec.Columns(ctx, ds.sec)
if err != nil {
return err
}

for _, column := range columns {
if !yield(&logsDatasetColumn{dec: ds.dec, desc: column}) {
return nil
}
}

return err
})

}

func (ds *logsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
return result.Iter(func(yield func(dataset.Pages) bool) error {
for _, column := range columns {
pages, err := result.Collect(column.ListPages(ctx))
if err != nil {
return err
} else if !yield(pages) {
return nil
}
}

return nil
})
}

func (ds *logsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := page.ReadPage(ctx)
if err != nil {
return err
} else if !yield(data) {
return nil
}
}

return nil
})
}

type logsDatasetColumn struct {
dec LogsDecoder
desc *logsmd.ColumnDesc

info *dataset.ColumnInfo
}

func (col *logsDatasetColumn) ColumnInfo() *dataset.ColumnInfo {
if col.info != nil {
return col.info
}

col.info = &dataset.ColumnInfo{
Name: col.desc.Info.Name,
Type: col.desc.Info.ValueType,
Compression: col.desc.Info.Compression,

RowsCount: int(col.desc.Info.RowsCount),
CompressedSize: int(col.desc.Info.CompressedSize),
UncompressedSize: int(col.desc.Info.UncompressedSize),

Statistics: col.desc.Info.Statistics,
}
return col.info
}

func (col *logsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] {
return result.Iter(func(yield func(dataset.Page) bool) error {
pageSets, err := result.Collect(col.dec.Pages(ctx, []*logsmd.ColumnDesc{col.desc}))
if err != nil {
return err
} else if len(pageSets) != 1 {
return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(pageSets))
}

for _, page := range pageSets[0] {
if !yield(&logsDatasetPage{dec: col.dec, desc: page}) {
return nil
}
}

return nil
})
}

type logsDatasetPage struct {
dec LogsDecoder
desc *logsmd.PageDesc

info *dataset.PageInfo
}

func (p *logsDatasetPage) PageInfo() *dataset.PageInfo {
if p.info != nil {
return p.info
}

p.info = &dataset.PageInfo{
UncompressedSize: int(p.desc.Info.UncompressedSize),
CompressedSize: int(p.desc.Info.CompressedSize),
CRC32: p.desc.Info.Crc32,
RowCount: int(p.desc.Info.RowsCount),

Encoding: p.desc.Info.Encoding,
Stats: p.desc.Info.Statistics,
}
return p.info
}

func (p *logsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) {
pages, err := result.Collect(p.dec.ReadPages(ctx, []*logsmd.PageDesc{p.desc}))
if err != nil {
return nil, err
} else if len(pages) != 1 {
return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(pages))
}

return pages[0], nil
}
21 changes: 21 additions & 0 deletions pkg/dataobj/internal/encoding/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
Expand All @@ -20,6 +21,9 @@ type (

// StreamsDecoder returns a decoder for streams sections.
StreamsDecoder() StreamsDecoder

// LogsDecoder returns a decoder for logs sections.
LogsDecoder() LogsDecoder
}

// StreamsDecoder supports decoding data within a streams section.
Expand All @@ -38,4 +42,21 @@ type (
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData]
}

// LogsDecoder supports decoding data within a logs section.
LogsDecoder interface {
// Columns describes the set of columns in the provided section.
Columns(ctx context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error)

// Pages retrieves the set of pages for the provided columns. The order of
// page lists emitted by the sequence matches the order of columns
// provided: the first page list corresponds to the first column, and so
// on.
Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc]

// ReadPages reads the provided set of pages, iterating over their data
// matching the argument order. If an error is encountered while retrieving
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData]
}
)
28 changes: 27 additions & 1 deletion pkg/dataobj/internal/encoding/decoder_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gogo/protobuf/proto"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)
Expand All @@ -31,7 +32,7 @@ func decodeFileMetadata(r streamio.Reader) (*filemd.Metadata, error) {
return &md, nil
}

// decodeStreamsMetadata decodes stream section metadta from r.
// decodeStreamsMetadata decodes stream section metadata from r.
func decodeStreamsMetadata(r streamio.Reader) (*streamsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
Expand All @@ -56,6 +57,31 @@ func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata,
return &metadata, nil
}

// decodeLogsMetadata decodes logs section metadata from r.
func decodeLogsMetadata(r streamio.Reader) (*logsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
return nil, fmt.Errorf("read streams section format version: %w", err)
} else if gotVersion != streamsFormatVersion {
return nil, fmt.Errorf("unexpected streams section format version: got=%d want=%d", gotVersion, streamsFormatVersion)
}

var md logsmd.Metadata
if err := decodeProto(r, &md); err != nil {
return nil, fmt.Errorf("streams section metadata: %w", err)
}
return &md, nil
}

// decodeLogsColumnMetadata decodes logs column metadata from r.
func decodeLogsColumnMetadata(r streamio.Reader) (*logsmd.ColumnMetadata, error) {
var metadata logsmd.ColumnMetadata
if err := decodeProto(r, &metadata); err != nil {
return nil, fmt.Errorf("streams column metadata: %w", err)
}
return &metadata, nil
}

// decodeProto decodes a proto message from r and stores it in pb. Proto
// messages are expected to be encoded with their size, followed by the proto
// bytes.
Expand Down
80 changes: 80 additions & 0 deletions pkg/dataobj/internal/encoding/decoder_readseeker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
Expand Down Expand Up @@ -47,6 +48,10 @@ func (dec *readSeekerDecoder) StreamsDecoder() StreamsDecoder {
return &readSeekerStreamsDecoder{rs: dec.rs}
}

func (dec *readSeekerDecoder) LogsDecoder() LogsDecoder {
return &readSeekerLogsDecoder{rs: dec.rs}
}

type readSeekerStreamsDecoder struct {
rs io.ReadSeeker
}
Expand Down Expand Up @@ -121,3 +126,78 @@ func (dec *readSeekerStreamsDecoder) ReadPages(ctx context.Context, pages []*str
return nil
})
}

type readSeekerLogsDecoder struct {
rs io.ReadSeeker
}

func (dec *readSeekerLogsDecoder) Columns(_ context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error) {
if section.Type != filemd.SECTION_TYPE_LOGS {
return nil, fmt.Errorf("unexpected section type: got=%d want=%d", section.Type, filemd.SECTION_TYPE_LOGS)
}

if _, err := dec.rs.Seek(int64(section.MetadataOffset), io.SeekStart); err != nil {
return nil, fmt.Errorf("seek to streams metadata: %w", err)
}
r := bufio.NewReader(io.LimitReader(dec.rs, int64(section.MetadataSize)))

md, err := decodeLogsMetadata(r)
if err != nil {
return nil, err
}
return md.Columns, nil
}

func (dec *readSeekerLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] {
getPages := func(_ context.Context, column *logsmd.ColumnDesc) ([]*logsmd.PageDesc, error) {
if _, err := dec.rs.Seek(int64(column.Info.MetadataOffset), io.SeekStart); err != nil {
return nil, fmt.Errorf("seek to column metadata: %w", err)
}
r := bufio.NewReader(io.LimitReader(dec.rs, int64(column.Info.MetadataSize)))

md, err := decodeLogsColumnMetadata(r)
if err != nil {
return nil, err
}
return md.Pages, nil
}

return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error {
for _, column := range columns {
pages, err := getPages(ctx, column)
if err != nil {
return err
} else if !yield(pages) {
return nil
}
}

return nil
})
}

func (dec *readSeekerLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] {
getPageData := func(_ context.Context, page *logsmd.PageDesc) (dataset.PageData, error) {
if _, err := dec.rs.Seek(int64(page.Info.DataOffset), io.SeekStart); err != nil {
return nil, err
}
data := make([]byte, page.Info.DataSize)
if _, err := io.ReadFull(dec.rs, data); err != nil {
return nil, fmt.Errorf("read page data: %w", err)
}
return dataset.PageData(data), nil
}

return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := getPageData(ctx, page)
if err != nil {
return err
} else if !yield(data) {
return nil
}
}

return nil
})
}
Loading

0 comments on commit 6d932ff

Please sign in to comment.