Skip to content

Commit

Permalink
adjusted the logic
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
vlad-diachenko committed Dec 28, 2023
1 parent d193fc0 commit c9e95aa
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 41 deletions.
15 changes: 9 additions & 6 deletions cmd/streams-inspect/streams-inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func main() {
level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC())
//idx.Stats(ctx, "", 0, model.Time(math.MaxInt), )
level.Info(logger).Log("msg", "starting extracting series")
streams, err := idx.Series(ctx, "", 0, model.Time(math.MaxInt), nil, nil, labels.MustNewMatcher(labels.MatchNotEqual, "job", "non-existent"))
var streamMatchers []*labels.Matcher
streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")}
streams, err := idx.Series(ctx, "", 0, model.Time(math.MaxInt), nil, nil, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "anything", "not-existent"))...)
if err != nil {
level.Error(logger).Log("msg", "error while fetching all the streams", "err", err)
return
Expand All @@ -58,7 +60,7 @@ func main() {
for _, stream := range streams {
streamStringToFingerprint[stream.Labels.String()] = stream.Fingerprint
}
accumulator := seriesvolume.NewAccumulator(int32(len(streams)), len(streams))
accumulator := seriesvolume.NewAccumulator(math.MaxInt32, math.MaxInt32)
err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool {
return true
}, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))
Expand All @@ -72,8 +74,9 @@ func main() {
for _, volume := range volumes {
fingerprint, exists := streamStringToFingerprint[volume.Name]
if !exists {
level.Error(logger).Log("msg", "can not find fingerprint", "volumeName", volume.Name)
return
continue
//level.Error(logger).Log("msg", "can not find fingerprint", "volumeName", volume.Name)
//return
}
streamToVolume[fingerprint] = float64(volume.Volume)
}
Expand All @@ -83,10 +86,10 @@ func main() {
return streamToVolume[a.Fingerprint] > streamToVolume[b.Fingerprint]
})

streams = streams[:5000]
//streams = streams[:5000]
level.Info(logger).Log("msg", "starting building trees")
inspector := stream_inspector.Inspector{}
trees, err := inspector.BuildTrees(streams, streamToVolume)
trees, err := inspector.BuildTrees(streams, streamToVolume, streamMatchers)
if err != nil {
level.Error(logger).Log("msg", "error while building trees", "err", err)
return
Expand Down
53 changes: 24 additions & 29 deletions pkg/stream-inspector/flamegraph-converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ type FlamegraphConverter struct {

func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer {
dictionary := make(map[string]int)
var levels [][]float64

var levels []FlamegraphLevel
//limit := 50000
//added := 0
iterator := NewTreesLevelsIterator(trees)
levelIndex := -1
for iterator.HasNextLevel() {
for iterator.HasNextLevel() /* && added <= limit*/ {
levelIndex++
levelIterator := iterator.NextLevelIterator()
var level []float64
var level FlamegraphLevel
for levelIterator.HasNext() {
block := levelIterator.Next()
blockName := block.node.Name
Expand All @@ -26,40 +27,24 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer {
if levelIndex > 0 && block.childIndex == 0 {
previousLevel := levels[levelIndex-1]
parentsIndexInPreviousLevel := block.parentBlock.indexInLevel

// calculate the offset of the parent from the left side(offsets+weight of all left neighbours + parents offset)
parentsOffset := previousLevel[parentsIndexInPreviousLevel*4]
parentsGlobalOffset := parentsOffset
parentsLeftNeighbour := block.parentBlock.leftNeighbour
for parentsLeftNeighbour != nil {
leftNeighbourOffset := previousLevel[parentsLeftNeighbour.indexInLevel*4]
leftNeighbourWeight := previousLevel[parentsLeftNeighbour.indexInLevel*4+1]
parentsGlobalOffset += leftNeighbourOffset + leftNeighbourWeight
parentsLeftNeighbour = parentsLeftNeighbour.leftNeighbour
}

currentBlockOffset = parentsGlobalOffset
left := block.leftNeighbour
// iterate over all already added blocks from the left
for left != nil {
leftNeighbourOffset := level[left.indexInLevel*4]
leftNeighbourWeight := level[left.indexInLevel*4+1]
currentBlockOffset -= leftNeighbourOffset + leftNeighbourWeight
left = left.leftNeighbour
}
parentsGlobalOffset := previousLevel.blocksGlobalOffsets[parentsIndexInPreviousLevel]
currentBlockOffset = parentsGlobalOffset - level.curentWidth
}

index, exists := dictionary[blockName]
if !exists {
index = len(dictionary)
dictionary[blockName] = index
}

level = append(level, []float64{currentBlockOffset, blockWeight, magicNumber, float64(index)}...)
level.blocksGlobalOffsets = append(level.blocksGlobalOffsets, currentBlockOffset+level.curentWidth)
level.curentWidth += currentBlockOffset + blockWeight
level.blocks = append(level.blocks, []float64{currentBlockOffset, blockWeight, magicNumber, float64(index)}...)
//added++
}
levels = append(levels, level)
}
firstLevel := levels[0]

firstLevel := levels[0].blocks
totalWidth := float64(0)
for i := 1; i < len(firstLevel); i += 4 {
totalWidth += firstLevel[i]
Expand All @@ -68,15 +53,25 @@ func (f *FlamegraphConverter) CovertTrees(trees []*Tree) FlameBearer {
for name, index := range dictionary {
names[index] = name
}
levelsBlocks := make([][]float64, 0, len(levels))
for _, level := range levels {
levelsBlocks = append(levelsBlocks, level.blocks)
}
return FlameBearer{
Units: "bytes",
NumTicks: totalWidth,
MaxSelf: totalWidth,
Names: names,
Levels: levels,
Levels: levelsBlocks,
}
}

type FlamegraphLevel struct {
curentWidth float64
blocks []float64
blocksGlobalOffsets []float64
}

type TreesLevelsIterator struct {
trees []*Tree
currentLevel *LevelBlocksIterator
Expand Down
22 changes: 18 additions & 4 deletions pkg/stream-inspector/inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/exp/slices"
"math"
"sort"
)

type Inspector struct {
}

func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Fingerprint]float64) ([]*Tree, error) {
labelNamesOrder := i.sortLabelNamesByPopularity(streams)
func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Fingerprint]float64, matchers []*labels.Matcher) ([]*Tree, error) {
labelNamesOrder := i.sortLabelNamesByPopularity(streams, matchers)

rootLabelNameToThreeMap := make(map[string]*Tree)
var threes []*Tree
Expand Down Expand Up @@ -97,7 +98,7 @@ var labelsToSkip = map[string]any{
"__stream_shard__": nil,
}

func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series) map[string]int {
func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series, matchers []*labels.Matcher) map[string]int {
labelNameCounts := make(map[string]uint32)
uniqueLabelNames := make([]string, 0, 1000)
for _, stream := range streams {
Expand All @@ -112,13 +113,26 @@ func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series) map[string
labelNameCounts[label.Name] = count + 1
}
}
matchersLabels := make(map[string]int, len(matchers))
for idx, matcher := range matchers {
matchersLabels[matcher.Name] = idx
}
sort.SliceStable(uniqueLabelNames, func(i, j int) bool {
leftLabel := uniqueLabelNames[i]
rightLabel := uniqueLabelNames[j]
leftLabelPriority := -1
if leftLabelIndex, used := matchersLabels[leftLabel]; used {
leftLabelPriority = math.MaxInt - leftLabelIndex
}
rightLabelPriority := -1
if rightLabelIndex, used := matchersLabels[rightLabel]; used {
rightLabelPriority = math.MaxInt - rightLabelIndex
}
leftCount := labelNameCounts[leftLabel]
rightCount := labelNameCounts[rightLabel]
// sort by streams count (desc) and by label name (asc)
return leftCount > rightCount || leftCount == rightCount && leftLabel < rightLabel

return leftLabelPriority > rightLabelPriority || (leftLabelPriority == rightLabelPriority && leftCount > rightCount) || leftCount == rightCount && leftLabel < rightLabel
})
labelNameToOrderMap := make(map[string]int, len(uniqueLabelNames))
for idx, name := range uniqueLabelNames {
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream-inspector/inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func Test_Inspector(t *testing.T) {
for name, testData := range tests {
t.Run(name, func(t *testing.T) {
inspector := Inspector{}
forest, err := inspector.BuildTrees(testData.streams, map[model.Fingerprint]float64{model.Fingerprint(0): 1})
forest, err := inspector.BuildTrees(testData.streams, map[model.Fingerprint]float64{model.Fingerprint(0): 1}, nil)
require.NoError(t, err)
actualJson, err := json.Marshal(forest)
require.NoError(t, err)
Expand All @@ -121,7 +121,7 @@ func Test_Inspector_sortLabelNames(t *testing.T) {
makeStream("stack-cl", "cluster-b", "stack-ns", "loki-dev"),
makeStream("stack-cl", "cluster-b", "stack-ns", "loki-ops"),
makeStream("stack-cl", "cluster-b", "stack-ns", "loki-prod"),
})
}, nil)
require.Equal(t, []string{"stack-cl", "stack-ns", "cl", "ns"}, result,
"must be sorted by streams count in descending order and after this by label name in ascending order")
}
Expand Down

0 comments on commit c9e95aa

Please sign in to comment.