From a243409536b1278f8c026d34686ac20205395960 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Mon, 8 Jan 2024 09:59:28 +0200 Subject: [PATCH] added diff mode Signed-off-by: Vladyslav Diachenko --- cmd/streams-inspect/streams-inspector.go | 95 +++++--- pkg/stream-inspector/flamegraph-converter.go | 228 ++++++++++++++---- .../flamegraph-converter_test.go | 59 ++++- pkg/stream-inspector/inspector_test.go | 21 +- 4 files changed, 308 insertions(+), 95 deletions(-) diff --git a/cmd/streams-inspect/streams-inspector.go b/cmd/streams-inspect/streams-inspector.go index 39bac2d306731..e24888f9135b2 100644 --- a/cmd/streams-inspect/streams-inspector.go +++ b/cmd/streams-inspect/streams-inspector.go @@ -3,6 +3,7 @@ package main import ( "context" "encoding/json" + "fmt" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" stream_inspector "github.com/grafana/loki/pkg/stream-inspector" "math" @@ -23,9 +24,9 @@ import ( ) const ( - daySinceUnixEpoc = 19400 - indexLocation = "path to extracted index file .tsdb" - resultFilePath = "path to the file where the resulting json will be stored" + daySinceUnixEpoc = 19610 + indexLocation = "" + resultFilePath = "" ) var logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) @@ -39,23 +40,66 @@ func main() { level.Error(logger).Log("msg", "can not read index file", "err", err) return } + //first 6 hours start := time.Unix(0, 0).UTC().AddDate(0, 0, daySinceUnixEpoc) - end := start.AddDate(0, 0, 1).Add(-1 * time.Millisecond) + end := start.AddDate(0, 0, 1).Add(-18 * time.Hour) + var streamMatchers []*labels.Matcher + //streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")} + limit := int32(5000) + + leftTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers) + if err != nil { + fmt.Println("err", err) + return + } + //last 6 hours + start = start.Add(18 * time.Hour) + end = start.Add(6 * time.Hour) + rightTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers) + if err != nil { + fmt.Println("err", err) + return + } + + level.Info(logger).Log("msg", "starting building flamegraph model") + converter := stream_inspector.FlamegraphConverter{Left: leftTrees, Right: rightTrees, Mode: stream_inspector.Diff} + flameBearer := converter.CovertTrees() + level.Info(logger).Log("msg", "completed building flamegraph model") + + level.Info(logger).Log("msg", "starting writing json") + content, err := json.Marshal(flameBearer) + if err != nil { + panic(err) + return + } + _, err = os.Stat(resultFilePath) + if err == nil { + level.Info(logger).Log("msg", "results file already exists. deleting previous one.") + err := os.Remove(resultFilePath) + if err != nil { + panic(err) + return + } + } + err = os.WriteFile(resultFilePath, content, 0644) + if err != nil { + panic(err) + return + } +} + +func buildTrees(start time.Time, end time.Time, limit int32, idx tsdb.Index, ctx context.Context, streamMatchers []*labels.Matcher) ([]*stream_inspector.Tree, error) { fromUnix := model.TimeFromUnix(start.Unix()) toUnix := model.TimeFromUnix(end.Unix()) - level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) - //idx.Stats(ctx, "", 0, model.Time(math.MaxInt), ) level.Info(logger).Log("msg", "starting extracting volumes") - var streamMatchers []*labels.Matcher - streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")} - limit := int32(math.MaxInt32) + level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) accumulator := seriesvolume.NewAccumulator(limit, math.MaxInt32) - err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool { + err := idx.Volume(ctx, "", fromUnix, toUnix, accumulator, nil, func(meta index.ChunkMeta) bool { return true }, nil, seriesvolume.Series, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))...) if err != nil { level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) - return + return nil, fmt.Errorf("error while fetching all the streams: %w", err) } volumes := accumulator.Volumes().GetVolumes() streams := make([]stream_inspector.StreamWithVolume, 0, len(volumes)) @@ -81,33 +125,8 @@ func main() { trees, err := inspector.BuildTrees(streams, streamMatchers) if err != nil { level.Error(logger).Log("msg", "error while building trees", "err", err) - return + return nil, fmt.Errorf("error while building trees: %w", err) } level.Info(logger).Log("msg", "completed building trees") - - level.Info(logger).Log("msg", "starting building flamegraph model") - converter := stream_inspector.FlamegraphConverter{} - flameBearer := converter.CovertTrees(trees) - level.Info(logger).Log("msg", "completed building flamegraph model") - - level.Info(logger).Log("msg", "starting writing json") - content, err := json.Marshal(flameBearer) - if err != nil { - panic(err) - return - } - _, err = os.Stat(resultFilePath) - if err == nil { - level.Info(logger).Log("msg", "results file already exists. deleting previous one.") - err := os.Remove(resultFilePath) - if err != nil { - panic(err) - return - } - } - err = os.WriteFile(resultFilePath, content, 0644) - if err != nil { - panic(err) - return - } + return trees, nil } diff --git a/pkg/stream-inspector/flamegraph-converter.go b/pkg/stream-inspector/flamegraph-converter.go index 6f9a672644588..a1b272bc73f38 100644 --- a/pkg/stream-inspector/flamegraph-converter.go +++ b/pkg/stream-inspector/flamegraph-converter.go @@ -4,15 +4,25 @@ import "golang.org/x/exp/slices" const magicNumber = 0 +type FlamebearerMode uint8 + +const ( + Diff = iota + Single +) + type FlamegraphConverter struct { + Left []*Tree + Right []*Tree + Mode FlamebearerMode } -func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { +func (f *FlamegraphConverter) CovertTrees() FlameGraph { dictionary := make(map[string]int) var levels []FlamegraphLevel //limit := 50000 //added := 0 - iterator := NewTreesLevelsIterator(trees) + iterator := NewTreesLevelsIterator(f.Left, f.Right, f.Mode) levelIndex := -1 for iterator.HasNextLevel() /* && added <= limit*/ { levelIndex++ @@ -20,11 +30,24 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { var level FlamegraphLevel for levelIterator.HasNext() { block := levelIterator.Next() - blockName := block.node.Name - blockWeight := block.node.Weight + var blockName string + if block.leftNode != nil { + blockName = block.leftNode.Name + } else { + blockName = block.rightNode.Name + } + + var leftNodeWeight float64 + if block.leftNode != nil { + leftNodeWeight = block.leftNode.Weight + } + var rightNodeWeight float64 + if block.rightNode != nil { + rightNodeWeight = block.rightNode.Weight + } currentBlockOffset := float64(0) // we need to find the offset for the first child block to place it exactly under the parent - if levelIndex > 0 && block.childIndex == 0 { + if levelIndex > 0 && block.childBlockIndex == 0 { previousLevel := levels[levelIndex-1] parentsIndexInPreviousLevel := block.parentBlock.indexInLevel parentsGlobalOffset := previousLevel.blocksGlobalOffsets[parentsIndexInPreviousLevel] @@ -37,17 +60,33 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { dictionary[blockName] = index } level.blocksGlobalOffsets = append(level.blocksGlobalOffsets, currentBlockOffset+level.curentWidth) - level.curentWidth += currentBlockOffset + blockWeight - level.blocks = append(level.blocks, []float64{currentBlockOffset, blockWeight, magicNumber, float64(index)}...) + level.curentWidth += currentBlockOffset + leftNodeWeight + rightNodeWeight + + level.blocks = append(level.blocks, []float64{currentBlockOffset, leftNodeWeight, magicNumber}...) + if f.Mode == Diff { + level.blocks = append(level.blocks, []float64{0, rightNodeWeight, magicNumber}...) + } + level.blocks = append(level.blocks, float64(index)) //added++ } levels = append(levels, level) } firstLevel := levels[0].blocks - totalWidth := float64(0) - for i := 1; i < len(firstLevel); i += 4 { - totalWidth += firstLevel[i] + totalLeft := float64(0) + totalRight := float64(0) + + blockParamsLength := 4 + if f.Mode == Diff { + blockParamsLength = 7 + } + for i := 1; i < len(firstLevel); i += blockParamsLength { + // leftWidth + totalLeft += firstLevel[i] + if f.Mode == Diff { + //rightWidth + totalRight += firstLevel[i+3] + } } names := make([]string, len(dictionary)) for name, index := range dictionary { @@ -57,13 +96,51 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer { for _, level := range levels { levelsBlocks = append(levelsBlocks, level.blocks) } - return FlameBearer{ - Units: "bytes", - NumTicks: totalWidth, - MaxSelf: totalWidth, - Names: names, - Levels: levelsBlocks, + var leftTicks *float64 + var rightTicks *float64 + format := "single" + if f.Mode == Diff { + format = "double" + leftTicks = &totalLeft + rightTicks = &totalRight } + + return FlameGraph{ + Version: 1, + FlameBearer: FlameBearer{ + Units: "bytes", + NumTicks: totalLeft + totalRight, + MaxSelf: totalLeft + totalRight, + Names: names, + Levels: levelsBlocks, + }, + Metadata: Metadata{ + Format: format, + SpyName: "dotnetspy", + SampleRate: 100, + Units: "bytes", + Name: "Logs Volumes", + }, + LeftTicks: leftTicks, + RightTicks: rightTicks, + } + +} + +type Metadata struct { + Format string `json:"format,omitempty"` + SpyName string `json:"spyName,omitempty"` + SampleRate int `json:"sampleRate,omitempty"` + Units string `json:"units,omitempty"` + Name string `json:"name,omitempty"` +} + +type FlameGraph struct { + Version int `json:"version,omitempty"` + FlameBearer FlameBearer `json:"flamebearer"` + Metadata Metadata `json:"metadata"` + LeftTicks *float64 `json:"leftTicks,omitempty"` + RightTicks *float64 `json:"rightTicks,omitempty"` } type FlamegraphLevel struct { @@ -73,12 +150,14 @@ type FlamegraphLevel struct { } type TreesLevelsIterator struct { - trees []*Tree + left []*Tree + right []*Tree currentLevel *LevelBlocksIterator + mode FlamebearerMode } -func NewTreesLevelsIterator(trees []*Tree) *TreesLevelsIterator { - return &TreesLevelsIterator{trees: trees} +func NewTreesLevelsIterator(left []*Tree, right []*Tree, mode FlamebearerMode) *TreesLevelsIterator { + return &TreesLevelsIterator{left: left, right: right, mode: mode} } func (i *TreesLevelsIterator) HasNextLevel() bool { @@ -93,54 +172,119 @@ func (i *TreesLevelsIterator) HasNextLevel() bool { for i.currentLevel.HasNext() { block := i.currentLevel.Next() // if at least one block at current level has children - if len(block.node.Children) > 0 { + if block.leftNode != nil && len(block.leftNode.Children) > 0 || block.rightNode != nil && len(block.rightNode.Children) > 0 { return true } } return false } +func (i *TreesLevelsIterator) mergeNodes(parent *LevelBlock, left []*Node, right []*Node) []*LevelBlock { + leftByNameMap := i.mapByNodeName(left) + rightByNameMap := i.mapByNodeName(right) + blocks := make([]*LevelBlock, 0, len(leftByNameMap)+len(rightByNameMap)) + for _, leftNode := range leftByNameMap { + rightNode := rightByNameMap[leftNode.Name] + blocks = append(blocks, &LevelBlock{ + parentBlock: parent, + leftNode: leftNode, + rightNode: rightNode, + }) + } + for _, rightNode := range rightByNameMap { + //skip nodes that exists in leftNodes to add only diff from the right nodes + _, exists := leftByNameMap[rightNode.Name] + if exists { + continue + } + + blocks = append(blocks, &LevelBlock{ + parentBlock: parent, + rightNode: rightNode, + }) + } + slices.SortStableFunc(blocks, func(a, b *LevelBlock) bool { + return maxWeight(a.leftNode, a.rightNode) > maxWeight(b.leftNode, b.rightNode) + }) + return blocks +} + +func maxWeight(left *Node, right *Node) float64 { + if left == nil { + return right.Weight + } + if right == nil { + return left.Weight + } + if left.Weight > right.Weight { + return left.Weight + } + return right.Weight +} + func (i *TreesLevelsIterator) NextLevelIterator() *LevelBlocksIterator { if i.currentLevel == nil { - levelNodes := make([]*LevelBlock, 0, len(i.trees)) - for index, tree := range i.trees { - var leftNeighbour *LevelBlock + leftNodes := make([]*Node, 0, len(i.left)) + for _, tree := range i.left { + leftNodes = append(leftNodes, tree.Root) + } + rightNodes := make([]*Node, 0, len(i.right)) + for _, tree := range i.right { + rightNodes = append(rightNodes, tree.Root) + } + levelBlocks := i.mergeNodes(nil, leftNodes, rightNodes) + + for index, block := range levelBlocks { if index > 0 { - leftNeighbour = levelNodes[index-1] + block.leftNeighbour = levelBlocks[index-1] } - levelNodes = append(levelNodes, &LevelBlock{leftNeighbour: leftNeighbour, node: tree.Root, childIndex: index, indexInLevel: index}) + block.childBlockIndex = index + block.indexInLevel = index } - slices.SortStableFunc(levelNodes, func(a, b *LevelBlock) bool { - return a.node.Weight > b.node.Weight - }) - i.currentLevel = NewLevelBlocksIterator(levelNodes) + i.currentLevel = NewLevelBlocksIterator(levelBlocks) return i.currentLevel } var nextLevelBlocks []*LevelBlock for i.currentLevel.HasNext() { - block := i.currentLevel.Next() - slices.SortStableFunc(block.node.Children, func(a, b *Node) bool { - return a.Weight > b.Weight - }) - for index, child := range block.node.Children { - var leftNeighbour *LevelBlock + currentBlock := i.currentLevel.Next() + var left []*Node + if currentBlock.leftNode != nil { + left = currentBlock.leftNode.Children + } + var right []*Node + if currentBlock.rightNode != nil { + right = currentBlock.rightNode.Children + } + levelBlocks := i.mergeNodes(currentBlock, left, right) + for index, block := range levelBlocks { + block.childBlockIndex = index + block.indexInLevel = len(nextLevelBlocks) if len(nextLevelBlocks) > 0 { - leftNeighbour = nextLevelBlocks[len(nextLevelBlocks)-1] + block.leftNeighbour = nextLevelBlocks[len(nextLevelBlocks)-1] } - nextLevelBlocks = append(nextLevelBlocks, &LevelBlock{leftNeighbour: leftNeighbour, childIndex: index, indexInLevel: len(nextLevelBlocks), node: child, parentBlock: block}) + nextLevelBlocks = append(nextLevelBlocks, block) } } i.currentLevel = NewLevelBlocksIterator(nextLevelBlocks) return i.currentLevel } +func (i *TreesLevelsIterator) mapByNodeName(nodes []*Node) map[string]*Node { + result := make(map[string]*Node, len(nodes)) + for _, node := range nodes { + result[node.Name] = node + } + return result +} + type LevelBlock struct { - parentBlock *LevelBlock - leftNeighbour *LevelBlock - childIndex int - node *Node - indexInLevel int + parentBlock *LevelBlock + leftNeighbour *LevelBlock + childBlockIndex int + leftNode *Node + rightNode *Node + indexInLevel int } // iterates over Nodes at the level diff --git a/pkg/stream-inspector/flamegraph-converter_test.go b/pkg/stream-inspector/flamegraph-converter_test.go index b6ae0327ee262..eb71f7b25b630 100644 --- a/pkg/stream-inspector/flamegraph-converter_test.go +++ b/pkg/stream-inspector/flamegraph-converter_test.go @@ -10,12 +10,15 @@ const magic = 0 func TestFlamegraphConverter_covertTrees(t *testing.T) { tests := []struct { name string - trees []*Tree + left []*Tree + right []*Tree + mode FlamebearerMode want FlameBearer }{ { name: "expected flame graph to be built with offset for the second level", - trees: []*Tree{ + mode: Single, + left: []*Tree{ // 1st tree { Root: &Node{ @@ -77,7 +80,8 @@ func TestFlamegraphConverter_covertTrees(t *testing.T) { }, { name: "expected flame graph to be built", - trees: []*Tree{ + mode: Single, + left: []*Tree{ // 1st tree { Root: &Node{ @@ -109,12 +113,55 @@ func TestFlamegraphConverter_covertTrees(t *testing.T) { }, }, }, + + { + name: "expected diff flame graph to be built", + mode: Diff, + left: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 100, Children: []*Node{ + {Name: "second_level-a_1", Weight: 100}, + }, + }, + }, + }, + right: []*Tree{ + // 1st tree + { + Root: &Node{ + Name: "top_level-a", Weight: 200, Children: []*Node{ + {Name: "second_level-a_1", Weight: 50}, + {Name: "second_level-a_2", Weight: 100}, + }, + }, + }, + }, + want: FlameBearer{ + Units: "bytes", + NumTicks: 300, + MaxSelf: 300, + Names: []string{"top_level-a", "second_level-a_1", "second_level-a_2"}, + Levels: [][]float64{ + // for each block: offset, value, self, offset_right, value_right, self_right, label, + // 1st level + { /*1st block*/ /*left*/ 0, 100, magic /*right*/, 0, 200, magic, 0}, + // 2nd level + { /*1st block*/ /*left*/ 0, 100, magic /*right*/, 0, 50, magic, 1 /*2nd block*/ /*left*/, 0, 0, magic /*right*/, 0, 100, magic, 2}, + }, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - f := &FlamegraphConverter{} - result := f.CovertTrees(tt.trees) - require.Equal(t, tt.want, result) + f := &FlamegraphConverter{ + Left: tt.left, + Right: tt.right, + Mode: tt.mode, + } + result := f.CovertTrees() + require.Equal(t, tt.want, result.FlameBearer) }) } } diff --git a/pkg/stream-inspector/inspector_test.go b/pkg/stream-inspector/inspector_test.go index a33af82e07266..9a32947a54eac 100644 --- a/pkg/stream-inspector/inspector_test.go +++ b/pkg/stream-inspector/inspector_test.go @@ -2,8 +2,6 @@ package stream_inspector import ( "encoding/json" - "github.com/grafana/loki/pkg/storage/stores/tsdb" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "reflect" @@ -12,11 +10,11 @@ import ( func Test_Inspector(t *testing.T) { tests := map[string]struct { - streams []tsdb.Series + streams []StreamWithVolume expectedResultJSON string }{ "expected 2 threes": { - streams: []tsdb.Series{ + streams: []StreamWithVolume{ makeStream("cl", "cluster-a", "ns", "loki-ops"), makeStream("cl", "cluster-a", "ns", "loki-dev"), makeStream("cl", "cluster-a", "ns", "loki-dev", "level", "error"), @@ -104,7 +102,7 @@ func Test_Inspector(t *testing.T) { for name, testData := range tests { t.Run(name, func(t *testing.T) { inspector := Inspector{} - forest, err := inspector.BuildTrees(testData.streams, map[model.Fingerprint]float64{model.Fingerprint(0): 1}, nil) + forest, err := inspector.BuildTrees(testData.streams, nil) require.NoError(t, err) actualJson, err := json.Marshal(forest) require.NoError(t, err) @@ -115,18 +113,20 @@ func Test_Inspector(t *testing.T) { func Test_Inspector_sortLabelNames(t *testing.T) { inspector := Inspector{} - result := inspector.sortLabelNamesByPopularity([]tsdb.Series{ + result := inspector.sortLabelNamesByPopularity([]StreamWithVolume{ makeStream("cl", "cluster-a", "ns", "loki-ops"), makeStream("cl", "cluster-a", "ns", "loki-dev"), makeStream("stack-cl", "cluster-b", "stack-ns", "loki-dev"), makeStream("stack-cl", "cluster-b", "stack-ns", "loki-ops"), makeStream("stack-cl", "cluster-b", "stack-ns", "loki-prod"), }, nil) - require.Equal(t, []string{"stack-cl", "stack-ns", "cl", "ns"}, result, + require.Equal(t, map[string]int{ + "cl": 2, "ns": 3, "stack-cl": 0, "stack-ns": 1, + }, result, "must be sorted by streams count in descending order and after this by label name in ascending order") } -func makeStream(labelValues ...string) tsdb.Series { +func makeStream(labelValues ...string) StreamWithVolume { var createdLabels labels.Labels for i := 0; i < len(labelValues); i += 2 { createdLabels = append(createdLabels, labels.Label{ @@ -134,7 +134,10 @@ func makeStream(labelValues ...string) tsdb.Series { Value: labelValues[i+1], }) } - return tsdb.Series{Labels: createdLabels} + return StreamWithVolume{ + Labels: createdLabels, + Volume: 1, + } } // compareThree function to compare provided threes to check if they are equal