Update chunk and head fmt to v4 (non-indexed labels) #10146

Closed · wants to merge 6 commits
27 changes: 15 additions & 12 deletions pkg/chunkenc/memchunk.go
@@ -33,7 +33,7 @@ const (
 	chunkFormatV3
 	chunkFormatV4
 
-	DefaultChunkFormat = chunkFormatV3 // the currently used chunk format
+	DefaultChunkFormat = chunkFormatV4 // the currently used chunk format
 
 	blocksPerChunk = 10
 	maxLineLength  = 1024 * 1024 * 1024
@@ -85,7 +85,7 @@ const (
 	UnorderedHeadBlockFmt
 	UnorderedWithNonIndexedLabelsHeadBlockFmt
 
-	DefaultHeadBlockFmt = UnorderedHeadBlockFmt
+	DefaultHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt
 )
 
 var magicNumber = uint32(0x12EE56A)
@@ -348,8 +348,19 @@ func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
 	return newMemChunkWithFormat(DefaultChunkFormat, enc, head, blockSize, targetSize)
 }
 
+func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) {
+	if chunkFmt == chunkFormatV2 && head != OrderedHeadBlockFmt {
+		panic("only OrderedHeadBlockFmt is supported for V2 chunks")
+	}
+	if chunkFmt == chunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt {
+		panic("only UnorderedWithNonIndexedLabelsHeadBlockFmt is supported for V4 chunks")
+	}
+}
+
 // NewMemChunk returns a new in-mem chunk.
 func newMemChunkWithFormat(format byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
+	panicIfInvalidFormat(format, head)
+
 	symbolizer := newSymbolizer()
 	return &MemChunk{
 		blockSize: blockSize, // The blockSize in bytes.
@@ -1080,24 +1091,16 @@ func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) {
 		return nil, err
 	}
 
-	// If the head format is not explicitly set, use the default.
-	// This will be the most common case for chunks read from storage since
-	// they have a dummy head block.
-	headFmt := c.headFmt
-	if headFmt < OrderedHeadBlockFmt {
-		headFmt = DefaultHeadBlockFmt
-	}
-
 	var newChunk *MemChunk
 	// as close as possible, respect the block/target sizes specified. However,
 	// if the blockSize is not set, use reasonable defaults.
 	if c.blockSize > 0 {
-		newChunk = NewMemChunk(c.Encoding(), headFmt, c.blockSize, c.targetSize)
+		newChunk = NewMemChunk(c.Encoding(), DefaultHeadBlockFmt, c.blockSize, c.targetSize)
 	} else {
 		// Using defaultBlockSize for target block size.
 		// The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity.
 		// For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that.
-		newChunk = NewMemChunk(c.Encoding(), headFmt, defaultBlockSize, c.CompressedSize())
+		newChunk = NewMemChunk(c.Encoding(), DefaultHeadBlockFmt, defaultBlockSize, c.CompressedSize())
 	}
 
 	for itr.Next() {
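For readers skimming the diff: `panicIfInvalidFormat` pins each chunk format to a compatible head block — v2 predates unordered head blocks, and v4 is the first format that persists non-indexed labels, so only the matching head block is allowed for each. Below is a minimal, self-contained sketch of that pairing rule; the constants are stand-ins, since the real ones are unexported in `pkg/chunkenc`, and their values here are assumptions.

```go
package main

import "fmt"

// Stand-ins for the unexported chunkenc constants; values are illustrative only.
type headBlockFmt int

const (
	orderedHeadBlockFmt headBlockFmt = iota + 3
	unorderedHeadBlockFmt
	unorderedWithNonIndexedLabelsHeadBlockFmt
)

const (
	chunkFormatV2 byte = iota + 2
	chunkFormatV3
	chunkFormatV4
)

// validPair mirrors the rule panicIfInvalidFormat enforces: v2 chunks only
// accept the ordered head block, v4 chunks only accept the head block that
// can carry non-indexed labels, and v3 accepts any head block format.
func validPair(chunkFmt byte, head headBlockFmt) bool {
	switch chunkFmt {
	case chunkFormatV2:
		return head == orderedHeadBlockFmt
	case chunkFormatV4:
		return head == unorderedWithNonIndexedLabelsHeadBlockFmt
	default:
		return true
	}
}

func main() {
	fmt.Println(validPair(chunkFormatV4, unorderedHeadBlockFmt))                     // false
	fmt.Println(validPair(chunkFormatV4, unorderedWithNonIndexedLabelsHeadBlockFmt)) // true
}
```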
13 changes: 7 additions & 6 deletions pkg/chunkenc/memchunk_test.go
@@ -74,7 +74,7 @@
 	}
 )
 
-const DefaultTestHeadBlockFmt = OrderedHeadBlockFmt
+const DefaultTestHeadBlockFmt = DefaultHeadBlockFmt
 
 func TestBlocksInclusive(t *testing.T) {
 	chk := NewMemChunk(EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize)
@@ -637,11 +637,11 @@ func TestChunkSize(t *testing.T) {
 	}
 	var result []res
 	for _, bs := range testBlockSizes {
-		for _, f := range HeadBlockFmts {
+		for _, f := range allPossibleFormats {
 			for _, enc := range testEncoding {
 				name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs)))
 				t.Run(name, func(t *testing.T) {
-					c := NewMemChunk(enc, f, bs, testTargetSize)
+					c := newMemChunkWithFormat(f.chunkFormat, enc, f.headBlockFmt, bs, testTargetSize)
 					inserted := fillChunk(c)
 					b, err := c.Bytes()
 					if err != nil {
@@ -685,7 +685,8 @@ func TestChunkStats(t *testing.T) {
 		inserted++
 		entry.Timestamp = entry.Timestamp.Add(time.Nanosecond)
 	}
-	expectedSize := (inserted * len(entry.Line)) + (inserted * 2 * binary.MaxVarintLen64)
+	// For each entry: timestamp <varint>, line size <varint>, line <bytes>, num of non-indexed labels <varint>
+	expectedSize := inserted * (len(entry.Line) + 3*binary.MaxVarintLen64)
 	statsCtx, ctx := stats.NewContext(context.Background())
 
 	it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline)
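The revised `expectedSize` reflects the v4 entry layout named in the new comment: three varints (timestamp, line length, and the count of non-indexed labels) at their worst-case width, plus the raw line bytes. A small sketch of the same bound — the helper name is ours, not the test's:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// expectedSize mirrors the test's upper bound: per entry, three varints
// (timestamp, line length, number of non-indexed labels) at their 10-byte
// worst case (binary.MaxVarintLen64), plus the raw line bytes.
func expectedSize(entries int, line string) int {
	return entries * (len(line) + 3*binary.MaxVarintLen64)
}

func main() {
	fmt.Println(expectedSize(100, "hello world")) // 100 * (11 + 3*10) = 4100
}
```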
@@ -734,7 +735,7 @@
 }
 
 func TestIteratorClose(t *testing.T) {
-	for _, f := range HeadBlockFmts {
+	for _, f := range allPossibleFormats {
 		for _, enc := range testEncoding {
 			t.Run(enc.String(), func(t *testing.T) {
 				for _, test := range []func(iter iter.EntryIterator, t *testing.T){
@@ -762,7 +763,7 @@ func TestIteratorClose(t *testing.T) {
 						}
 					},
 				} {
-					c := NewMemChunk(enc, f, testBlockSize, testTargetSize)
+					c := newMemChunkWithFormat(f.chunkFormat, enc, f.headBlockFmt, testBlockSize, testTargetSize)
 					inserted := fillChunk(c)
 					iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline)
 					if err != nil {
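`allPossibleFormats` itself is not shown in this diff; judging from its use as `f.chunkFormat` and `f.headBlockFmt` with `newMemChunkWithFormat`, each element presumably pairs a chunk format with a head block format that the new guard accepts. A hypothetical sketch of such a table — names and values are assumptions:

```go
package main

import "fmt"

// Hypothetical stand-ins for the unexported chunkenc constants.
type headBlockFmt int

const (
	orderedHeadBlockFmt headBlockFmt = iota + 3
	unorderedHeadBlockFmt
	unorderedWithNonIndexedLabelsHeadBlockFmt
)

const (
	chunkFormatV3 byte = iota + 3
	chunkFormatV4
)

// testFormat sketches the inferred shape of an allPossibleFormats element.
type testFormat struct {
	chunkFormat  byte
	headBlockFmt headBlockFmt
}

// Only pairings the new panicIfInvalidFormat guard would accept.
var allPossibleFormats = []testFormat{
	{chunkFormatV3, orderedHeadBlockFmt},
	{chunkFormatV3, unorderedHeadBlockFmt},
	{chunkFormatV4, unorderedWithNonIndexedLabelsHeadBlockFmt},
}

func main() {
	for _, f := range allPossibleFormats {
		fmt.Printf("chunk format %d with head block format %d\n", f.chunkFormat, f.headBlockFmt)
	}
}
```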
10 changes: 5 additions & 5 deletions pkg/chunkenc/unordered_test.go
@@ -428,7 +428,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
 }
 
 func TestUnorderedChunkIterators(t *testing.T) {
-	c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(EncSnappy, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize)
 	for i := 0; i < 100; i++ {
 		// push in reverse order
 		require.Nil(t, c.Append(&logproto.Entry{
@@ -546,7 +546,7 @@ func BenchmarkUnorderedRead(b *testing.B) {
 }
 
 func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
-	c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(EncSnappy, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize)
 	fillChunkRandomOrder(c, false)
 
 	ct := 0
@@ -583,7 +583,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
 }
 
 func chunkFrom(xs []logproto.Entry) ([]byte, error) {
-	c := NewMemChunk(EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
 	for _, x := range xs {
 		if err := c.Append(&x); err != nil {
 			return nil, err
@@ -643,7 +643,7 @@ func TestReorder(t *testing.T) {
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
-			c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+			c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
 			for _, x := range tc.input {
 				require.Nil(t, c.Append(&x))
 			}
@@ -660,7 +660,7 @@ func TestReorder(t *testing.T) {
 }
 
 func TestReorderAcrossBlocks(t *testing.T) {
-	c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
+	c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
 	for _, batch := range [][]int{
 		// ensure our blocks have overlapping bounds and must be reordered
 		// before closing.
2 changes: 1 addition & 1 deletion pkg/ingester/chunk_test.go
@@ -49,7 +49,7 @@ func TestIterator(t *testing.T) {
 	}{
 		{"dumbChunk", chunkenc.NewDumbChunk},
 		{"gzipChunk", func() chunkenc.Chunk {
-			return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0)
+			return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, 256*1024, 0)
 		}},
 	} {
 		t.Run(chk.name, func(t *testing.T) {
139 changes: 65 additions & 74 deletions pkg/ingester/encoding_test.go
@@ -38,95 +38,86 @@ func dummyConf() *Config {
 }
 
 func Test_EncodingChunks(t *testing.T) {
-	for _, f := range chunkenc.HeadBlockFmts {
-		for _, close := range []bool{true, false} {
-			for _, tc := range []struct {
-				desc string
-				conf Config
-			}{
-				{
-					// mostly for historical parity
-					desc: "dummyConf",
-					conf: *dummyConf(),
-				},
-				{
-					desc: "default",
-					conf: defaultIngesterTestConfig(t),
-				},
-			} {
-
-				t.Run(fmt.Sprintf("%v-%v-%s", f, close, tc.desc), func(t *testing.T) {
-					conf := tc.conf
-					c := chunkenc.NewMemChunk(chunkenc.EncGZIP, f, conf.BlockSize, conf.TargetChunkSize)
-					fillChunk(t, c)
-					if close {
-						require.Nil(t, c.Close())
-					}
-
-					from := []chunkDesc{
-						{
-							chunk: c,
-						},
-						// test non zero values
-						{
-							chunk:       c,
-							closed:      true,
-							synced:      true,
-							flushed:     time.Unix(1, 0),
-							lastUpdated: time.Unix(0, 1),
-						},
-					}
-					there, err := toWireChunks(from, nil)
-					require.Nil(t, err)
-					chunks := make([]Chunk, 0, len(there))
-					for _, c := range there {
-						chunks = append(chunks, c.Chunk)
-
-						// Ensure closed head chunks only contain the head metadata but no entries
-						if close {
-							if f < chunkenc.UnorderedHeadBlockFmt {
-								// format + #entries + size + mint + maxt
-								const orderedHeadSize = 5
-								require.Equal(t, orderedHeadSize, len(c.Head))
-							} else {
-								// format + #lines
-								const unorderedHeadSize = 2
-								require.Equal(t, unorderedHeadSize, len(c.Head))
-							}
-						} else {
-							require.Greater(t, len(c.Head), 0)
-						}
-					}
-
-					backAgain, err := fromWireChunks(&conf, chunks)
-					require.Nil(t, err)
-
-					for i, to := range backAgain {
-						// test the encoding directly as the substructure may change.
-						// for instance the uncompressed size for each block is not included in the encoded version.
-						enc, err := to.chunk.Bytes()
-						require.Nil(t, err)
-						to.chunk = nil
-
-						matched := from[i]
-						exp, err := matched.chunk.Bytes()
-						require.Nil(t, err)
-						matched.chunk = nil
-
-						require.Equal(t, exp, enc)
-						require.Equal(t, matched, to)
-
-					}
-
-				})
-			}
-		}
-	}
+	for _, close := range []bool{true, false} {
+		for _, tc := range []struct {
+			desc string
+			conf Config
+		}{
+			{
+				// mostly for historical parity
+				desc: "dummyConf",
+				conf: *dummyConf(),
+			},
+			{
+				desc: "default",
+				conf: defaultIngesterTestConfig(t),
+			},
+		} {
+
+			t.Run(fmt.Sprintf("%v-%s", close, tc.desc), func(t *testing.T) {
+				conf := tc.conf
+				c := chunkenc.NewMemChunk(chunkenc.EncGZIP, f, conf.BlockSize, conf.TargetChunkSize)
+				fillChunk(t, c)
+				if close {
+					require.Nil(t, c.Close())
+				}
+
+				from := []chunkDesc{
+					{
+						chunk: c,
+					},
+					// test non zero values
+					{
+						chunk:       c,
+						closed:      true,
+						synced:      true,
+						flushed:     time.Unix(1, 0),
+						lastUpdated: time.Unix(0, 1),
+					},
+				}
+				there, err := toWireChunks(from, nil)
+				require.Nil(t, err)
+				chunks := make([]Chunk, 0, len(there))
+				for _, c := range there {
+					chunks = append(chunks, c.Chunk)
+
+					// Ensure closed head chunks only contain the head metadata but no entries
+					if close {
+						// format + #lines
+						const unorderedHeadSize = 2
+						require.Equal(t, unorderedHeadSize, len(c.Head))
+					} else {
+						require.Greater(t, len(c.Head), 0)
+					}
+				}
+				backAgain, err := fromWireChunks(&conf, chunks)
+				require.Nil(t, err)
+
+				for i, to := range backAgain {
+					// test the encoding directly as the substructure may change.
+					// for instance the uncompressed size for each block is not included in the encoded version.
+					enc, err := to.chunk.Bytes()
+					require.Nil(t, err)
+					to.chunk = nil
+
+					matched := from[i]
+					exp, err := matched.chunk.Bytes()
+					require.Nil(t, err)
+					matched.chunk = nil
+
+					require.Equal(t, exp, enc)
+					require.Equal(t, matched, to)
+
+				}
+
+			})
+		}
+	}
 }
 
 func Test_EncodingCheckpoint(t *testing.T) {
 	conf := dummyConf()
-	c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
+	c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
 	require.Nil(t, c.Append(&logproto.Entry{
 		Timestamp: time.Unix(1, 0),
 		Line:      "hi there",
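The simplified assertion above relies on an unordered head block checkpointing to exactly two bytes once closed: per the `// format + #lines` comment, one byte for the head block format plus one varint for the line count, which is zero after the head has been flushed into a block. A sketch of that arithmetic — the field layout is inferred from the test's comment, not from the actual encoder:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// checkpointEmptyHead writes the assumed closed-head layout: one format byte
// followed by a uvarint line count of zero, yielding two bytes total.
func checkpointEmptyHead(format byte) []byte {
	var buf bytes.Buffer
	buf.WriteByte(format)
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], 0) // zero lines in a closed head
	buf.Write(tmp[:n])
	return buf.Bytes()
}

func main() {
	fmt.Println(len(checkpointEmptyHead(5))) // 2
}
```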
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
@@ -124,7 +124,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc {
 	for i := range res {
 		res[i] = &chunkDesc{
 			closed: true,
-			chunk:  chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
+			chunk:  chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.DefaultHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
 		}
 		fillChunk(t, res[i].chunk)
 		require.NoError(t, res[i].chunk.Close())
2 changes: 1 addition & 1 deletion pkg/ingester/stream_test.go
@@ -179,7 +179,7 @@ func TestStreamIterator(t *testing.T) {
 		new  func() *chunkenc.MemChunk
 	}{
 		{"gzipChunk", func() *chunkenc.MemChunk {
-			return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0)
+			return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, 256*1024, 0)
 		}},
 	} {
 		t.Run(chk.name, func(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/ingester/wal/encoding.go
@@ -32,7 +32,7 @@ const (
 // The current type of Entries that this distribution writes.
 // Loki can read in a backwards compatible manner, but will write the newest variant.
 // TODO: Change to WALRecordEntriesV3?
-const CurrentEntriesRec = WALRecordEntriesV2
+const CurrentEntriesRec = WALRecordEntriesV3
 
 // Record is a struct combining the series and samples record.
 type Record struct {
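`CurrentEntriesRec` now points at `WALRecordEntriesV3`, resolving the TODO above: readers keep accepting every historical record variant while writers emit only the newest one. A hypothetical sketch of that version-gated dispatch — the constants stand in for the ones in `pkg/ingester/wal`, and the per-variant semantics are assumptions, not Loki's actual decoder:

```go
package main

import "fmt"

// Hypothetical record-type constants; values are assumptions.
type RecordType byte

const (
	WALRecordEntriesV1 RecordType = iota + 1
	WALRecordEntriesV2
	WALRecordEntriesV3
)

// Writers always emit the newest variant.
const CurrentEntriesRec = WALRecordEntriesV3

// decodeEntries sketches backward-compatible reading: every historical
// variant is accepted, only unknown types are rejected.
func decodeEntries(rec RecordType) (string, error) {
	switch rec {
	case WALRecordEntriesV1, WALRecordEntriesV2:
		return "legacy entries record", nil
	case WALRecordEntriesV3:
		return "entries record with non-indexed labels", nil
	default:
		return "", fmt.Errorf("unknown record type: %d", rec)
	}
}

func main() {
	desc, _ := decodeEntries(CurrentEntriesRec)
	fmt.Println(desc)
}
```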
1 change: 1 addition & 0 deletions pkg/logql/metrics.go
@@ -129,6 +129,7 @@ func RecordRangeAndInstantQueryMetrics(
 		"returned_lines", returnedLines,
 		"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
 		"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
+		"total_bytes_non_indexed_labels", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalNonIndexedLabelsBytesProcessed)), " ", "", 1),
 		"lines_per_second", stats.Summary.LinesProcessedPerSecond,
 		"total_lines", stats.Summary.TotalLinesProcessed,
 		"post_filter_lines", stats.Summary.TotalPostFilterLines,
4 changes: 2 additions & 2 deletions pkg/storage/hack/main.go
@@ -91,7 +91,7 @@ func fillStore(cm storage.ClientMetrics) error {
 	labelsBuilder.Set(labels.MetricName, "logs")
 	metric := labelsBuilder.Labels()
 	fp := client.Fingerprint(lbs)
-	chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864)
+	chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, chunkenc.DefaultHeadBlockFmt, 262144, 1572864)
 	for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() {
 		entry := &logproto.Entry{
 			Timestamp: time.Unix(0, ts),
@@ -114,7 +114,7 @@ func fillStore(cm storage.ClientMetrics) error {
 			if flushCount >= maxChunks {
 				return
 			}
-			chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864)
+			chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, chunkenc.DefaultHeadBlockFmt, 262144, 1572864)
 		}
 	}
 }(i)