Merge branch 'main' into patch-1
JStickler authored Jan 13, 2025
2 parents 162eb84 + e838ee3 commit 10b0487
Showing 20 changed files with 4,435 additions and 109 deletions.
2 changes: 1 addition & 1 deletion docs/sources/send-data/docker-driver/_index.md
@@ -31,7 +31,7 @@ docker plugin install grafana/loki-docker-driver:3.3.2-arm64 --alias loki --gran
```

{{< admonition type="note" >}}
-Add `-arm64` to the image tag for AMR64 hosts.
+Add `-arm64` to the image tag for ARM64 hosts.
{{< /admonition >}}

To check installed plugins, use the `docker plugin ls` command.
6 changes: 5 additions & 1 deletion docs/sources/shared/configuration.md
@@ -4988,6 +4988,9 @@ remote_write:
# Configure remote write clients. A map with remote client id as key. For
# details, see
# https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write
+# Specifying a header with key 'X-Scope-OrgID' under the 'headers' section of
+# RemoteWriteConfig is not permitted. If specified, it will be dropped during
+# config parsing.
[clients: <map of string to RemoteWriteConfig>]
# Enable remote-write functionality.
@@ -5000,7 +5003,8 @@ remote_write:
# CLI flag: -ruler.remote-write.config-refresh-period
[config_refresh_period: <duration> | default = 10s]
-# Add X-Scope-OrgID header in remote write requests.
+# Add an X-Scope-OrgID header in remote write requests with the tenant ID of a
+# Loki tenant that the recording rules are part of.
# CLI flag: -ruler.remote-write.add-org-id-header
[add_org_id_header: <boolean> | default = true]
50 changes: 50 additions & 0 deletions pkg/dataobj/dataobj.go
@@ -0,0 +1,50 @@
// Package dataobj holds utilities for working with data objects.
package dataobj

import (
"context"

"github.com/thanos-io/objstore"

"github.com/grafana/loki/pkg/push"
)

// BuilderConfig configures a data object [Builder].
type BuilderConfig struct{}

// A Builder builds data objects from a set of incoming log data. Log data is
// appended to a builder by calling [Builder.Append]. Buffered log data is
// flushed manually by calling [Builder.Flush].
//
// Methods on Builder are not goroutine-safe; callers are responsible for
// synchronizing calls.
type Builder struct{}

// NewBuilder creates a new Builder which stores data objects for the specified
// tenant in a bucket.
func NewBuilder(cfg BuilderConfig, bucket objstore.Bucket, tenantID string) *Builder {
// TODO(rfratto): implement
_ = cfg
_ = bucket
_ = tenantID
return &Builder{}
}

// Append buffers an entry to be written to a data object. If the Builder is
// full, Append returns false without appending the entry.
//
// Once a Builder is full, call [Builder.Flush] to flush the buffered data,
// then call Append again with the same entry.
func (b *Builder) Append(entry push.PushRequest) bool {
// TODO(rfratto): implement
_ = entry
return true
}

// Flush flushes all buffered data to object storage. Calling Flush results
// in a no-op if there is no buffered data to flush.
func (b *Builder) Flush(ctx context.Context) error {
// TODO(rfratto): implement
_ = ctx
return nil
}
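
The Append/Flush contract above implies a retry loop on the caller's side. Here is a minimal sketch of that loop; the `drain` helper, its `requests` argument, and the `v3` import path for `dataobj` are assumptions for illustration, and `Builder` itself is still a stub at this commit:

```go
package example

import (
	"context"
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj" // assumed import path
	"github.com/grafana/loki/pkg/push"
)

// drain appends every request to b, flushing whenever the builder reports
// it is full, then retrying the same entry as the doc comments describe.
func drain(ctx context.Context, b *dataobj.Builder, requests []push.PushRequest) error {
	for _, req := range requests {
		if b.Append(req) {
			continue
		}
		// Builder is full: flush buffered data, then retry the same entry.
		if err := b.Flush(ctx); err != nil {
			return err
		}
		if !b.Append(req) {
			return fmt.Errorf("entry does not fit in an empty builder")
		}
	}
	// Flush whatever is still buffered before returning.
	return b.Flush(ctx)
}
```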
152 changes: 152 additions & 0 deletions pkg/dataobj/internal/encoding/dataset_streams.go
@@ -0,0 +1,152 @@
package encoding

import (
"context"
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// StreamsDataset returns a [dataset.Dataset] from a [StreamsDecoder] for the
// given section.
func StreamsDataset(dec StreamsDecoder, sec *filemd.SectionInfo) dataset.Dataset {
return &streamsDataset{dec: dec, sec: sec}
}

type streamsDataset struct {
dec StreamsDecoder
sec *filemd.SectionInfo
}

func (ds *streamsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] {
return result.Iter(func(yield func(dataset.Column) bool) error {
columns, err := ds.dec.Columns(ctx, ds.sec)
if err != nil {
return err
}

for _, column := range columns {
if !yield(&streamsDatasetColumn{dec: ds.dec, desc: column}) {
return nil
}
}

return nil
})
}

func (ds *streamsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
return result.Iter(func(yield func(dataset.Pages) bool) error {
for _, column := range columns {
pages, err := result.Collect(column.ListPages(ctx))
if err != nil {
return err
} else if !yield(pages) {
return nil
}
}

return nil
})
}

func (ds *streamsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := page.ReadPage(ctx)
if err != nil {
return err
} else if !yield(data) {
return nil
}
}

return nil
})
}

type streamsDatasetColumn struct {
dec StreamsDecoder
desc *streamsmd.ColumnDesc

info *dataset.ColumnInfo
}

func (col *streamsDatasetColumn) ColumnInfo() *dataset.ColumnInfo {
if col.info != nil {
return col.info
}

col.info = &dataset.ColumnInfo{
Name: col.desc.Info.Name,
Type: col.desc.Info.ValueType,
Compression: col.desc.Info.Compression,

RowsCount: int(col.desc.Info.RowsCount),
CompressedSize: int(col.desc.Info.CompressedSize),
UncompressedSize: int(col.desc.Info.UncompressedSize),

Statistics: col.desc.Info.Statistics,
}
return col.info
}

func (col *streamsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] {
return result.Iter(func(yield func(dataset.Page) bool) error {
pageSets, err := result.Collect(col.dec.Pages(ctx, []*streamsmd.ColumnDesc{col.desc}))
if err != nil {
return err
} else if len(pageSets) != 1 {
return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(pageSets))
}

for _, page := range pageSets[0] {
if !yield(&streamsDatasetPage{dec: col.dec, desc: page}) {
return nil
}
}

return nil
})
}

type streamsDatasetPage struct {
dec StreamsDecoder
desc *streamsmd.PageDesc

info *dataset.PageInfo
}

func (p *streamsDatasetPage) PageInfo() *dataset.PageInfo {
if p.info != nil {
return p.info
}

p.info = &dataset.PageInfo{
UncompressedSize: int(p.desc.Info.UncompressedSize),
CompressedSize: int(p.desc.Info.CompressedSize),
CRC32: p.desc.Info.Crc32,
RowCount: int(p.desc.Info.RowsCount),

Encoding: p.desc.Info.Encoding,
Stats: p.desc.Info.Statistics,
}
return p.info
}

func (p *streamsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) {
pages, err := result.Collect(p.dec.ReadPages(ctx, []*streamsmd.PageDesc{p.desc}))
if err != nil {
return nil, err
} else if len(pages) != 1 {
return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(pages))
}

return pages[0], nil
}
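
The `result.Seq` values returned above can be consumed lazily or collected eagerly; this file uses `result.Collect` for the latter. A small sketch of the same pattern from a caller's side, assuming `dataset.Dataset` declares `ListColumns` (the `columnNames` helper is hypothetical):

```go
package example

import (
	"context"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// columnNames eagerly collects the columns of a dataset and returns their
// names, mirroring how this file collects pages with result.Collect.
func columnNames(ctx context.Context, ds dataset.Dataset) ([]string, error) {
	columns, err := result.Collect(ds.ListColumns(ctx))
	if err != nil {
		return nil, err
	}

	names := make([]string, 0, len(columns))
	for _, col := range columns {
		names = append(names, col.ColumnInfo().Name)
	}
	return names, nil
}
```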
41 changes: 41 additions & 0 deletions pkg/dataobj/internal/encoding/decoder.go
@@ -0,0 +1,41 @@
package encoding

import (
"context"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// Decoders for the data object format. To cleanly separate the APIs per
// section, section-specific decoders should be created and returned by the
// top-level [Decoder] interface.
type (
// A Decoder decodes a data object.
Decoder interface {
// Sections returns the list of sections within a data object.
Sections(ctx context.Context) ([]*filemd.SectionInfo, error)

// StreamsDecoder returns a decoder for streams sections.
StreamsDecoder() StreamsDecoder
}

// StreamsDecoder supports decoding data within a streams section.
StreamsDecoder interface {
// Columns describes the set of columns in the provided section.
Columns(ctx context.Context, section *filemd.SectionInfo) ([]*streamsmd.ColumnDesc, error)

// Pages retrieves the set of pages for the provided columns. The order of
// page lists emitted by the sequence matches the order of columns
// provided: the first page list corresponds to the first column, and so
// on.
Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc]

// ReadPages reads the provided set of pages, iterating over their data
// matching the argument order. If an error is encountered while retrieving
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData]
}
)
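
Taken together, the two interfaces compose like this. A sketch only: `dumpColumns` is hypothetical, and a real caller would filter by section type before decoding a section as a streams section (the type field on `filemd.SectionInfo` is not part of this diff):

```go
package example

import (
	"context"
	"fmt"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
)

// dumpColumns enumerates the sections of a data object and prints how many
// columns each streams section describes.
func dumpColumns(ctx context.Context, dec encoding.Decoder) error {
	sections, err := dec.Sections(ctx)
	if err != nil {
		return err
	}

	streams := dec.StreamsDecoder()
	for i, sec := range sections {
		// Assumes every section here is a streams section; real code
		// would check the section type first.
		columns, err := streams.Columns(ctx, sec)
		if err != nil {
			return err
		}
		fmt.Printf("section %d: %d columns\n", i, len(columns))
	}
	return nil
}
```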
83 changes: 83 additions & 0 deletions pkg/dataobj/internal/encoding/decoder_metadata.go
@@ -0,0 +1,83 @@
package encoding

import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/gogo/protobuf/proto"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)

// decode* methods for metadata shared by Decoder implementations.

// decodeFileMetadata decodes file metadata from r.
func decodeFileMetadata(r streamio.Reader) (*filemd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
return nil, fmt.Errorf("read file format version: %w", err)
} else if gotVersion != fileFormatVersion {
return nil, fmt.Errorf("unexpected file format version: got=%d want=%d", gotVersion, fileFormatVersion)
}

var md filemd.Metadata
if err := decodeProto(r, &md); err != nil {
return nil, fmt.Errorf("file metadata: %w", err)
}
return &md, nil
}

// decodeStreamsMetadata decodes streams section metadata from r.
func decodeStreamsMetadata(r streamio.Reader) (*streamsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
return nil, fmt.Errorf("read streams section format version: %w", err)
} else if gotVersion != streamsFormatVersion {
return nil, fmt.Errorf("unexpected streams section format version: got=%d want=%d", gotVersion, streamsFormatVersion)
}

var md streamsmd.Metadata
if err := decodeProto(r, &md); err != nil {
return nil, fmt.Errorf("streams section metadata: %w", err)
}
return &md, nil
}

// decodeStreamsColumnMetadata decodes stream column metadata from r.
func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata, error) {
var metadata streamsmd.ColumnMetadata
if err := decodeProto(r, &metadata); err != nil {
return nil, fmt.Errorf("streams column metadata: %w", err)
}
return &metadata, nil
}

// decodeProto decodes a proto message from r and stores it in pb. Proto
// messages are expected to be encoded with their size, followed by the proto
// bytes.
func decodeProto(r streamio.Reader, pb proto.Message) error {
size, err := binary.ReadUvarint(r)
if err != nil {
return fmt.Errorf("read proto message size: %w", err)
}

buf := bytesBufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer bytesBufferPool.Put(buf)

n, err := io.Copy(buf, io.LimitReader(r, int64(size)))
if err != nil {
return fmt.Errorf("read proto message: %w", err)
} else if uint64(n) != size {
return fmt.Errorf("read proto message: got=%d want=%d", n, size)
}

if err := proto.Unmarshal(buf.Bytes(), pb); err != nil {
return fmt.Errorf("unmarshal proto message: %w", err)
}
return nil
}
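
For symmetry, here is what the encode side of this framing might look like: a uvarint length prefix followed by the marshaled message. `encodeProto` is a sketch, not the package's actual encoder:

```go
package example

import (
	"encoding/binary"
	"fmt"
	"io"

	"github.com/gogo/protobuf/proto"
)

// encodeProto writes pb to w using the framing decodeProto expects: the
// message size as a uvarint, followed by the proto-encoded bytes.
func encodeProto(w io.Writer, pb proto.Message) error {
	data, err := proto.Marshal(pb)
	if err != nil {
		return fmt.Errorf("marshal proto message: %w", err)
	}

	var sizeBuf [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(sizeBuf[:], uint64(len(data)))

	if _, err := w.Write(sizeBuf[:n]); err != nil {
		return fmt.Errorf("write proto message size: %w", err)
	}
	if _, err := w.Write(data); err != nil {
		return fmt.Errorf("write proto message: %w", err)
	}
	return nil
}
```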