diff --git a/cmd/streams-compactor/streams-compactor.go b/cmd/streams-compactor/streams-compactor.go new file mode 100644 index 0000000000000..91c11a85f8f13 --- /dev/null +++ b/cmd/streams-compactor/streams-compactor.go @@ -0,0 +1,231 @@ +package main + +import ( + "context" + "encoding/json" + "math" + "os" + "sort" + "time" + + "github.com/grafana/dskit/concurrency" + "github.com/pkg/errors" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" +) + +var logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + +const ( + resultsFileLocation = "path to json file with results from streams-inspector" + desiredChunkSizeKB = 6 * 1024 +) + +func main() { + stats := IndexStats{} + level.Info(logger).Log("msg", "reading file content") + content, err := os.ReadFile(resultsFileLocation) + if err != nil { + level.Error(logger).Log("msg", "error while reading the file", "err", err) + return + } + level.Info(logger).Log("msg", "unmarshalling the json") + err = json.Unmarshal(content, &stats) + if err != nil { + level.Error(logger).Log("msg", "error while unmarshalling", "err", err) + return + } + + err = assertCorrect(stats) + if err != nil { + level.Error(logger).Log("msg", "results are incorrect. can not continue", "err", err) + return + } + + level.Info(logger).Log("msg", "results are correct") + computeMetrics(&stats) + printStats(stats, "before compaction") + level.Info(logger).Log("msg", "start compaction") + compacted, err := compact(stats, desiredChunkSizeKB) + if err != nil { + level.Error(logger).Log("msg", "error while compacting", "err", err) + return + } + level.Info(logger).Log("msg", "complete compaction") + computeMetrics(&compacted) + printStats(compacted, "after compaction") +} + +func assertCorrect(stats IndexStats) error { + if len(stats.Streams) != int(stats.StreamsCount) { + level.Error(logger).Log("msg", "streams count mismatch", "streams_count", stats.StreamsCount, "actual_streams_count", len(stats.Streams)) + return errors.New("streams count mismatch") + } + actualSizeKB := uint32(0) + actualChunksCount := uint32(0) + for _, stream := range stats.Streams { + actualChunksCount += uint32(len(stream.Chunks)) + for _, chunk := range stream.Chunks { + actualSizeKB += chunk.SizeKB + } + } + if actualSizeKB != stats.SizeKB { + level.Error(logger).Log("msg", "sizeKB mismatch", "size_kb", stats.SizeKB, "actual_size_kb", actualSizeKB) + return errors.New("sizeKB mismatch") + } + + if actualChunksCount != stats.ChunksCount { + level.Error(logger).Log("msg", "chunks count mismatch", "chunks_count", stats.ChunksCount, "actual_chunks_count", actualChunksCount) + return errors.New("chunks count mismatch") + } + return nil +} + +//func truncate(stats IndexStats, content []byte, err error) { +// updated := stats.Streams[0:20] +// sizeKb := uint32(0) +// chunksCount := uint32(0) +// for _, streamStats := range updated { +// chunksCount += streamStats.ChunksCount +// for _, chunk := range streamStats.Chunks { +// sizeKb += chunk.SizeKB +// } +// } +// truncated := IndexStats{ +// Streams: updated, +// StreamsCount: 20, +// SizeKB: sizeKb, +// ChunksCount: chunksCount, +// } +// content, err = json.MarshalIndent(truncated, " ", " ") +// if err != nil { +// panic(err) +// return +// } +// err = os.WriteFile("path to truncated file", content, 0644) +// if err != nil { +// panic(err) +// return +// } +//} + +func printStats(stats IndexStats, msg string) { + level.Info(logger).Log( + "msg", msg, + "streams_count", stats.StreamsCount, + "chunks_count", stats.ChunksCount, + "size_kb", 
stats.SizeKB,
+		"chunk_avg_size_kb", stats.ChunkAvgSizeKB,
+		"chunk_med_size_kb", stats.ChunkMedSizeKB,
+		"min_chunk_size_kb", stats.MinChunkSizeKB,
+		"max_chunk_size_kb", stats.MaxChunkSizeKB,
+	)
+}
+
+func compact(stats IndexStats, desiredChunkSizeKB uint32) (IndexStats, error) {
+	compactedStreamStats := make([]StreamStats, len(stats.Streams))
+	err := concurrency.ForEachJob(context.Background(), len(stats.Streams), 100, func(ctx context.Context, idx int) error {
+		stream := stats.Streams[idx]
+		compactedStream := StreamStats{Stream: stream.Stream, Chunks: make([]ChunkStats, 0, len(stream.Chunks))}
+		// time span and accumulated size of the chunks pending compaction
+		pendingChunksCount := 0
+		var pendingStart, pendingEnd time.Time
+		currentCompactedChunkSizeKb := uint32(0)
+		for i := 0; i < len(stream.Chunks); i++ {
+			currentChunk := stream.Chunks[i]
+			// flush the pending chunks together with the current one
+			// if we reach the desired size,
+			if currentChunk.SizeKB+currentCompactedChunkSizeKb > desiredChunkSizeKB ||
+				// or if it's the last chunk,
+				i == len(stream.Chunks)-1 ||
+				// or if the next chunk alone already exceeds the desired size
+				// (only evaluated when a next chunk exists, thanks to the short-circuit above)
+				stream.Chunks[i+1].SizeKB > desiredChunkSizeKB {
+				compactedChunkStart := currentChunk.Start
+				compactedChunkEnd := currentChunk.End
+				// widen the compacted chunk to cover every pending chunk,
+				// not only the most recently accumulated one
+				if pendingChunksCount > 0 {
+					if pendingStart.Before(compactedChunkStart) {
+						compactedChunkStart = pendingStart
+					}
+					if pendingEnd.After(compactedChunkEnd) {
+						compactedChunkEnd = pendingEnd
+					}
+				}
+				compactedStream.Chunks = append(compactedStream.Chunks, ChunkStats{
+					Start:  compactedChunkStart,
+					End:    compactedChunkEnd,
+					SizeKB: currentChunk.SizeKB + currentCompactedChunkSizeKb,
+				})
+				compactedStream.ChunksCount++
+				compactedStream.SizeKB += currentChunk.SizeKB + currentCompactedChunkSizeKb
+				// reset the pending state
+				pendingChunksCount = 0
+				currentCompactedChunkSizeKb = 0
+			} else {
+				currentCompactedChunkSizeKb += currentChunk.SizeKB
+				if pendingChunksCount == 0 || currentChunk.Start.Before(pendingStart) {
+					pendingStart = currentChunk.Start
+				}
+				if pendingChunksCount == 0 || currentChunk.End.After(pendingEnd) {
+					pendingEnd = currentChunk.End
+				}
+				pendingChunksCount++
+			}
+		}
+		compactedStreamStats[idx] = compactedStream
+		return nil
+	})
+	if err != nil {
+		return IndexStats{}, errors.Wrap(err, "error while compacting the streams")
+	}
+	compacted := IndexStats{Streams: compactedStreamStats, StreamsCount: uint32(len(compactedStreamStats))}
+	for _, sst := range compactedStreamStats {
+		compacted.ChunksCount += sst.ChunksCount
+		compacted.SizeKB += sst.SizeKB
+	}
+	return compacted, nil
+}
+
+func computeMetrics(stats *IndexStats) {
+	if stats.ChunksCount == 0 {
+		// avoid dividing by zero and indexing into an empty slice below
+		return
+	}
+	chunksCountZeroKB := 0
+	chunksCount := int(stats.ChunksCount)
+	stats.ChunkAvgSizeKB = stats.SizeKB / stats.ChunksCount
+	sizesKB := make([]int, 0, chunksCount)
+	min := uint32(math.MaxUint32)
+	max := uint32(0)
+	for _, stream := range stats.Streams {
+		for _, chunk := range stream.Chunks {
+			if chunk.SizeKB < min {
+				min = chunk.SizeKB
+			}
+			if chunk.SizeKB > max {
+				max = chunk.SizeKB
+			}
+			if chunk.SizeKB == 0 {
+				chunksCountZeroKB++
+			}
+			sizesKB = append(sizesKB, int(chunk.SizeKB))
+		}
+	}
+	level.Info(logger).Log("msg", "chunks with ZERO size", "count", chunksCountZeroKB)
+	sort.Ints(sizesKB)
+	stats.ChunkMedSizeKB = uint32(sizesKB[len(sizesKB)/2])
+	stats.MinChunkSizeKB = min
+	stats.MaxChunkSizeKB = max
+}
+
+type ChunkStats struct {
+	Start  time.Time
+	End    time.Time
+	SizeKB uint32
+}
+
+type StreamStats struct {
+	Stream      string
+	Chunks      []ChunkStats
+	ChunksCount uint32
+	SizeKB      uint32
+}
+
+type IndexStats struct {
+	Streams      []StreamStats
+	StreamsCount uint32
+	ChunksCount  uint32
+	SizeKB       uint32
+	// computed
+	ChunkAvgSizeKB uint32
+	ChunkMedSizeKB uint32
+	MinChunkSizeKB uint32
+	MaxChunkSizeKB
uint32 +} diff --git a/cmd/streams-inspect/streams-inspector.go b/cmd/streams-inspect/streams-inspector.go new file mode 100644 index 0000000000000..e24888f9135b2 --- /dev/null +++ b/cmd/streams-inspect/streams-inspector.go @@ -0,0 +1,132 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" + stream_inspector "github.com/grafana/loki/pkg/stream-inspector" + "math" + "os" + "strings" + "time" + + "github.com/grafana/loki/pkg/storage/stores/tsdb/index" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/storage/stores/tsdb" +) + +const ( + daySinceUnixEpoc = 19610 + indexLocation = "" + resultFilePath = "" +) + +var logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + +func main() { + ctx := context.Background() + + idx, _, err := tsdb.NewTSDBIndexFromFile(indexLocation, tsdb.IndexOpts{}) + + if err != nil { + level.Error(logger).Log("msg", "can not read index file", "err", err) + return + } + //first 6 hours + start := time.Unix(0, 0).UTC().AddDate(0, 0, daySinceUnixEpoc) + end := start.AddDate(0, 0, 1).Add(-18 * time.Hour) + var streamMatchers []*labels.Matcher + //streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")} + limit := int32(5000) + + leftTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers) + if err != nil { + fmt.Println("err", err) + return + } + //last 6 hours + start = start.Add(18 * time.Hour) + end = start.Add(6 * time.Hour) + rightTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers) + if err != nil { + fmt.Println("err", err) + return + } + + level.Info(logger).Log("msg", "starting building flamegraph model") + converter := stream_inspector.FlamegraphConverter{Left: leftTrees, Right: rightTrees, Mode: stream_inspector.Diff} + flameBearer := converter.CovertTrees() + level.Info(logger).Log("msg", "completed building flamegraph model") + + level.Info(logger).Log("msg", "starting writing json") + content, err := json.Marshal(flameBearer) + if err != nil { + panic(err) + return + } + _, err = os.Stat(resultFilePath) + if err == nil { + level.Info(logger).Log("msg", "results file already exists. deleting previous one.") + err := os.Remove(resultFilePath) + if err != nil { + panic(err) + return + } + } + err = os.WriteFile(resultFilePath, content, 0644) + if err != nil { + panic(err) + return + } +} + +func buildTrees(start time.Time, end time.Time, limit int32, idx tsdb.Index, ctx context.Context, streamMatchers []*labels.Matcher) ([]*stream_inspector.Tree, error) { + fromUnix := model.TimeFromUnix(start.Unix()) + toUnix := model.TimeFromUnix(end.Unix()) + level.Info(logger).Log("msg", "starting extracting volumes") + level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) + accumulator := seriesvolume.NewAccumulator(limit, math.MaxInt32) + err := idx.Volume(ctx, "", fromUnix, toUnix, accumulator, nil, func(meta index.ChunkMeta) bool { + return true + }, nil, seriesvolume.Series, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))...) 
+ if err != nil { + level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) + return nil, fmt.Errorf("error while fetching all the streams: %w", err) + } + volumes := accumulator.Volumes().GetVolumes() + streams := make([]stream_inspector.StreamWithVolume, 0, len(volumes)) + for _, volume := range volumes { + labelsString := strings.Trim(volume.Name, "{}") + pairs := strings.Split(labelsString, ", ") + lbls := make([]string, 0, len(pairs)*2) + for _, pair := range pairs { + lblVal := strings.Split(pair, "=") + lbls = append(lbls, lblVal[0]) + lbls = append(lbls, strings.Trim(lblVal[1], "\"")) + } + streams = append(streams, stream_inspector.StreamWithVolume{ + Labels: labels.FromStrings(lbls...), + Volume: float64(volume.Volume), + }) + } + + level.Info(logger).Log("msg", "completed extracting volumes") + + level.Info(logger).Log("msg", "starting building trees") + inspector := stream_inspector.Inspector{} + trees, err := inspector.BuildTrees(streams, streamMatchers) + if err != nil { + level.Error(logger).Log("msg", "error while building trees", "err", err) + return nil, fmt.Errorf("error while building trees: %w", err) + } + level.Info(logger).Log("msg", "completed building trees") + return trees, nil +} diff --git a/pkg/stream-inspector/flamegraph-converter.go b/pkg/stream-inspector/flamegraph-converter.go new file mode 100644 index 0000000000000..a1b272bc73f38 --- /dev/null +++ b/pkg/stream-inspector/flamegraph-converter.go @@ -0,0 +1,320 @@ +package stream_inspector + +import "golang.org/x/exp/slices" + +const magicNumber = 0 + +type FlamebearerMode uint8 + +const ( + Diff = iota + Single +) + +type FlamegraphConverter struct { + Left []*Tree + Right []*Tree + Mode FlamebearerMode +} + +func (f *FlamegraphConverter) CovertTrees() FlameGraph { + dictionary := make(map[string]int) + var levels []FlamegraphLevel + //limit := 50000 + //added := 0 + iterator := NewTreesLevelsIterator(f.Left, f.Right, f.Mode) + levelIndex := -1 + for iterator.HasNextLevel() /* && added <= limit*/ { + levelIndex++ + levelIterator := iterator.NextLevelIterator() + var level FlamegraphLevel + for levelIterator.HasNext() { + block := levelIterator.Next() + var blockName string + if block.leftNode != nil { + blockName = block.leftNode.Name + } else { + blockName = block.rightNode.Name + } + + var leftNodeWeight float64 + if block.leftNode != nil { + leftNodeWeight = block.leftNode.Weight + } + var rightNodeWeight float64 + if block.rightNode != nil { + rightNodeWeight = block.rightNode.Weight + } + currentBlockOffset := float64(0) + // we need to find the offset for the first child block to place it exactly under the parent + if levelIndex > 0 && block.childBlockIndex == 0 { + previousLevel := levels[levelIndex-1] + parentsIndexInPreviousLevel := block.parentBlock.indexInLevel + parentsGlobalOffset := previousLevel.blocksGlobalOffsets[parentsIndexInPreviousLevel] + currentBlockOffset = parentsGlobalOffset - level.curentWidth + } + + index, exists := dictionary[blockName] + if !exists { + index = len(dictionary) + dictionary[blockName] = index + } + level.blocksGlobalOffsets = append(level.blocksGlobalOffsets, currentBlockOffset+level.curentWidth) + level.curentWidth += currentBlockOffset + leftNodeWeight + rightNodeWeight + + level.blocks = append(level.blocks, []float64{currentBlockOffset, leftNodeWeight, magicNumber}...) + if f.Mode == Diff { + level.blocks = append(level.blocks, []float64{0, rightNodeWeight, magicNumber}...) 
+ } + level.blocks = append(level.blocks, float64(index)) + //added++ + } + levels = append(levels, level) + } + + firstLevel := levels[0].blocks + totalLeft := float64(0) + totalRight := float64(0) + + blockParamsLength := 4 + if f.Mode == Diff { + blockParamsLength = 7 + } + for i := 1; i < len(firstLevel); i += blockParamsLength { + // leftWidth + totalLeft += firstLevel[i] + if f.Mode == Diff { + //rightWidth + totalRight += firstLevel[i+3] + } + } + names := make([]string, len(dictionary)) + for name, index := range dictionary { + names[index] = name + } + levelsBlocks := make([][]float64, 0, len(levels)) + for _, level := range levels { + levelsBlocks = append(levelsBlocks, level.blocks) + } + var leftTicks *float64 + var rightTicks *float64 + format := "single" + if f.Mode == Diff { + format = "double" + leftTicks = &totalLeft + rightTicks = &totalRight + } + + return FlameGraph{ + Version: 1, + FlameBearer: FlameBearer{ + Units: "bytes", + NumTicks: totalLeft + totalRight, + MaxSelf: totalLeft + totalRight, + Names: names, + Levels: levelsBlocks, + }, + Metadata: Metadata{ + Format: format, + SpyName: "dotnetspy", + SampleRate: 100, + Units: "bytes", + Name: "Logs Volumes", + }, + LeftTicks: leftTicks, + RightTicks: rightTicks, + } + +} + +type Metadata struct { + Format string `json:"format,omitempty"` + SpyName string `json:"spyName,omitempty"` + SampleRate int `json:"sampleRate,omitempty"` + Units string `json:"units,omitempty"` + Name string `json:"name,omitempty"` +} + +type FlameGraph struct { + Version int `json:"version,omitempty"` + FlameBearer FlameBearer `json:"flamebearer"` + Metadata Metadata `json:"metadata"` + LeftTicks *float64 `json:"leftTicks,omitempty"` + RightTicks *float64 `json:"rightTicks,omitempty"` +} + +type FlamegraphLevel struct { + curentWidth float64 + blocks []float64 + blocksGlobalOffsets []float64 +} + +type TreesLevelsIterator struct { + left []*Tree + right []*Tree + currentLevel *LevelBlocksIterator + mode FlamebearerMode +} + +func NewTreesLevelsIterator(left []*Tree, right []*Tree, mode FlamebearerMode) *TreesLevelsIterator { + return &TreesLevelsIterator{left: left, right: right, mode: mode} +} + +func (i *TreesLevelsIterator) HasNextLevel() bool { + if i.currentLevel == nil { + return true + } + + //reset before and after + i.currentLevel.Reset() + defer i.currentLevel.Reset() + + for i.currentLevel.HasNext() { + block := i.currentLevel.Next() + // if at least one block at current level has children + if block.leftNode != nil && len(block.leftNode.Children) > 0 || block.rightNode != nil && len(block.rightNode.Children) > 0 { + return true + } + } + return false +} + +func (i *TreesLevelsIterator) mergeNodes(parent *LevelBlock, left []*Node, right []*Node) []*LevelBlock { + leftByNameMap := i.mapByNodeName(left) + rightByNameMap := i.mapByNodeName(right) + blocks := make([]*LevelBlock, 0, len(leftByNameMap)+len(rightByNameMap)) + for _, leftNode := range leftByNameMap { + rightNode := rightByNameMap[leftNode.Name] + blocks = append(blocks, &LevelBlock{ + parentBlock: parent, + leftNode: leftNode, + rightNode: rightNode, + }) + } + for _, rightNode := range rightByNameMap { + //skip nodes that exists in leftNodes to add only diff from the right nodes + _, exists := leftByNameMap[rightNode.Name] + if exists { + continue + } + + blocks = append(blocks, &LevelBlock{ + parentBlock: parent, + rightNode: rightNode, + }) + } + slices.SortStableFunc(blocks, func(a, b *LevelBlock) bool { + return maxWeight(a.leftNode, a.rightNode) > 
maxWeight(b.leftNode, b.rightNode) + }) + return blocks +} + +func maxWeight(left *Node, right *Node) float64 { + if left == nil { + return right.Weight + } + if right == nil { + return left.Weight + } + if left.Weight > right.Weight { + return left.Weight + } + return right.Weight +} + +func (i *TreesLevelsIterator) NextLevelIterator() *LevelBlocksIterator { + if i.currentLevel == nil { + leftNodes := make([]*Node, 0, len(i.left)) + for _, tree := range i.left { + leftNodes = append(leftNodes, tree.Root) + } + rightNodes := make([]*Node, 0, len(i.right)) + for _, tree := range i.right { + rightNodes = append(rightNodes, tree.Root) + } + levelBlocks := i.mergeNodes(nil, leftNodes, rightNodes) + + for index, block := range levelBlocks { + if index > 0 { + block.leftNeighbour = levelBlocks[index-1] + } + block.childBlockIndex = index + block.indexInLevel = index + } + i.currentLevel = NewLevelBlocksIterator(levelBlocks) + return i.currentLevel + } + + var nextLevelBlocks []*LevelBlock + for i.currentLevel.HasNext() { + currentBlock := i.currentLevel.Next() + var left []*Node + if currentBlock.leftNode != nil { + left = currentBlock.leftNode.Children + } + var right []*Node + if currentBlock.rightNode != nil { + right = currentBlock.rightNode.Children + } + levelBlocks := i.mergeNodes(currentBlock, left, right) + for index, block := range levelBlocks { + block.childBlockIndex = index + block.indexInLevel = len(nextLevelBlocks) + if len(nextLevelBlocks) > 0 { + block.leftNeighbour = nextLevelBlocks[len(nextLevelBlocks)-1] + } + nextLevelBlocks = append(nextLevelBlocks, block) + } + } + i.currentLevel = NewLevelBlocksIterator(nextLevelBlocks) + return i.currentLevel +} + +func (i *TreesLevelsIterator) mapByNodeName(nodes []*Node) map[string]*Node { + result := make(map[string]*Node, len(nodes)) + for _, node := range nodes { + result[node.Name] = node + } + return result +} + +type LevelBlock struct { + parentBlock *LevelBlock + leftNeighbour *LevelBlock + childBlockIndex int + leftNode *Node + rightNode *Node + indexInLevel int +} + +// iterates over Nodes at the level +type LevelBlocksIterator struct { + blocks []*LevelBlock + index int +} + +func NewLevelBlocksIterator(blocks []*LevelBlock) *LevelBlocksIterator { + return &LevelBlocksIterator{blocks: blocks, index: 0} +} + +func (i *LevelBlocksIterator) HasNext() bool { + return i.index < len(i.blocks) +} + +func (i *LevelBlocksIterator) Reset() { + i.index = 0 +} + +func (i *LevelBlocksIterator) Next() *LevelBlock { + next := i.blocks[i.index] + i.index++ + return next +} + +type FlameBearer struct { + Units string `json:"units,omitempty"` + NumTicks float64 `json:"numTicks" json:"num_ticks,omitempty"` + MaxSelf float64 `json:"maxSelf" json:"max_self,omitempty"` + Names []string `json:"names,omitempty" json:"names,omitempty" json:"names,omitempty"` + Levels [][]float64 `json:"levels,omitempty" json:"levels,omitempty" json:"levels,omitempty"` +} diff --git a/pkg/stream-inspector/flamegraph-converter_test.go b/pkg/stream-inspector/flamegraph-converter_test.go new file mode 100644 index 0000000000000..eb71f7b25b630 --- /dev/null +++ b/pkg/stream-inspector/flamegraph-converter_test.go @@ -0,0 +1,167 @@ +package stream_inspector + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +const magic = 0 + +func TestFlamegraphConverter_covertTrees(t *testing.T) { + tests := []struct { + name string + left []*Tree + right []*Tree + mode FlamebearerMode + want FlameBearer + }{ + { + name: "expected flame graph to be built with offset for 
the second level", + mode: Single, + left: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 100, Children: []*Node{ + {Name: "second_level-a", Weight: 50, Children: []*Node{ + {Name: "third_level_a", Weight: 20, Children: []*Node{ + {Name: "fourth_level_a_0", Weight: 10}, + {Name: "fourth_level_a_1", Weight: 5}, + }}}, + }, + }, + }, + }, + // 2nd tree + { + Root: &Node{ + Name: "top_level-b", Weight: 50, Children: []*Node{ + {Name: "second_level-b", Weight: 50, Children: []*Node{ + {Name: "third_level_b", Weight: 10, Children: []*Node{ + {Name: "fourth_level_b", Weight: 5, Children: []*Node{ + {Name: "fives_level_b", Weight: 2, Children: []*Node{ + {Name: "sixth_level_b", Weight: 1}, + }}, + }}, + }}}, + }, + }, + }, + }, + }, + want: FlameBearer{ + Units: "bytes", + NumTicks: 150, + MaxSelf: 150, + Names: []string{ + "top_level-a", "top_level-b", + "second_level-a", "second_level-b", + "third_level_a", "third_level_b", + "fourth_level_a_0", "fourth_level_a_1", "fourth_level_b", + "fives_level_b", + "sixth_level_b", + }, + Levels: [][]float64{ + // for each block: start_offset, end_offset, unknown_yet, index from Names slice + // 1st level + { /*1st block*/ 0, 100, magic, 0 /*2nd block*/, 0, 50, magic, 1}, + // 2nd level + { /*1st block*/ 0, 50, magic, 2 /*2nd block*/, 50, 50, magic, 3}, + // 3rd level + { /*1st block*/ 0, 20, magic, 4 /*2nd block*/, 80, 10, magic, 5}, + // 4th level + { /*1st block*/ 0, 10, magic, 6 /*2nd block*/, 0, 5, magic, 7 /*3rd block*/, 85, 5, magic, 8}, + // 5s level + { /*1st block*/ 100, 2, magic, 9}, + // 6th level + { /*1st block*/ 100, 1, magic, 10}, + }, + }, + }, + { + name: "expected flame graph to be built", + mode: Single, + left: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 100, Children: []*Node{ + {Name: "second_level-a", Weight: 100}, + }, + }, + }, + // 2nd tree + { + Root: &Node{ + Name: "top_level-b", Weight: 50, Children: []*Node{ + {Name: "second_level-b", Weight: 50}, + }, + }, + }, + }, + want: FlameBearer{ + Units: "bytes", + NumTicks: 150, + MaxSelf: 150, + Names: []string{"top_level-a", "top_level-b", "second_level-a", "second_level-b"}, + Levels: [][]float64{ + // for each block: start_offset, end_offset, unknown_yet, index from Names slice + // 1st level + { /*1st block*/ 0, 100, magic, 0 /*2nd block*/, 0, 50, magic, 1}, + // 2nd level + { /*1st block*/ 0, 100, magic, 2 /*2nd block*/, 0, 50, magic, 3}, + }, + }, + }, + + { + name: "expected diff flame graph to be built", + mode: Diff, + left: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 100, Children: []*Node{ + {Name: "second_level-a_1", Weight: 100}, + }, + }, + }, + }, + right: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 200, Children: []*Node{ + {Name: "second_level-a_1", Weight: 50}, + {Name: "second_level-a_2", Weight: 100}, + }, + }, + }, + }, + want: FlameBearer{ + Units: "bytes", + NumTicks: 300, + MaxSelf: 300, + Names: []string{"top_level-a", "second_level-a_1", "second_level-a_2"}, + Levels: [][]float64{ + // for each block: offset, value, self, offset_right, value_right, self_right, label, + // 1st level + { /*1st block*/ /*left*/ 0, 100, magic /*right*/, 0, 200, magic, 0}, + // 2nd level + { /*1st block*/ /*left*/ 0, 100, magic /*right*/, 0, 50, magic, 1 /*2nd block*/ /*left*/, 0, 0, magic /*right*/, 0, 100, magic, 2}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := &FlamegraphConverter{ + Left: tt.left, + Right: tt.right, 
+ Mode: tt.mode, + } + result := f.CovertTrees() + require.Equal(t, tt.want, result.FlameBearer) + }) + } +} diff --git a/pkg/stream-inspector/inspector.go b/pkg/stream-inspector/inspector.go new file mode 100644 index 0000000000000..df73d37fa5e8c --- /dev/null +++ b/pkg/stream-inspector/inspector.go @@ -0,0 +1,154 @@ +package stream_inspector + +import ( + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/exp/slices" + "math" + "sort" +) + +type Inspector struct { +} + +type StreamWithVolume struct { + Labels labels.Labels + Volume float64 +} + +func (i *Inspector) BuildTrees(streams []StreamWithVolume, matchers []*labels.Matcher) ([]*Tree, error) { + labelNamesOrder := i.sortLabelNamesByPopularity(streams, matchers) + + rootLabelNameToThreeMap := make(map[string]*Tree) + var threes []*Tree + + for _, stream := range streams { + streamLabels := make(labels.Labels, 0, len(stream.Labels)) + for _, label := range stream.Labels { + if _, skip := labelsToSkip[label.Name]; skip { + continue + } + streamLabels = append(streamLabels, label) + } + + slices.SortStableFunc(streamLabels, func(a, b labels.Label) bool { + return labelNamesOrder[a.Name] < labelNamesOrder[b.Name] + }) + rootLabelName := streamLabels[0].Name + rootThree, exists := rootLabelNameToThreeMap[rootLabelName] + if !exists { + rootThree = &Tree{Root: &Node{Name: rootLabelName, ChildNameToIndex: make(map[string]int)}} + rootLabelNameToThreeMap[rootLabelName] = rootThree + threes = append(threes, rootThree) + } + streamVolume := stream.Volume + currentNode := rootThree.Root + for i, label := range streamLabels { + var labelNameNode *Node + if i == 0 { + labelNameNode = currentNode + } else { + labelNameNode = currentNode.FindOrCreateChild(label.Name) + } + labelNameNode.Weight += streamVolume + + labelValueNode := labelNameNode.FindOrCreateChild(label.Value) + labelValueNode.Weight += streamVolume + if i < len(streamLabels)-1 { + currentNode = labelValueNode + } + } + + } + return threes, nil +} + +type Iterator[T any] struct { + values []T + position int +} + +func NewIterator[T any](values []T) *Iterator[T] { + return &Iterator[T]{values: values} +} + +func (i *Iterator[T]) HasNext() bool { + return i.position < len(i.values)-1 +} + +func (i *Iterator[T]) Next() T { + next := i.values[i.position] + i.position++ + return next +} + +func (i *Iterator[T]) Reset() { + i.position = 0 +} + +var labelsToSkip = map[string]any{ + "__stream_shard__": nil, +} + +func (i *Inspector) sortLabelNamesByPopularity(streams []StreamWithVolume, matchers []*labels.Matcher) map[string]int { + labelNameCounts := make(map[string]uint32) + uniqueLabelNames := make([]string, 0, 1000) + for _, stream := range streams { + for _, label := range stream.Labels { + if _, skip := labelsToSkip[label.Name]; skip { + continue + } + count := labelNameCounts[label.Name] + if count == 0 { + uniqueLabelNames = append(uniqueLabelNames, label.Name) + } + labelNameCounts[label.Name] = count + 1 + } + } + matchersLabels := make(map[string]int, len(matchers)) + for idx, matcher := range matchers { + matchersLabels[matcher.Name] = idx + } + sort.SliceStable(uniqueLabelNames, func(i, j int) bool { + leftLabel := uniqueLabelNames[i] + rightLabel := uniqueLabelNames[j] + leftLabelPriority := -1 + if leftLabelIndex, used := matchersLabels[leftLabel]; used { + leftLabelPriority = math.MaxInt - leftLabelIndex + } + rightLabelPriority := -1 + if rightLabelIndex, used := matchersLabels[rightLabel]; used { + rightLabelPriority = math.MaxInt - rightLabelIndex + } + 
leftCount := labelNameCounts[leftLabel] + rightCount := labelNameCounts[rightLabel] + // sort by streams count (desc) and by label name (asc) + + return leftLabelPriority > rightLabelPriority || (leftLabelPriority == rightLabelPriority && leftCount > rightCount) || leftCount == rightCount && leftLabel < rightLabel + }) + labelNameToOrderMap := make(map[string]int, len(uniqueLabelNames)) + for idx, name := range uniqueLabelNames { + labelNameToOrderMap[name] = idx + } + return labelNameToOrderMap +} + +type Tree struct { + Root *Node `json:"root"` +} + +type Node struct { + Name string `json:"name,omitempty"` + Weight float64 `json:"weight,omitempty"` + ChildNameToIndex map[string]int `json:"-"` + Children []*Node `json:"children,omitempty"` +} + +func (n *Node) FindOrCreateChild(childName string) *Node { + childIndex, exists := n.ChildNameToIndex[childName] + if !exists { + n.Children = append(n.Children, &Node{Name: childName, ChildNameToIndex: make(map[string]int)}) + childIndex = len(n.Children) - 1 + n.ChildNameToIndex[childName] = childIndex + } + return n.Children[childIndex] +} diff --git a/pkg/stream-inspector/inspector_test.go b/pkg/stream-inspector/inspector_test.go new file mode 100644 index 0000000000000..9a32947a54eac --- /dev/null +++ b/pkg/stream-inspector/inspector_test.go @@ -0,0 +1,146 @@ +package stream_inspector + +import ( + "encoding/json" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "reflect" + "testing" +) + +func Test_Inspector(t *testing.T) { + tests := map[string]struct { + streams []StreamWithVolume + expectedResultJSON string + }{ + "expected 2 threes": { + streams: []StreamWithVolume{ + makeStream("cl", "cluster-a", "ns", "loki-ops"), + makeStream("cl", "cluster-a", "ns", "loki-dev"), + makeStream("cl", "cluster-a", "ns", "loki-dev", "level", "error"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-dev"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-ops"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-prod"), + }, + expectedResultJSON: `[ + { + "root": { + "name": "cl", + "weight": 3, + "children": [ + { + "name": "cluster-a", + "weight": 3, + "children": [ + { + "name": "ns", + "weight": 3, + "children": [ + { + "name": "loki-ops", + "weight": 1 + }, + { + "name": "loki-dev", + "weight": 2, + "children": [ + { + "name": "level", + "weight": 1, + "children": [ + { + "name": "error", + "weight": 1 + } + ] + } + ] + } + ] + } + ] + } + ] + } + }, + { + "root": { + "name": "stack-cl", + "weight": 3, + "children": [ + { + "name": "cluster-b", + "weight": 3, + "children": [ + { + "name": "stack-ns", + "weight": 3, + "children": [ + { + "name": "loki-dev", + "weight": 1 + }, + { + "name": "loki-ops", + "weight": 1 + }, + { + "name": "loki-prod", + "weight": 1 + } + ] + } + ] + } + ] + } + } +] +`, + }, + } + for name, testData := range tests { + t.Run(name, func(t *testing.T) { + inspector := Inspector{} + forest, err := inspector.BuildTrees(testData.streams, nil) + require.NoError(t, err) + actualJson, err := json.Marshal(forest) + require.NoError(t, err) + require.JSONEq(t, testData.expectedResultJSON, string(actualJson)) + }) + } +} + +func Test_Inspector_sortLabelNames(t *testing.T) { + inspector := Inspector{} + result := inspector.sortLabelNamesByPopularity([]StreamWithVolume{ + makeStream("cl", "cluster-a", "ns", "loki-ops"), + makeStream("cl", "cluster-a", "ns", "loki-dev"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-dev"), + makeStream("stack-cl", "cluster-b", 
"stack-ns", "loki-ops"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-prod"), + }, nil) + require.Equal(t, map[string]int{ + "cl": 2, "ns": 3, "stack-cl": 0, "stack-ns": 1, + }, result, + "must be sorted by streams count in descending order and after this by label name in ascending order") +} + +func makeStream(labelValues ...string) StreamWithVolume { + var createdLabels labels.Labels + for i := 0; i < len(labelValues); i += 2 { + createdLabels = append(createdLabels, labels.Label{ + Name: labelValues[i], + Value: labelValues[i+1], + }) + } + return StreamWithVolume{ + Labels: createdLabels, + Volume: 1, + } +} + +// compareThree function to compare provided threes to check if they are equal +func compareThree(a, b Tree) bool { + return reflect.DeepEqual(a, b) +}