From d193fc0a6f4370f0a90381fc4029ace95635ae29 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 21 Dec 2023 15:40:00 +0200 Subject: [PATCH 1/4] implemented logic to display stream volumes as a flamegraph Signed-off-by: Vladyslav Diachenko --- cmd/streams-inspect/streams-inspector.go | 127 ++++++------ pkg/stream-inspector/flamegraph-converter.go | 181 ++++++++++++++++++ .../flamegraph-converter_test.go | 120 ++++++++++++ pkg/stream-inspector/inspector.go | 149 ++++++++++++++ pkg/stream-inspector/inspector_test.go | 143 ++++++++++++++ 5 files changed, 651 insertions(+), 69 deletions(-) create mode 100644 pkg/stream-inspector/flamegraph-converter.go create mode 100644 pkg/stream-inspector/flamegraph-converter_test.go create mode 100644 pkg/stream-inspector/inspector.go create mode 100644 pkg/stream-inspector/inspector_test.go diff --git a/cmd/streams-inspect/streams-inspector.go b/cmd/streams-inspect/streams-inspector.go index 8a65e307477ae..226e6e5bf164d 100644 --- a/cmd/streams-inspect/streams-inspector.go +++ b/cmd/streams-inspect/streams-inspector.go @@ -3,14 +3,13 @@ package main import ( "context" "encoding/json" + "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" + stream_inspector "github.com/grafana/loki/pkg/stream-inspector" + "golang.org/x/exp/slices" + "math" "os" "time" - "github.com/grafana/dskit/concurrency" - "github.com/pkg/errors" - - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" "github.com/prometheus/prometheus/model/labels" @@ -34,7 +33,7 @@ var logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) func main() { ctx := context.Background() - idx, _, err := tsdb.NewTSDBIndexFromFile(indexLocation) + idx, _, err := tsdb.NewTSDBIndexFromFile(indexLocation, tsdb.IndexOpts{}) if err != nil { level.Error(logger).Log("msg", "can not read index file", "err", err) @@ -45,88 +44,78 @@ func main() { fromUnix := model.TimeFromUnix(start.Unix()) toUnix := model.TimeFromUnix(end.Unix()) level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) - streams, err := idx.Series(ctx, "", fromUnix, toUnix, nil, nil, labels.MustNewMatcher(labels.MatchNotEqual, "job", "non-existent")) + //idx.Stats(ctx, "", 0, model.Time(math.MaxInt), ) + level.Info(logger).Log("msg", "starting extracting series") + streams, err := idx.Series(ctx, "", 0, model.Time(math.MaxInt), nil, nil, labels.MustNewMatcher(labels.MatchNotEqual, "job", "non-existent")) if err != nil { level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) return } + level.Info(logger).Log("msg", "completed extracting series") - streamsStats := make([]StreamStats, len(streams)) - indexStats := IndexStats{Streams: streamsStats} - err = concurrency.ForEachJob(ctx, len(streams), 100, func(ctx context.Context, jobIdx int) error { - s := streams[jobIdx] - currentStreamMatchers := make([]*labels.Matcher, 0, len(s.Labels)) - for _, label := range s.Labels { - currentStreamMatchers = append(currentStreamMatchers, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)) - } - acc := StreamStats{Stream: s.Labels.String()} - err = idx.Stats(ctx, "", fromUnix, toUnix, &acc, nil, func(meta index.ChunkMeta) bool { - return true - }, currentStreamMatchers...) 
- if err != nil { - level.Error(logger).Log("msg", "error while collecting stats for the stream", "stream", s.Labels.String(), "err", err) - return errors.New("error while collecting stats for the stream: " + s.Labels.String()) + level.Info(logger).Log("msg", "starting extracting volumes") + streamStringToFingerprint := make(map[string]model.Fingerprint, len(streams)) + for _, stream := range streams { + streamStringToFingerprint[stream.Labels.String()] = stream.Fingerprint + } + accumulator := seriesvolume.NewAccumulator(int32(len(streams)), len(streams)) + err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool { + return true + }, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent")) + if err != nil { + level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) + return + } + volumes := accumulator.Volumes().GetVolumes() + + streamToVolume := make(map[model.Fingerprint]float64, len(volumes)) + for _, volume := range volumes { + fingerprint, exists := streamStringToFingerprint[volume.Name] + if !exists { + level.Error(logger).Log("msg", "can not find fingerprint", "volumeName", volume.Name) + return } + streamToVolume[fingerprint] = float64(volume.Volume) + } + level.Info(logger).Log("msg", "completed extracting volumes") - streamsStats[jobIdx] = acc - return nil + slices.SortStableFunc(streams, func(a, b tsdb.Series) bool { + return streamToVolume[a.Fingerprint] > streamToVolume[b.Fingerprint] }) - indexStats.StreamsCount = uint32(len(streamsStats)) - for _, stat := range streamsStats { - indexStats.SizeKB += stat.SizeKB - indexStats.ChunksCount += stat.ChunksCount - } + + streams = streams[:5000] + level.Info(logger).Log("msg", "starting building trees") + inspector := stream_inspector.Inspector{} + trees, err := inspector.BuildTrees(streams, streamToVolume) if err != nil { - level.Error(logger).Log("msg", "error while processing streams concurrently", "err", err) + level.Error(logger).Log("msg", "error while building trees", "err", err) return } + level.Info(logger).Log("msg", "completed building trees") + + level.Info(logger).Log("msg", "starting building flamegraph model") + converter := stream_inspector.FlamegraphConverter{} + flameBearer := converter.CovertTrees(trees) + level.Info(logger).Log("msg", "completed building flamegraph model") - content, err := json.MarshalIndent(indexStats, " ", " ") + level.Info(logger).Log("msg", "starting writing json") + content, err := json.Marshal(flameBearer) if err != nil { panic(err) return } + _, err = os.Stat(resultFilePath) + if err == nil { + level.Info(logger).Log("msg", "results file already exists. 
deleting previous one.") + err := os.Remove(resultFilePath) + if err != nil { + panic(err) + return + } + } err = os.WriteFile(resultFilePath, content, 0644) if err != nil { panic(err) return } } - -type ChunkStats struct { - Start time.Time - End time.Time - SizeKB uint32 -} - -type StreamStats struct { - Stream string - Chunks []ChunkStats - ChunksCount uint32 - SizeKB uint32 -} - -type IndexStats struct { - Streams []StreamStats - StreamsCount uint32 - ChunksCount uint32 - SizeKB uint32 -} - -func (i *StreamStats) AddStream(_ model.Fingerprint) { - -} - -func (i *StreamStats) AddChunk(_ model.Fingerprint, chk index.ChunkMeta) { - i.Chunks = append(i.Chunks, ChunkStats{ - Start: time.UnixMilli(chk.MinTime).UTC(), - End: time.UnixMilli(chk.MaxTime).UTC(), - SizeKB: chk.KB, - }) - i.ChunksCount++ - i.SizeKB += chk.KB -} - -func (i StreamStats) Stats() stats.Stats { - return logproto.IndexStatsResponse{} -} diff --git a/pkg/stream-inspector/flamegraph-converter.go b/pkg/stream-inspector/flamegraph-converter.go new file mode 100644 index 0000000000000..484bee3ae2f73 --- /dev/null +++ b/pkg/stream-inspector/flamegraph-converter.go @@ -0,0 +1,181 @@ +package stream_inspector + +import "golang.org/x/exp/slices" + +const magicNumber = 0 + +type FlamegraphConverter struct { +} + +func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { + dictionary := make(map[string]int) + var levels [][]float64 + + iterator := NewTreesLevelsIterator(trees) + levelIndex := -1 + for iterator.HasNextLevel() { + levelIndex++ + levelIterator := iterator.NextLevelIterator() + var level []float64 + for levelIterator.HasNext() { + block := levelIterator.Next() + blockName := block.node.Name + blockWeight := block.node.Weight + currentBlockOffset := float64(0) + // we need to find the offset for the first child block to place it exactly under the parent + if levelIndex > 0 && block.childIndex == 0 { + previousLevel := levels[levelIndex-1] + parentsIndexInPreviousLevel := block.parentBlock.indexInLevel + + // calculate the offset of the parent from the left side(offsets+weight of all left neighbours + parents offset) + parentsOffset := previousLevel[parentsIndexInPreviousLevel*4] + parentsGlobalOffset := parentsOffset + parentsLeftNeighbour := block.parentBlock.leftNeighbour + for parentsLeftNeighbour != nil { + leftNeighbourOffset := previousLevel[parentsLeftNeighbour.indexInLevel*4] + leftNeighbourWeight := previousLevel[parentsLeftNeighbour.indexInLevel*4+1] + parentsGlobalOffset += leftNeighbourOffset + leftNeighbourWeight + parentsLeftNeighbour = parentsLeftNeighbour.leftNeighbour + } + + currentBlockOffset = parentsGlobalOffset + left := block.leftNeighbour + // iterate over all already added blocks from the left + for left != nil { + leftNeighbourOffset := level[left.indexInLevel*4] + leftNeighbourWeight := level[left.indexInLevel*4+1] + currentBlockOffset -= leftNeighbourOffset + leftNeighbourWeight + left = left.leftNeighbour + } + } + + index, exists := dictionary[blockName] + if !exists { + index = len(dictionary) + dictionary[blockName] = index + } + + level = append(level, []float64{currentBlockOffset, blockWeight, magicNumber, float64(index)}...) 
+ } + levels = append(levels, level) + } + firstLevel := levels[0] + totalWidth := float64(0) + for i := 1; i < len(firstLevel); i += 4 { + totalWidth += firstLevel[i] + } + names := make([]string, len(dictionary)) + for name, index := range dictionary { + names[index] = name + } + return FlameBearer{ + Units: "bytes", + NumTicks: totalWidth, + MaxSelf: totalWidth, + Names: names, + Levels: levels, + } +} + +type TreesLevelsIterator struct { + trees []*Tree + currentLevel *LevelBlocksIterator +} + +func NewTreesLevelsIterator(trees []*Tree) *TreesLevelsIterator { + return &TreesLevelsIterator{trees: trees} +} + +func (i *TreesLevelsIterator) HasNextLevel() bool { + if i.currentLevel == nil { + return true + } + + //reset before and after + i.currentLevel.Reset() + defer i.currentLevel.Reset() + + for i.currentLevel.HasNext() { + block := i.currentLevel.Next() + // if at least one block at current level has children + if len(block.node.Children) > 0 { + return true + } + } + return false +} + +func (i *TreesLevelsIterator) NextLevelIterator() *LevelBlocksIterator { + if i.currentLevel == nil { + levelNodes := make([]*LevelBlock, 0, len(i.trees)) + for index, tree := range i.trees { + var leftNeighbour *LevelBlock + if index > 0 { + leftNeighbour = levelNodes[index-1] + } + levelNodes = append(levelNodes, &LevelBlock{leftNeighbour: leftNeighbour, node: tree.Root, childIndex: index, indexInLevel: index}) + } + slices.SortStableFunc(levelNodes, func(a, b *LevelBlock) bool { + return a.node.Weight > b.node.Weight + }) + i.currentLevel = NewLevelBlocksIterator(levelNodes) + return i.currentLevel + } + + var nextLevelBlocks []*LevelBlock + for i.currentLevel.HasNext() { + block := i.currentLevel.Next() + slices.SortStableFunc(block.node.Children, func(a, b *Node) bool { + return a.Weight > b.Weight + }) + for index, child := range block.node.Children { + var leftNeighbour *LevelBlock + if len(nextLevelBlocks) > 0 { + leftNeighbour = nextLevelBlocks[len(nextLevelBlocks)-1] + } + nextLevelBlocks = append(nextLevelBlocks, &LevelBlock{leftNeighbour: leftNeighbour, childIndex: index, indexInLevel: len(nextLevelBlocks), node: child, parentBlock: block}) + } + } + i.currentLevel = NewLevelBlocksIterator(nextLevelBlocks) + return i.currentLevel +} + +type LevelBlock struct { + parentBlock *LevelBlock + leftNeighbour *LevelBlock + childIndex int + node *Node + indexInLevel int +} + +// iterates over Nodes at the level +type LevelBlocksIterator struct { + blocks []*LevelBlock + index int +} + +func NewLevelBlocksIterator(blocks []*LevelBlock) *LevelBlocksIterator { + return &LevelBlocksIterator{blocks: blocks, index: 0} +} + +func (i *LevelBlocksIterator) HasNext() bool { + return i.index < len(i.blocks) +} + +func (i *LevelBlocksIterator) Reset() { + i.index = 0 +} + +func (i *LevelBlocksIterator) Next() *LevelBlock { + next := i.blocks[i.index] + i.index++ + return next +} + +type FlameBearer struct { + Units string `json:"units,omitempty"` + NumTicks float64 `json:"numTicks" json:"num_ticks,omitempty"` + MaxSelf float64 `json:"maxSelf" json:"max_self,omitempty"` + Names []string `json:"names,omitempty" json:"names,omitempty" json:"names,omitempty"` + Levels [][]float64 `json:"levels,omitempty" json:"levels,omitempty" json:"levels,omitempty"` +} diff --git a/pkg/stream-inspector/flamegraph-converter_test.go b/pkg/stream-inspector/flamegraph-converter_test.go new file mode 100644 index 0000000000000..b6ae0327ee262 --- /dev/null +++ b/pkg/stream-inspector/flamegraph-converter_test.go @@ -0,0 +1,120 @@ 
+package stream_inspector + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +const magic = 0 + +func TestFlamegraphConverter_covertTrees(t *testing.T) { + tests := []struct { + name string + trees []*Tree + want FlameBearer + }{ + { + name: "expected flame graph to be built with offset for the second level", + trees: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 100, Children: []*Node{ + {Name: "second_level-a", Weight: 50, Children: []*Node{ + {Name: "third_level_a", Weight: 20, Children: []*Node{ + {Name: "fourth_level_a_0", Weight: 10}, + {Name: "fourth_level_a_1", Weight: 5}, + }}}, + }, + }, + }, + }, + // 2nd tree + { + Root: &Node{ + Name: "top_level-b", Weight: 50, Children: []*Node{ + {Name: "second_level-b", Weight: 50, Children: []*Node{ + {Name: "third_level_b", Weight: 10, Children: []*Node{ + {Name: "fourth_level_b", Weight: 5, Children: []*Node{ + {Name: "fives_level_b", Weight: 2, Children: []*Node{ + {Name: "sixth_level_b", Weight: 1}, + }}, + }}, + }}}, + }, + }, + }, + }, + }, + want: FlameBearer{ + Units: "bytes", + NumTicks: 150, + MaxSelf: 150, + Names: []string{ + "top_level-a", "top_level-b", + "second_level-a", "second_level-b", + "third_level_a", "third_level_b", + "fourth_level_a_0", "fourth_level_a_1", "fourth_level_b", + "fives_level_b", + "sixth_level_b", + }, + Levels: [][]float64{ + // for each block: start_offset, end_offset, unknown_yet, index from Names slice + // 1st level + { /*1st block*/ 0, 100, magic, 0 /*2nd block*/, 0, 50, magic, 1}, + // 2nd level + { /*1st block*/ 0, 50, magic, 2 /*2nd block*/, 50, 50, magic, 3}, + // 3rd level + { /*1st block*/ 0, 20, magic, 4 /*2nd block*/, 80, 10, magic, 5}, + // 4th level + { /*1st block*/ 0, 10, magic, 6 /*2nd block*/, 0, 5, magic, 7 /*3rd block*/, 85, 5, magic, 8}, + // 5s level + { /*1st block*/ 100, 2, magic, 9}, + // 6th level + { /*1st block*/ 100, 1, magic, 10}, + }, + }, + }, + { + name: "expected flame graph to be built", + trees: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 100, Children: []*Node{ + {Name: "second_level-a", Weight: 100}, + }, + }, + }, + // 2nd tree + { + Root: &Node{ + Name: "top_level-b", Weight: 50, Children: []*Node{ + {Name: "second_level-b", Weight: 50}, + }, + }, + }, + }, + want: FlameBearer{ + Units: "bytes", + NumTicks: 150, + MaxSelf: 150, + Names: []string{"top_level-a", "top_level-b", "second_level-a", "second_level-b"}, + Levels: [][]float64{ + // for each block: start_offset, end_offset, unknown_yet, index from Names slice + // 1st level + { /*1st block*/ 0, 100, magic, 0 /*2nd block*/, 0, 50, magic, 1}, + // 2nd level + { /*1st block*/ 0, 100, magic, 2 /*2nd block*/, 0, 50, magic, 3}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := &FlamegraphConverter{} + result := f.CovertTrees(tt.trees) + require.Equal(t, tt.want, result) + }) + } +} diff --git a/pkg/stream-inspector/inspector.go b/pkg/stream-inspector/inspector.go new file mode 100644 index 0000000000000..042b4b912e237 --- /dev/null +++ b/pkg/stream-inspector/inspector.go @@ -0,0 +1,149 @@ +package stream_inspector + +import ( + "github.com/grafana/loki/pkg/storage/stores/tsdb" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/exp/slices" + "sort" +) + +type Inspector struct { +} + +func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Fingerprint]float64) ([]*Tree, error) { + labelNamesOrder := 
i.sortLabelNamesByPopularity(streams) + + rootLabelNameToThreeMap := make(map[string]*Tree) + var threes []*Tree + + for _, stream := range streams { + streamLabels := make(labels.Labels, 0, len(stream.Labels)) + for _, label := range stream.Labels { + if _, skip := labelsToSkip[label.Name]; skip { + continue + } + streamLabels = append(streamLabels, label) + } + + slices.SortStableFunc(streamLabels, func(a, b labels.Label) bool { + return labelNamesOrder[a.Name] < labelNamesOrder[b.Name] + }) + rootLabelName := streamLabels[0].Name + rootThree, exists := rootLabelNameToThreeMap[rootLabelName] + if !exists { + rootThree = &Tree{Root: &Node{Name: rootLabelName, ChildNameToIndex: make(map[string]int)}} + rootLabelNameToThreeMap[rootLabelName] = rootThree + threes = append(threes, rootThree) + } + streamVolume, err := i.getStreamVolume(stream, volumesMap) + if err != nil { + return nil, errors.Wrap(err, "can not build tree due to error") + } + currentNode := rootThree.Root + for i, label := range streamLabels { + var labelNameNode *Node + if i == 0 { + labelNameNode = currentNode + } else { + labelNameNode = currentNode.FindOrCreateChild(label.Name) + } + labelNameNode.Weight += streamVolume + + labelValueNode := labelNameNode.FindOrCreateChild(label.Value) + labelValueNode.Weight += streamVolume + if i < len(streamLabels)-1 { + currentNode = labelValueNode + } + } + + } + return threes, nil +} + +func (i *Inspector) getStreamVolume(stream tsdb.Series, volumesMap map[model.Fingerprint]float64) (float64, error) { + streamVolume, exists := volumesMap[stream.Fingerprint] + if !exists { + return 0, errors.New("stream volume not found") + } + return streamVolume, nil +} + +type Iterator[T any] struct { + values []T + position int +} + +func NewIterator[T any](values []T) *Iterator[T] { + return &Iterator[T]{values: values} +} + +func (i *Iterator[T]) HasNext() bool { + return i.position < len(i.values)-1 +} + +func (i *Iterator[T]) Next() T { + next := i.values[i.position] + i.position++ + return next +} + +func (i *Iterator[T]) Reset() { + i.position = 0 +} + +var labelsToSkip = map[string]any{ + "__stream_shard__": nil, +} + +func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series) map[string]int { + labelNameCounts := make(map[string]uint32) + uniqueLabelNames := make([]string, 0, 1000) + for _, stream := range streams { + for _, label := range stream.Labels { + if _, skip := labelsToSkip[label.Name]; skip { + continue + } + count := labelNameCounts[label.Name] + if count == 0 { + uniqueLabelNames = append(uniqueLabelNames, label.Name) + } + labelNameCounts[label.Name] = count + 1 + } + } + sort.SliceStable(uniqueLabelNames, func(i, j int) bool { + leftLabel := uniqueLabelNames[i] + rightLabel := uniqueLabelNames[j] + leftCount := labelNameCounts[leftLabel] + rightCount := labelNameCounts[rightLabel] + // sort by streams count (desc) and by label name (asc) + return leftCount > rightCount || leftCount == rightCount && leftLabel < rightLabel + }) + labelNameToOrderMap := make(map[string]int, len(uniqueLabelNames)) + for idx, name := range uniqueLabelNames { + labelNameToOrderMap[name] = idx + } + return labelNameToOrderMap +} + +type Tree struct { + Root *Node `json:"root"` +} + +type Node struct { + Name string `json:"name,omitempty"` + Weight float64 `json:"weight,omitempty"` + ChildNameToIndex map[string]int `json:"-"` + Children []*Node `json:"children,omitempty"` +} + +func (n *Node) FindOrCreateChild(childName string) *Node { + childIndex, exists := 
n.ChildNameToIndex[childName] + if !exists { + n.Children = append(n.Children, &Node{Name: childName, ChildNameToIndex: make(map[string]int)}) + childIndex = len(n.Children) - 1 + n.ChildNameToIndex[childName] = childIndex + } + return n.Children[childIndex] +} diff --git a/pkg/stream-inspector/inspector_test.go b/pkg/stream-inspector/inspector_test.go new file mode 100644 index 0000000000000..9c3cb1c199f35 --- /dev/null +++ b/pkg/stream-inspector/inspector_test.go @@ -0,0 +1,143 @@ +package stream_inspector + +import ( + "encoding/json" + "github.com/grafana/loki/pkg/storage/stores/tsdb" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "reflect" + "testing" +) + +func Test_Inspector(t *testing.T) { + tests := map[string]struct { + streams []tsdb.Series + expectedResultJSON string + }{ + "expected 2 threes": { + streams: []tsdb.Series{ + makeStream("cl", "cluster-a", "ns", "loki-ops"), + makeStream("cl", "cluster-a", "ns", "loki-dev"), + makeStream("cl", "cluster-a", "ns", "loki-dev", "level", "error"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-dev"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-ops"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-prod"), + }, + expectedResultJSON: `[ + { + "root": { + "name": "cl", + "weight": 3, + "children": [ + { + "name": "cluster-a", + "weight": 3, + "children": [ + { + "name": "ns", + "weight": 3, + "children": [ + { + "name": "loki-ops", + "weight": 1 + }, + { + "name": "loki-dev", + "weight": 2, + "children": [ + { + "name": "level", + "weight": 1, + "children": [ + { + "name": "error", + "weight": 1 + } + ] + } + ] + } + ] + } + ] + } + ] + } + }, + { + "root": { + "name": "stack-cl", + "weight": 3, + "children": [ + { + "name": "cluster-b", + "weight": 3, + "children": [ + { + "name": "stack-ns", + "weight": 3, + "children": [ + { + "name": "loki-dev", + "weight": 1 + }, + { + "name": "loki-ops", + "weight": 1 + }, + { + "name": "loki-prod", + "weight": 1 + } + ] + } + ] + } + ] + } + } +] +`, + }, + } + for name, testData := range tests { + t.Run(name, func(t *testing.T) { + inspector := Inspector{} + forest, err := inspector.BuildTrees(testData.streams, map[model.Fingerprint]float64{model.Fingerprint(0): 1}) + require.NoError(t, err) + actualJson, err := json.Marshal(forest) + require.NoError(t, err) + require.JSONEq(t, testData.expectedResultJSON, string(actualJson)) + }) + } +} + +func Test_Inspector_sortLabelNames(t *testing.T) { + inspector := Inspector{} + result := inspector.sortLabelNamesByPopularity([]tsdb.Series{ + makeStream("cl", "cluster-a", "ns", "loki-ops"), + makeStream("cl", "cluster-a", "ns", "loki-dev"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-dev"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-ops"), + makeStream("stack-cl", "cluster-b", "stack-ns", "loki-prod"), + }) + require.Equal(t, []string{"stack-cl", "stack-ns", "cl", "ns"}, result, + "must be sorted by streams count in descending order and after this by label name in ascending order") +} + +func makeStream(labelValues ...string) tsdb.Series { + var createdLabels labels.Labels + for i := 0; i < len(labelValues); i += 2 { + createdLabels = append(createdLabels, labels.Label{ + Name: labelValues[i], + Value: labelValues[i+1], + }) + } + return tsdb.Series{Labels: createdLabels} +} + +// compareThree function to compare provided threes to check if they are equal +func compareThree(a, b Tree) bool { + return reflect.DeepEqual(a, 
b) +} From c9e95aa30e34ec3b8972f175c34eeb65aa97e532 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 28 Dec 2023 15:07:03 +0200 Subject: [PATCH 2/4] adjusted the logic Signed-off-by: Vladyslav Diachenko --- cmd/streams-inspect/streams-inspector.go | 15 +++--- pkg/stream-inspector/flamegraph-converter.go | 53 +++++++++----------- pkg/stream-inspector/inspector.go | 22 ++++++-- pkg/stream-inspector/inspector_test.go | 4 +- 4 files changed, 53 insertions(+), 41 deletions(-) diff --git a/cmd/streams-inspect/streams-inspector.go b/cmd/streams-inspect/streams-inspector.go index 226e6e5bf164d..aa12e3b57881d 100644 --- a/cmd/streams-inspect/streams-inspector.go +++ b/cmd/streams-inspect/streams-inspector.go @@ -46,7 +46,9 @@ func main() { level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) //idx.Stats(ctx, "", 0, model.Time(math.MaxInt), ) level.Info(logger).Log("msg", "starting extracting series") - streams, err := idx.Series(ctx, "", 0, model.Time(math.MaxInt), nil, nil, labels.MustNewMatcher(labels.MatchNotEqual, "job", "non-existent")) + var streamMatchers []*labels.Matcher + streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")} + streams, err := idx.Series(ctx, "", 0, model.Time(math.MaxInt), nil, nil, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "anything", "not-existent"))...) if err != nil { level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) return @@ -58,7 +60,7 @@ func main() { for _, stream := range streams { streamStringToFingerprint[stream.Labels.String()] = stream.Fingerprint } - accumulator := seriesvolume.NewAccumulator(int32(len(streams)), len(streams)) + accumulator := seriesvolume.NewAccumulator(math.MaxInt32, math.MaxInt32) err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool { return true }, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent")) @@ -72,8 +74,9 @@ func main() { for _, volume := range volumes { fingerprint, exists := streamStringToFingerprint[volume.Name] if !exists { - level.Error(logger).Log("msg", "can not find fingerprint", "volumeName", volume.Name) - return + continue + //level.Error(logger).Log("msg", "can not find fingerprint", "volumeName", volume.Name) + //return } streamToVolume[fingerprint] = float64(volume.Volume) } @@ -83,10 +86,10 @@ func main() { return streamToVolume[a.Fingerprint] > streamToVolume[b.Fingerprint] }) - streams = streams[:5000] + //streams = streams[:5000] level.Info(logger).Log("msg", "starting building trees") inspector := stream_inspector.Inspector{} - trees, err := inspector.BuildTrees(streams, streamToVolume) + trees, err := inspector.BuildTrees(streams, streamToVolume, streamMatchers) if err != nil { level.Error(logger).Log("msg", "error while building trees", "err", err) return diff --git a/pkg/stream-inspector/flamegraph-converter.go b/pkg/stream-inspector/flamegraph-converter.go index 484bee3ae2f73..6f9a672644588 100644 --- a/pkg/stream-inspector/flamegraph-converter.go +++ b/pkg/stream-inspector/flamegraph-converter.go @@ -9,14 +9,15 @@ type FlamegraphConverter struct { func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { dictionary := make(map[string]int) - var levels [][]float64 - + var levels []FlamegraphLevel + //limit := 50000 
+ //added := 0 iterator := NewTreesLevelsIterator(trees) levelIndex := -1 - for iterator.HasNextLevel() { + for iterator.HasNextLevel() /* && added <= limit*/ { levelIndex++ levelIterator := iterator.NextLevelIterator() - var level []float64 + var level FlamegraphLevel for levelIterator.HasNext() { block := levelIterator.Next() blockName := block.node.Name @@ -26,27 +27,8 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { if levelIndex > 0 && block.childIndex == 0 { previousLevel := levels[levelIndex-1] parentsIndexInPreviousLevel := block.parentBlock.indexInLevel - - // calculate the offset of the parent from the left side(offsets+weight of all left neighbours + parents offset) - parentsOffset := previousLevel[parentsIndexInPreviousLevel*4] - parentsGlobalOffset := parentsOffset - parentsLeftNeighbour := block.parentBlock.leftNeighbour - for parentsLeftNeighbour != nil { - leftNeighbourOffset := previousLevel[parentsLeftNeighbour.indexInLevel*4] - leftNeighbourWeight := previousLevel[parentsLeftNeighbour.indexInLevel*4+1] - parentsGlobalOffset += leftNeighbourOffset + leftNeighbourWeight - parentsLeftNeighbour = parentsLeftNeighbour.leftNeighbour - } - - currentBlockOffset = parentsGlobalOffset - left := block.leftNeighbour - // iterate over all already added blocks from the left - for left != nil { - leftNeighbourOffset := level[left.indexInLevel*4] - leftNeighbourWeight := level[left.indexInLevel*4+1] - currentBlockOffset -= leftNeighbourOffset + leftNeighbourWeight - left = left.leftNeighbour - } + parentsGlobalOffset := previousLevel.blocksGlobalOffsets[parentsIndexInPreviousLevel] + currentBlockOffset = parentsGlobalOffset - level.curentWidth } index, exists := dictionary[blockName] @@ -54,12 +36,15 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { index = len(dictionary) dictionary[blockName] = index } - - level = append(level, []float64{currentBlockOffset, blockWeight, magicNumber, float64(index)}...) + level.blocksGlobalOffsets = append(level.blocksGlobalOffsets, currentBlockOffset+level.curentWidth) + level.curentWidth += currentBlockOffset + blockWeight + level.blocks = append(level.blocks, []float64{currentBlockOffset, blockWeight, magicNumber, float64(index)}...) 
+ //added++ } levels = append(levels, level) } - firstLevel := levels[0] + + firstLevel := levels[0].blocks totalWidth := float64(0) for i := 1; i < len(firstLevel); i += 4 { totalWidth += firstLevel[i] @@ -68,15 +53,25 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { for name, index := range dictionary { names[index] = name } + levelsBlocks := make([][]float64, 0, len(levels)) + for _, level := range levels { + levelsBlocks = append(levelsBlocks, level.blocks) + } return FlameBearer{ Units: "bytes", NumTicks: totalWidth, MaxSelf: totalWidth, Names: names, - Levels: levels, + Levels: levelsBlocks, } } +type FlamegraphLevel struct { + curentWidth float64 + blocks []float64 + blocksGlobalOffsets []float64 +} + type TreesLevelsIterator struct { trees []*Tree currentLevel *LevelBlocksIterator diff --git a/pkg/stream-inspector/inspector.go b/pkg/stream-inspector/inspector.go index 042b4b912e237..f98f2f72e48ff 100644 --- a/pkg/stream-inspector/inspector.go +++ b/pkg/stream-inspector/inspector.go @@ -6,14 +6,15 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "golang.org/x/exp/slices" + "math" "sort" ) type Inspector struct { } -func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Fingerprint]float64) ([]*Tree, error) { - labelNamesOrder := i.sortLabelNamesByPopularity(streams) +func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Fingerprint]float64, matchers []*labels.Matcher) ([]*Tree, error) { + labelNamesOrder := i.sortLabelNamesByPopularity(streams, matchers) rootLabelNameToThreeMap := make(map[string]*Tree) var threes []*Tree @@ -97,7 +98,7 @@ var labelsToSkip = map[string]any{ "__stream_shard__": nil, } -func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series) map[string]int { +func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series, matchers []*labels.Matcher) map[string]int { labelNameCounts := make(map[string]uint32) uniqueLabelNames := make([]string, 0, 1000) for _, stream := range streams { @@ -112,13 +113,26 @@ func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series) map[string labelNameCounts[label.Name] = count + 1 } } + matchersLabels := make(map[string]int, len(matchers)) + for idx, matcher := range matchers { + matchersLabels[matcher.Name] = idx + } sort.SliceStable(uniqueLabelNames, func(i, j int) bool { leftLabel := uniqueLabelNames[i] rightLabel := uniqueLabelNames[j] + leftLabelPriority := -1 + if leftLabelIndex, used := matchersLabels[leftLabel]; used { + leftLabelPriority = math.MaxInt - leftLabelIndex + } + rightLabelPriority := -1 + if rightLabelIndex, used := matchersLabels[rightLabel]; used { + rightLabelPriority = math.MaxInt - rightLabelIndex + } leftCount := labelNameCounts[leftLabel] rightCount := labelNameCounts[rightLabel] // sort by streams count (desc) and by label name (asc) - return leftCount > rightCount || leftCount == rightCount && leftLabel < rightLabel + + return leftLabelPriority > rightLabelPriority || (leftLabelPriority == rightLabelPriority && leftCount > rightCount) || leftCount == rightCount && leftLabel < rightLabel }) labelNameToOrderMap := make(map[string]int, len(uniqueLabelNames)) for idx, name := range uniqueLabelNames { diff --git a/pkg/stream-inspector/inspector_test.go b/pkg/stream-inspector/inspector_test.go index 9c3cb1c199f35..a33af82e07266 100644 --- a/pkg/stream-inspector/inspector_test.go +++ b/pkg/stream-inspector/inspector_test.go @@ -104,7 +104,7 @@ func Test_Inspector(t 
*testing.T) { for name, testData := range tests { t.Run(name, func(t *testing.T) { inspector := Inspector{} - forest, err := inspector.BuildTrees(testData.streams, map[model.Fingerprint]float64{model.Fingerprint(0): 1}) + forest, err := inspector.BuildTrees(testData.streams, map[model.Fingerprint]float64{model.Fingerprint(0): 1}, nil) require.NoError(t, err) actualJson, err := json.Marshal(forest) require.NoError(t, err) @@ -121,7 +121,7 @@ func Test_Inspector_sortLabelNames(t *testing.T) { makeStream("stack-cl", "cluster-b", "stack-ns", "loki-dev"), makeStream("stack-cl", "cluster-b", "stack-ns", "loki-ops"), makeStream("stack-cl", "cluster-b", "stack-ns", "loki-prod"), - }) + }, nil) require.Equal(t, []string{"stack-cl", "stack-ns", "cl", "ns"}, result, "must be sorted by streams count in descending order and after this by label name in ascending order") } From 1b25e422e82e1347c867f2de2ae05fdae25c748c Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Fri, 5 Jan 2024 18:32:22 +0200 Subject: [PATCH 3/4] removed not need call to series endpoint Signed-off-by: Vladyslav Diachenko --- cmd/streams-inspect/streams-inspector.go | 49 +++++++++--------------- pkg/stream-inspector/inspector.go | 25 ++++-------- 2 files changed, 27 insertions(+), 47 deletions(-) diff --git a/cmd/streams-inspect/streams-inspector.go b/cmd/streams-inspect/streams-inspector.go index aa12e3b57881d..39bac2d306731 100644 --- a/cmd/streams-inspect/streams-inspector.go +++ b/cmd/streams-inspect/streams-inspector.go @@ -5,9 +5,9 @@ import ( "encoding/json" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" stream_inspector "github.com/grafana/loki/pkg/stream-inspector" - "golang.org/x/exp/slices" "math" "os" + "strings" "time" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" @@ -45,51 +45,40 @@ func main() { toUnix := model.TimeFromUnix(end.Unix()) level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) //idx.Stats(ctx, "", 0, model.Time(math.MaxInt), ) - level.Info(logger).Log("msg", "starting extracting series") + level.Info(logger).Log("msg", "starting extracting volumes") var streamMatchers []*labels.Matcher streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")} - streams, err := idx.Series(ctx, "", 0, model.Time(math.MaxInt), nil, nil, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "anything", "not-existent"))...) - if err != nil { - level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) - return - } - level.Info(logger).Log("msg", "completed extracting series") - - level.Info(logger).Log("msg", "starting extracting volumes") - streamStringToFingerprint := make(map[string]model.Fingerprint, len(streams)) - for _, stream := range streams { - streamStringToFingerprint[stream.Labels.String()] = stream.Fingerprint - } - accumulator := seriesvolume.NewAccumulator(math.MaxInt32, math.MaxInt32) + limit := int32(math.MaxInt32) + accumulator := seriesvolume.NewAccumulator(limit, math.MaxInt32) err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool { return true - }, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent")) + }, nil, seriesvolume.Series, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))...) 
if err != nil { level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) return } volumes := accumulator.Volumes().GetVolumes() - - streamToVolume := make(map[model.Fingerprint]float64, len(volumes)) + streams := make([]stream_inspector.StreamWithVolume, 0, len(volumes)) for _, volume := range volumes { - fingerprint, exists := streamStringToFingerprint[volume.Name] - if !exists { - continue - //level.Error(logger).Log("msg", "can not find fingerprint", "volumeName", volume.Name) - //return + labelsString := strings.Trim(volume.Name, "{}") + pairs := strings.Split(labelsString, ", ") + lbls := make([]string, 0, len(pairs)*2) + for _, pair := range pairs { + lblVal := strings.Split(pair, "=") + lbls = append(lbls, lblVal[0]) + lbls = append(lbls, strings.Trim(lblVal[1], "\"")) } - streamToVolume[fingerprint] = float64(volume.Volume) + streams = append(streams, stream_inspector.StreamWithVolume{ + Labels: labels.FromStrings(lbls...), + Volume: float64(volume.Volume), + }) } - level.Info(logger).Log("msg", "completed extracting volumes") - slices.SortStableFunc(streams, func(a, b tsdb.Series) bool { - return streamToVolume[a.Fingerprint] > streamToVolume[b.Fingerprint] - }) + level.Info(logger).Log("msg", "completed extracting volumes") - //streams = streams[:5000] level.Info(logger).Log("msg", "starting building trees") inspector := stream_inspector.Inspector{} - trees, err := inspector.BuildTrees(streams, streamToVolume, streamMatchers) + trees, err := inspector.BuildTrees(streams, streamMatchers) if err != nil { level.Error(logger).Log("msg", "error while building trees", "err", err) return diff --git a/pkg/stream-inspector/inspector.go b/pkg/stream-inspector/inspector.go index f98f2f72e48ff..df73d37fa5e8c 100644 --- a/pkg/stream-inspector/inspector.go +++ b/pkg/stream-inspector/inspector.go @@ -1,9 +1,6 @@ package stream_inspector import ( - "github.com/grafana/loki/pkg/storage/stores/tsdb" - "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "golang.org/x/exp/slices" "math" @@ -13,7 +10,12 @@ import ( type Inspector struct { } -func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Fingerprint]float64, matchers []*labels.Matcher) ([]*Tree, error) { +type StreamWithVolume struct { + Labels labels.Labels + Volume float64 +} + +func (i *Inspector) BuildTrees(streams []StreamWithVolume, matchers []*labels.Matcher) ([]*Tree, error) { labelNamesOrder := i.sortLabelNamesByPopularity(streams, matchers) rootLabelNameToThreeMap := make(map[string]*Tree) @@ -38,10 +40,7 @@ func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Finge rootLabelNameToThreeMap[rootLabelName] = rootThree threes = append(threes, rootThree) } - streamVolume, err := i.getStreamVolume(stream, volumesMap) - if err != nil { - return nil, errors.Wrap(err, "can not build tree due to error") - } + streamVolume := stream.Volume currentNode := rootThree.Root for i, label := range streamLabels { var labelNameNode *Node @@ -63,14 +62,6 @@ func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Finge return threes, nil } -func (i *Inspector) getStreamVolume(stream tsdb.Series, volumesMap map[model.Fingerprint]float64) (float64, error) { - streamVolume, exists := volumesMap[stream.Fingerprint] - if !exists { - return 0, errors.New("stream volume not found") - } - return streamVolume, nil -} - type Iterator[T any] struct { values []T position int @@ -98,7 +89,7 @@ var labelsToSkip = 
map[string]any{ "__stream_shard__": nil, } -func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series, matchers []*labels.Matcher) map[string]int { +func (i *Inspector) sortLabelNamesByPopularity(streams []StreamWithVolume, matchers []*labels.Matcher) map[string]int { labelNameCounts := make(map[string]uint32) uniqueLabelNames := make([]string, 0, 1000) for _, stream := range streams { From a243409536b1278f8c026d34686ac20205395960 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Mon, 8 Jan 2024 09:59:28 +0200 Subject: [PATCH 4/4] added diff mode Signed-off-by: Vladyslav Diachenko --- cmd/streams-inspect/streams-inspector.go | 95 +++++--- pkg/stream-inspector/flamegraph-converter.go | 228 ++++++++++++++---- .../flamegraph-converter_test.go | 59 ++++- pkg/stream-inspector/inspector_test.go | 21 +- 4 files changed, 308 insertions(+), 95 deletions(-) diff --git a/cmd/streams-inspect/streams-inspector.go b/cmd/streams-inspect/streams-inspector.go index 39bac2d306731..e24888f9135b2 100644 --- a/cmd/streams-inspect/streams-inspector.go +++ b/cmd/streams-inspect/streams-inspector.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "fmt" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" stream_inspector "github.com/grafana/loki/pkg/stream-inspector" "math" @@ -23,9 +24,9 @@ import ( ) const ( - daySinceUnixEpoc = 19400 - indexLocation = "path to extracted index file .tsdb" - resultFilePath = "path to the file where the resulting json will be stored" + daySinceUnixEpoc = 19610 + indexLocation = "" + resultFilePath = "" ) var logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) @@ -39,23 +40,66 @@ func main() { level.Error(logger).Log("msg", "can not read index file", "err", err) return } + //first 6 hours start := time.Unix(0, 0).UTC().AddDate(0, 0, daySinceUnixEpoc) - end := start.AddDate(0, 0, 1).Add(-1 * time.Millisecond) + end := start.AddDate(0, 0, 1).Add(-18 * time.Hour) + var streamMatchers []*labels.Matcher + //streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")} + limit := int32(5000) + + leftTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers) + if err != nil { + fmt.Println("err", err) + return + } + //last 6 hours + start = start.Add(18 * time.Hour) + end = start.Add(6 * time.Hour) + rightTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers) + if err != nil { + fmt.Println("err", err) + return + } + + level.Info(logger).Log("msg", "starting building flamegraph model") + converter := stream_inspector.FlamegraphConverter{Left: leftTrees, Right: rightTrees, Mode: stream_inspector.Diff} + flameBearer := converter.CovertTrees() + level.Info(logger).Log("msg", "completed building flamegraph model") + + level.Info(logger).Log("msg", "starting writing json") + content, err := json.Marshal(flameBearer) + if err != nil { + panic(err) + return + } + _, err = os.Stat(resultFilePath) + if err == nil { + level.Info(logger).Log("msg", "results file already exists. 
deleting previous one.") + err := os.Remove(resultFilePath) + if err != nil { + panic(err) + return + } + } + err = os.WriteFile(resultFilePath, content, 0644) + if err != nil { + panic(err) + return + } +} + +func buildTrees(start time.Time, end time.Time, limit int32, idx tsdb.Index, ctx context.Context, streamMatchers []*labels.Matcher) ([]*stream_inspector.Tree, error) { fromUnix := model.TimeFromUnix(start.Unix()) toUnix := model.TimeFromUnix(end.Unix()) - level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) - //idx.Stats(ctx, "", 0, model.Time(math.MaxInt), ) level.Info(logger).Log("msg", "starting extracting volumes") - var streamMatchers []*labels.Matcher - streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")} - limit := int32(math.MaxInt32) + level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) accumulator := seriesvolume.NewAccumulator(limit, math.MaxInt32) - err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool { + err := idx.Volume(ctx, "", fromUnix, toUnix, accumulator, nil, func(meta index.ChunkMeta) bool { return true }, nil, seriesvolume.Series, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))...) if err != nil { level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) - return + return nil, fmt.Errorf("error while fetching all the streams: %w", err) } volumes := accumulator.Volumes().GetVolumes() streams := make([]stream_inspector.StreamWithVolume, 0, len(volumes)) @@ -81,33 +125,8 @@ func main() { trees, err := inspector.BuildTrees(streams, streamMatchers) if err != nil { level.Error(logger).Log("msg", "error while building trees", "err", err) - return + return nil, fmt.Errorf("error while building trees: %w", err) } level.Info(logger).Log("msg", "completed building trees") - - level.Info(logger).Log("msg", "starting building flamegraph model") - converter := stream_inspector.FlamegraphConverter{} - flameBearer := converter.CovertTrees(trees) - level.Info(logger).Log("msg", "completed building flamegraph model") - - level.Info(logger).Log("msg", "starting writing json") - content, err := json.Marshal(flameBearer) - if err != nil { - panic(err) - return - } - _, err = os.Stat(resultFilePath) - if err == nil { - level.Info(logger).Log("msg", "results file already exists. 
deleting previous one.") - err := os.Remove(resultFilePath) - if err != nil { - panic(err) - return - } - } - err = os.WriteFile(resultFilePath, content, 0644) - if err != nil { - panic(err) - return - } + return trees, nil } diff --git a/pkg/stream-inspector/flamegraph-converter.go b/pkg/stream-inspector/flamegraph-converter.go index 6f9a672644588..a1b272bc73f38 100644 --- a/pkg/stream-inspector/flamegraph-converter.go +++ b/pkg/stream-inspector/flamegraph-converter.go @@ -4,15 +4,25 @@ import "golang.org/x/exp/slices" const magicNumber = 0 +type FlamebearerMode uint8 + +const ( + Diff = iota + Single +) + type FlamegraphConverter struct { + Left []*Tree + Right []*Tree + Mode FlamebearerMode } -func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { +func (f *FlamegraphConverter) CovertTrees() FlameGraph { dictionary := make(map[string]int) var levels []FlamegraphLevel //limit := 50000 //added := 0 - iterator := NewTreesLevelsIterator(trees) + iterator := NewTreesLevelsIterator(f.Left, f.Right, f.Mode) levelIndex := -1 for iterator.HasNextLevel() /* && added <= limit*/ { levelIndex++ @@ -20,11 +30,24 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { var level FlamegraphLevel for levelIterator.HasNext() { block := levelIterator.Next() - blockName := block.node.Name - blockWeight := block.node.Weight + var blockName string + if block.leftNode != nil { + blockName = block.leftNode.Name + } else { + blockName = block.rightNode.Name + } + + var leftNodeWeight float64 + if block.leftNode != nil { + leftNodeWeight = block.leftNode.Weight + } + var rightNodeWeight float64 + if block.rightNode != nil { + rightNodeWeight = block.rightNode.Weight + } currentBlockOffset := float64(0) // we need to find the offset for the first child block to place it exactly under the parent - if levelIndex > 0 && block.childIndex == 0 { + if levelIndex > 0 && block.childBlockIndex == 0 { previousLevel := levels[levelIndex-1] parentsIndexInPreviousLevel := block.parentBlock.indexInLevel parentsGlobalOffset := previousLevel.blocksGlobalOffsets[parentsIndexInPreviousLevel] @@ -37,17 +60,33 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { dictionary[blockName] = index } level.blocksGlobalOffsets = append(level.blocksGlobalOffsets, currentBlockOffset+level.curentWidth) - level.curentWidth += currentBlockOffset + blockWeight - level.blocks = append(level.blocks, []float64{currentBlockOffset, blockWeight, magicNumber, float64(index)}...) + level.curentWidth += currentBlockOffset + leftNodeWeight + rightNodeWeight + + level.blocks = append(level.blocks, []float64{currentBlockOffset, leftNodeWeight, magicNumber}...) + if f.Mode == Diff { + level.blocks = append(level.blocks, []float64{0, rightNodeWeight, magicNumber}...) 
+ } + level.blocks = append(level.blocks, float64(index)) //added++ } levels = append(levels, level) } firstLevel := levels[0].blocks - totalWidth := float64(0) - for i := 1; i < len(firstLevel); i += 4 { - totalWidth += firstLevel[i] + totalLeft := float64(0) + totalRight := float64(0) + + blockParamsLength := 4 + if f.Mode == Diff { + blockParamsLength = 7 + } + for i := 1; i < len(firstLevel); i += blockParamsLength { + // leftWidth + totalLeft += firstLevel[i] + if f.Mode == Diff { + //rightWidth + totalRight += firstLevel[i+3] + } } names := make([]string, len(dictionary)) for name, index := range dictionary { @@ -57,13 +96,51 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { for _, level := range levels { levelsBlocks = append(levelsBlocks, level.blocks) } - return FlameBearer{ - Units: "bytes", - NumTicks: totalWidth, - MaxSelf: totalWidth, - Names: names, - Levels: levelsBlocks, + var leftTicks *float64 + var rightTicks *float64 + format := "single" + if f.Mode == Diff { + format = "double" + leftTicks = &totalLeft + rightTicks = &totalRight } + + return FlameGraph{ + Version: 1, + FlameBearer: FlameBearer{ + Units: "bytes", + NumTicks: totalLeft + totalRight, + MaxSelf: totalLeft + totalRight, + Names: names, + Levels: levelsBlocks, + }, + Metadata: Metadata{ + Format: format, + SpyName: "dotnetspy", + SampleRate: 100, + Units: "bytes", + Name: "Logs Volumes", + }, + LeftTicks: leftTicks, + RightTicks: rightTicks, + } + +} + +type Metadata struct { + Format string `json:"format,omitempty"` + SpyName string `json:"spyName,omitempty"` + SampleRate int `json:"sampleRate,omitempty"` + Units string `json:"units,omitempty"` + Name string `json:"name,omitempty"` +} + +type FlameGraph struct { + Version int `json:"version,omitempty"` + FlameBearer FlameBearer `json:"flamebearer"` + Metadata Metadata `json:"metadata"` + LeftTicks *float64 `json:"leftTicks,omitempty"` + RightTicks *float64 `json:"rightTicks,omitempty"` } type FlamegraphLevel struct { @@ -73,12 +150,14 @@ type FlamegraphLevel struct { } type TreesLevelsIterator struct { - trees []*Tree + left []*Tree + right []*Tree currentLevel *LevelBlocksIterator + mode FlamebearerMode } -func NewTreesLevelsIterator(trees []*Tree) *TreesLevelsIterator { - return &TreesLevelsIterator{trees: trees} +func NewTreesLevelsIterator(left []*Tree, right []*Tree, mode FlamebearerMode) *TreesLevelsIterator { + return &TreesLevelsIterator{left: left, right: right, mode: mode} } func (i *TreesLevelsIterator) HasNextLevel() bool { @@ -93,54 +172,119 @@ func (i *TreesLevelsIterator) HasNextLevel() bool { for i.currentLevel.HasNext() { block := i.currentLevel.Next() // if at least one block at current level has children - if len(block.node.Children) > 0 { + if block.leftNode != nil && len(block.leftNode.Children) > 0 || block.rightNode != nil && len(block.rightNode.Children) > 0 { return true } } return false } +func (i *TreesLevelsIterator) mergeNodes(parent *LevelBlock, left []*Node, right []*Node) []*LevelBlock { + leftByNameMap := i.mapByNodeName(left) + rightByNameMap := i.mapByNodeName(right) + blocks := make([]*LevelBlock, 0, len(leftByNameMap)+len(rightByNameMap)) + for _, leftNode := range leftByNameMap { + rightNode := rightByNameMap[leftNode.Name] + blocks = append(blocks, &LevelBlock{ + parentBlock: parent, + leftNode: leftNode, + rightNode: rightNode, + }) + } + for _, rightNode := range rightByNameMap { + //skip nodes that exists in leftNodes to add only diff from the right nodes + _, exists := 
leftByNameMap[rightNode.Name] + if exists { + continue + } + + blocks = append(blocks, &LevelBlock{ + parentBlock: parent, + rightNode: rightNode, + }) + } + slices.SortStableFunc(blocks, func(a, b *LevelBlock) bool { + return maxWeight(a.leftNode, a.rightNode) > maxWeight(b.leftNode, b.rightNode) + }) + return blocks +} + +func maxWeight(left *Node, right *Node) float64 { + if left == nil { + return right.Weight + } + if right == nil { + return left.Weight + } + if left.Weight > right.Weight { + return left.Weight + } + return right.Weight +} + func (i *TreesLevelsIterator) NextLevelIterator() *LevelBlocksIterator { if i.currentLevel == nil { - levelNodes := make([]*LevelBlock, 0, len(i.trees)) - for index, tree := range i.trees { - var leftNeighbour *LevelBlock + leftNodes := make([]*Node, 0, len(i.left)) + for _, tree := range i.left { + leftNodes = append(leftNodes, tree.Root) + } + rightNodes := make([]*Node, 0, len(i.right)) + for _, tree := range i.right { + rightNodes = append(rightNodes, tree.Root) + } + levelBlocks := i.mergeNodes(nil, leftNodes, rightNodes) + + for index, block := range levelBlocks { if index > 0 { - leftNeighbour = levelNodes[index-1] + block.leftNeighbour = levelBlocks[index-1] } - levelNodes = append(levelNodes, &LevelBlock{leftNeighbour: leftNeighbour, node: tree.Root, childIndex: index, indexInLevel: index}) + block.childBlockIndex = index + block.indexInLevel = index } - slices.SortStableFunc(levelNodes, func(a, b *LevelBlock) bool { - return a.node.Weight > b.node.Weight - }) - i.currentLevel = NewLevelBlocksIterator(levelNodes) + i.currentLevel = NewLevelBlocksIterator(levelBlocks) return i.currentLevel } var nextLevelBlocks []*LevelBlock for i.currentLevel.HasNext() { - block := i.currentLevel.Next() - slices.SortStableFunc(block.node.Children, func(a, b *Node) bool { - return a.Weight > b.Weight - }) - for index, child := range block.node.Children { - var leftNeighbour *LevelBlock + currentBlock := i.currentLevel.Next() + var left []*Node + if currentBlock.leftNode != nil { + left = currentBlock.leftNode.Children + } + var right []*Node + if currentBlock.rightNode != nil { + right = currentBlock.rightNode.Children + } + levelBlocks := i.mergeNodes(currentBlock, left, right) + for index, block := range levelBlocks { + block.childBlockIndex = index + block.indexInLevel = len(nextLevelBlocks) if len(nextLevelBlocks) > 0 { - leftNeighbour = nextLevelBlocks[len(nextLevelBlocks)-1] + block.leftNeighbour = nextLevelBlocks[len(nextLevelBlocks)-1] } - nextLevelBlocks = append(nextLevelBlocks, &LevelBlock{leftNeighbour: leftNeighbour, childIndex: index, indexInLevel: len(nextLevelBlocks), node: child, parentBlock: block}) + nextLevelBlocks = append(nextLevelBlocks, block) } } i.currentLevel = NewLevelBlocksIterator(nextLevelBlocks) return i.currentLevel } +func (i *TreesLevelsIterator) mapByNodeName(nodes []*Node) map[string]*Node { + result := make(map[string]*Node, len(nodes)) + for _, node := range nodes { + result[node.Name] = node + } + return result +} + type LevelBlock struct { - parentBlock *LevelBlock - leftNeighbour *LevelBlock - childIndex int - node *Node - indexInLevel int + parentBlock *LevelBlock + leftNeighbour *LevelBlock + childBlockIndex int + leftNode *Node + rightNode *Node + indexInLevel int } // iterates over Nodes at the level diff --git a/pkg/stream-inspector/flamegraph-converter_test.go b/pkg/stream-inspector/flamegraph-converter_test.go index b6ae0327ee262..eb71f7b25b630 100644 --- a/pkg/stream-inspector/flamegraph-converter_test.go 
+++ b/pkg/stream-inspector/flamegraph-converter_test.go @@ -10,12 +10,15 @@ const magic = 0 func TestFlamegraphConverter_covertTrees(t *testing.T) { tests := []struct { name string - trees []*Tree + left []*Tree + right []*Tree + mode FlamebearerMode want FlameBearer }{ { name: "expected flame graph to be built with offset for the second level", - trees: []*Tree{ + mode: Single, + left: []*Tree{ // 1st tree { Root: &Node{ @@ -77,7 +80,8 @@ func TestFlamegraphConverter_covertTrees(t *testing.T) { }, { name: "expected flame graph to be built", - trees: []*Tree{ + mode: Single, + left: []*Tree{ // 1st tree { Root: &Node{ @@ -109,12 +113,55 @@ func TestFlamegraphConverter_covertTrees(t *testing.T) { }, }, }, + + { + name: "expected diff flame graph to be built", + mode: Diff, + left: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 100, Children: []*Node{ + {Name: "second_level-a_1", Weight: 100}, + }, + }, + }, + }, + right: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 200, Children: []*Node{ + {Name: "second_level-a_1", Weight: 50}, + {Name: "second_level-a_2", Weight: 100}, + }, + }, + }, + }, + want: FlameBearer{ + Units: "bytes", + NumTicks: 300, + MaxSelf: 300, + Names: []string{"top_level-a", "second_level-a_1", "second_level-a_2"}, + Levels: [][]float64{ + // for each block: offset, value, self, offset_right, value_right, self_right, label, + // 1st level + { /*1st block*/ /*left*/ 0, 100, magic /*right*/, 0, 200, magic, 0}, + // 2nd level + { /*1st block*/ /*left*/ 0, 100, magic /*right*/, 0, 50, magic, 1 /*2nd block*/ /*left*/, 0, 0, magic /*right*/, 0, 100, magic, 2}, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - f := &FlamegraphConverter{} - result := f.CovertTrees(tt.trees) - require.Equal(t, tt.want, result) + f := &FlamegraphConverter{ + Left: tt.left, + Right: tt.right, + Mode: tt.mode, + } + result := f.CovertTrees() + require.Equal(t, tt.want, result.FlameBearer) }) } } diff --git a/pkg/stream-inspector/inspector_test.go b/pkg/stream-inspector/inspector_test.go index a33af82e07266..9a32947a54eac 100644 --- a/pkg/stream-inspector/inspector_test.go +++ b/pkg/stream-inspector/inspector_test.go @@ -2,8 +2,6 @@ package stream_inspector import ( "encoding/json" - "github.com/grafana/loki/pkg/storage/stores/tsdb" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "reflect" @@ -12,11 +10,11 @@ import ( func Test_Inspector(t *testing.T) { tests := map[string]struct { - streams []tsdb.Series + streams []StreamWithVolume expectedResultJSON string }{ "expected 2 threes": { - streams: []tsdb.Series{ + streams: []StreamWithVolume{ makeStream("cl", "cluster-a", "ns", "loki-ops"), makeStream("cl", "cluster-a", "ns", "loki-dev"), makeStream("cl", "cluster-a", "ns", "loki-dev", "level", "error"), @@ -104,7 +102,7 @@ func Test_Inspector(t *testing.T) { for name, testData := range tests { t.Run(name, func(t *testing.T) { inspector := Inspector{} - forest, err := inspector.BuildTrees(testData.streams, map[model.Fingerprint]float64{model.Fingerprint(0): 1}, nil) + forest, err := inspector.BuildTrees(testData.streams, nil) require.NoError(t, err) actualJson, err := json.Marshal(forest) require.NoError(t, err) @@ -115,18 +113,20 @@ func Test_Inspector(t *testing.T) { func Test_Inspector_sortLabelNames(t *testing.T) { inspector := Inspector{} - result := inspector.sortLabelNamesByPopularity([]tsdb.Series{ + result := 
inspector.sortLabelNamesByPopularity([]StreamWithVolume{ makeStream("cl", "cluster-a", "ns", "loki-ops"), makeStream("cl", "cluster-a", "ns", "loki-dev"), makeStream("stack-cl", "cluster-b", "stack-ns", "loki-dev"), makeStream("stack-cl", "cluster-b", "stack-ns", "loki-ops"), makeStream("stack-cl", "cluster-b", "stack-ns", "loki-prod"), }, nil) - require.Equal(t, []string{"stack-cl", "stack-ns", "cl", "ns"}, result, + require.Equal(t, map[string]int{ + "cl": 2, "ns": 3, "stack-cl": 0, "stack-ns": 1, + }, result, "must be sorted by streams count in descending order and after this by label name in ascending order") } -func makeStream(labelValues ...string) tsdb.Series { +func makeStream(labelValues ...string) StreamWithVolume { var createdLabels labels.Labels for i := 0; i < len(labelValues); i += 2 { createdLabels = append(createdLabels, labels.Label{ @@ -134,7 +134,10 @@ func makeStream(labelValues ...string) tsdb.Series { Value: labelValues[i+1], }) } - return tsdb.Series{Labels: createdLabels} + return StreamWithVolume{ + Labels: createdLabels, + Volume: 1, + } } // compareThree function to compare provided threes to check if they are equal
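
For reference, the single-mode flamebearer levels produced by CovertTrees encode each block as four consecutive floats: a relative offset, the block width (the node weight), a self value (the magic number 0 here), and an index into Names. The Go sketch below is illustrative only and not part of the patch series; it re-declares a minimal struct mirroring the FlameBearer shape in pkg/stream-inspector/flamegraph-converter.go and walks the levels from the "expected flame graph to be built" test case, turning the relative offsets back into absolute positions.

// Illustrative sketch, assuming only the output format shown in
// pkg/stream-inspector/flamegraph-converter_test.go: each single-mode block is
// four floats (relative offset, width, self, name index), and offsets are
// deltas from the end of the previous block on the same level.
package main

import "fmt"

// flameBearer mirrors the FlameBearer type from the patch; only the fields
// needed for this walk are declared.
type flameBearer struct {
	Units    string      `json:"units,omitempty"`
	NumTicks float64     `json:"numTicks"`
	MaxSelf  float64     `json:"maxSelf"`
	Names    []string    `json:"names,omitempty"`
	Levels   [][]float64 `json:"levels,omitempty"`
}

func main() {
	// Values taken from the "expected flame graph to be built" test case.
	fb := flameBearer{
		Units:    "bytes",
		NumTicks: 150,
		MaxSelf:  150,
		Names:    []string{"top_level-a", "top_level-b", "second_level-a", "second_level-b"},
		Levels: [][]float64{
			{0, 100, 0, 0, 0, 50, 0, 1},
			{0, 100, 0, 2, 0, 50, 0, 3},
		},
	}

	for depth, level := range fb.Levels {
		cursor := float64(0) // absolute position reached so far on this level
		for i := 0; i+3 < len(level); i += 4 {
			offset, width, nameIdx := level[i], level[i+1], int(level[i+3])
			start := cursor + offset
			fmt.Printf("depth=%d name=%s start=%.0f width=%.0f\n",
				depth, fb.Names[nameIdx], start, width)
			cursor = start + width
		}
	}
}

Running this prints top_level-a at offset 0 and top_level-b at offset 100 on the first level, with their children directly underneath, which matches how the converter positions child blocks under their parents via the accumulated global offsets.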