Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dataobj): logs section metadata and encoding/decoding #15720

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions pkg/dataobj/internal/encoding/dataset_logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package encoding

import (
"context"
"fmt"

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// LogsDataset implements returns a [dataset.Dataset] from a [LogsDecoder] for
// the given section.
func LogsDataset(dec LogsDecoder, sec *filemd.SectionInfo) dataset.Dataset {
return &logsDataset{dec: dec, sec: sec}
}

type logsDataset struct {
dec LogsDecoder
sec *filemd.SectionInfo
}

func (ds *logsDataset) ListColumns(ctx context.Context) result.Seq[dataset.Column] {
return result.Iter(func(yield func(dataset.Column) bool) error {
columns, err := ds.dec.Columns(ctx, ds.sec)
if err != nil {
return err
}

for _, column := range columns {
if !yield(&logsDatasetColumn{dec: ds.dec, desc: column}) {
return nil
}
}

return err
})

}

func (ds *logsDataset) ListPages(ctx context.Context, columns []dataset.Column) result.Seq[dataset.Pages] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each column.
return result.Iter(func(yield func(dataset.Pages) bool) error {
for _, column := range columns {
pages, err := result.Collect(column.ListPages(ctx))
if err != nil {
return err
} else if !yield(pages) {
return nil
}
}

return nil
})
}

func (ds *logsDataset) ReadPages(ctx context.Context, pages []dataset.Page) result.Seq[dataset.PageData] {
// TODO(rfratto): Switch to batch retrieval instead of iterating over each page.
return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := page.ReadPage(ctx)
if err != nil {
return err
} else if !yield(data) {
return nil
}
}

return nil
})
}

type logsDatasetColumn struct {
dec LogsDecoder
desc *logsmd.ColumnDesc

info *dataset.ColumnInfo
}

func (col *logsDatasetColumn) ColumnInfo() *dataset.ColumnInfo {
if col.info != nil {
return col.info
}

col.info = &dataset.ColumnInfo{
Name: col.desc.Info.Name,
Type: col.desc.Info.ValueType,
Compression: col.desc.Info.Compression,

RowsCount: int(col.desc.Info.RowsCount),
CompressedSize: int(col.desc.Info.CompressedSize),
UncompressedSize: int(col.desc.Info.UncompressedSize),

Statistics: col.desc.Info.Statistics,
}
return col.info
}

func (col *logsDatasetColumn) ListPages(ctx context.Context) result.Seq[dataset.Page] {
return result.Iter(func(yield func(dataset.Page) bool) error {
pageSets, err := result.Collect(col.dec.Pages(ctx, []*logsmd.ColumnDesc{col.desc}))
if err != nil {
return err
} else if len(pageSets) != 1 {
return fmt.Errorf("unexpected number of page sets: got=%d want=1", len(pageSets))
}

for _, page := range pageSets[0] {
if !yield(&logsDatasetPage{dec: col.dec, desc: page}) {
return nil
}
}

return nil
})
}

type logsDatasetPage struct {
dec LogsDecoder
desc *logsmd.PageDesc

info *dataset.PageInfo
}

func (p *logsDatasetPage) PageInfo() *dataset.PageInfo {
if p.info != nil {
return p.info
}

p.info = &dataset.PageInfo{
UncompressedSize: int(p.desc.Info.UncompressedSize),
CompressedSize: int(p.desc.Info.CompressedSize),
CRC32: p.desc.Info.Crc32,
RowCount: int(p.desc.Info.RowsCount),

Encoding: p.desc.Info.Encoding,
Stats: p.desc.Info.Statistics,
}
return p.info
}

func (p *logsDatasetPage) ReadPage(ctx context.Context) (dataset.PageData, error) {
pages, err := result.Collect(p.dec.ReadPages(ctx, []*logsmd.PageDesc{p.desc}))
if err != nil {
return nil, err
} else if len(pages) != 1 {
return nil, fmt.Errorf("unexpected number of pages: got=%d want=1", len(pages))
}

return pages[0], nil
}
21 changes: 21 additions & 0 deletions pkg/dataobj/internal/encoding/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
Expand All @@ -20,6 +21,9 @@ type (

// StreamsDecoder returns a decoder for streams sections.
StreamsDecoder() StreamsDecoder

// LogsDecoder returns a decoder for logs sections.
LogsDecoder() LogsDecoder
}

// StreamsDecoder supports decoding data within a streams section.
Expand All @@ -38,4 +42,21 @@ type (
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData]
}

// LogsDecoder supports decoding data within a logs section.
LogsDecoder interface {
// Columns describes the set of columns in the provided section.
Columns(ctx context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error)

// Pages retrieves the set of pages for the provided columns. The order of
// page lists emitted by the sequence matches the order of columns
// provided: the first page list corresponds to the first column, and so
// on.
Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc]

// ReadPages reads the provided set of pages, iterating over their data
// matching the argument order. If an error is encountered while retrieving
// pages, an error is emitted and iteration stops.
ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData]
}
)
28 changes: 27 additions & 1 deletion pkg/dataobj/internal/encoding/decoder_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gogo/protobuf/proto"

"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
)
Expand All @@ -31,7 +32,7 @@ func decodeFileMetadata(r streamio.Reader) (*filemd.Metadata, error) {
return &md, nil
}

// decodeStreamsMetadata decodes stream section metadta from r.
// decodeStreamsMetadata decodes stream section metadata from r.
func decodeStreamsMetadata(r streamio.Reader) (*streamsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
Expand All @@ -56,6 +57,31 @@ func decodeStreamsColumnMetadata(r streamio.Reader) (*streamsmd.ColumnMetadata,
return &metadata, nil
}

// decodeLogsMetadata decodes logs section metadata from r.
func decodeLogsMetadata(r streamio.Reader) (*logsmd.Metadata, error) {
gotVersion, err := streamio.ReadUvarint(r)
if err != nil {
return nil, fmt.Errorf("read streams section format version: %w", err)
} else if gotVersion != streamsFormatVersion {
return nil, fmt.Errorf("unexpected streams section format version: got=%d want=%d", gotVersion, streamsFormatVersion)
}

var md logsmd.Metadata
if err := decodeProto(r, &md); err != nil {
return nil, fmt.Errorf("streams section metadata: %w", err)
}
return &md, nil
}

// decodeLogsColumnMetadata decodes logs column metadata from r.
func decodeLogsColumnMetadata(r streamio.Reader) (*logsmd.ColumnMetadata, error) {
var metadata logsmd.ColumnMetadata
if err := decodeProto(r, &metadata); err != nil {
return nil, fmt.Errorf("streams column metadata: %w", err)
}
return &metadata, nil
}

// decodeProto decodes a proto message from r and stores it in pb. Proto
// messages are expected to be encoded with their size, followed by the proto
// bytes.
Expand Down
80 changes: 80 additions & 0 deletions pkg/dataobj/internal/encoding/decoder_readseeker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
Expand Down Expand Up @@ -47,6 +48,10 @@ func (dec *readSeekerDecoder) StreamsDecoder() StreamsDecoder {
return &readSeekerStreamsDecoder{rs: dec.rs}
}

func (dec *readSeekerDecoder) LogsDecoder() LogsDecoder {
return &readSeekerLogsDecoder{rs: dec.rs}
}

type readSeekerStreamsDecoder struct {
rs io.ReadSeeker
}
Expand Down Expand Up @@ -121,3 +126,78 @@ func (dec *readSeekerStreamsDecoder) ReadPages(ctx context.Context, pages []*str
return nil
})
}

type readSeekerLogsDecoder struct {
rs io.ReadSeeker
}

func (dec *readSeekerLogsDecoder) Columns(_ context.Context, section *filemd.SectionInfo) ([]*logsmd.ColumnDesc, error) {
if section.Type != filemd.SECTION_TYPE_LOGS {
return nil, fmt.Errorf("unexpected section type: got=%d want=%d", section.Type, filemd.SECTION_TYPE_LOGS)
}

if _, err := dec.rs.Seek(int64(section.MetadataOffset), io.SeekStart); err != nil {
return nil, fmt.Errorf("seek to streams metadata: %w", err)
}
r := bufio.NewReader(io.LimitReader(dec.rs, int64(section.MetadataSize)))

md, err := decodeLogsMetadata(r)
if err != nil {
return nil, err
}
return md.Columns, nil
}

func (dec *readSeekerLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] {
getPages := func(_ context.Context, column *logsmd.ColumnDesc) ([]*logsmd.PageDesc, error) {
if _, err := dec.rs.Seek(int64(column.Info.MetadataOffset), io.SeekStart); err != nil {
return nil, fmt.Errorf("seek to column metadata: %w", err)
}
r := bufio.NewReader(io.LimitReader(dec.rs, int64(column.Info.MetadataSize)))

md, err := decodeLogsColumnMetadata(r)
if err != nil {
return nil, err
}
return md.Pages, nil
}

return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error {
for _, column := range columns {
pages, err := getPages(ctx, column)
if err != nil {
return err
} else if !yield(pages) {
return nil
}
}

return nil
})
}

func (dec *readSeekerLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] {
getPageData := func(_ context.Context, page *logsmd.PageDesc) (dataset.PageData, error) {
if _, err := dec.rs.Seek(int64(page.Info.DataOffset), io.SeekStart); err != nil {
return nil, err
}
data := make([]byte, page.Info.DataSize)
if _, err := io.ReadFull(dec.rs, data); err != nil {
return nil, fmt.Errorf("read page data: %w", err)
}
return dataset.PageData(data), nil
}

return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := getPageData(ctx, page)
if err != nil {
return err
} else if !yield(data) {
return nil
}
}

return nil
})
}
21 changes: 20 additions & 1 deletion pkg/dataobj/internal/encoding/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewEncoder(w streamio.Writer) *Encoder {
}
}

// OpenStreams opens a [StreamsEncoder]. OpenSterams fails if there is another
// OpenStreams opens a [StreamsEncoder]. OpenStreams fails if there is another
// open section.
func (enc *Encoder) OpenStreams() (*StreamsEncoder, error) {
if enc.curSection != nil {
Expand All @@ -66,6 +66,25 @@ func (enc *Encoder) OpenStreams() (*StreamsEncoder, error) {
), nil
}

// OpenLogs opens a [LogsEncoder]. OpenLogs fails if there is another open
// section.
func (enc *Encoder) OpenLogs() (*LogsEncoder, error) {
if enc.curSection != nil {
return nil, ErrElementExist
}

enc.curSection = &filemd.SectionInfo{
Type: filemd.SECTION_TYPE_LOGS,
MetadataOffset: math.MaxUint32,
MetadataSize: math.MaxUint32,
}

return newLogsEncoder(
enc,
enc.startOffset+enc.data.Len(),
), nil
}

// MetadataSize returns an estimate of the current size of the metadata for the
// data object. MetadataSize does not include the size of data appended. The
// estimate includes the currently open element.
Expand Down
Loading
Loading