Skip to content

Commit

Permalink
removed unneeded call to series endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
vlad-diachenko committed Jan 5, 2024
1 parent c9e95aa commit 1b25e42
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 47 deletions.
49 changes: 19 additions & 30 deletions cmd/streams-inspect/streams-inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"encoding/json"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
stream_inspector "github.com/grafana/loki/pkg/stream-inspector"
"golang.org/x/exp/slices"
"math"
"os"
"strings"
"time"

"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
Expand Down Expand Up @@ -45,51 +45,40 @@ func main() {
toUnix := model.TimeFromUnix(end.Unix())
level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC())
//idx.Stats(ctx, "", 0, model.Time(math.MaxInt), )
level.Info(logger).Log("msg", "starting extracting series")
level.Info(logger).Log("msg", "starting extracting volumes")
var streamMatchers []*labels.Matcher
streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")}
streams, err := idx.Series(ctx, "", 0, model.Time(math.MaxInt), nil, nil, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "anything", "not-existent"))...)
if err != nil {
level.Error(logger).Log("msg", "error while fetching all the streams", "err", err)
return
}
level.Info(logger).Log("msg", "completed extracting series")

level.Info(logger).Log("msg", "starting extracting volumes")
streamStringToFingerprint := make(map[string]model.Fingerprint, len(streams))
for _, stream := range streams {
streamStringToFingerprint[stream.Labels.String()] = stream.Fingerprint
}
accumulator := seriesvolume.NewAccumulator(math.MaxInt32, math.MaxInt32)
limit := int32(math.MaxInt32)
accumulator := seriesvolume.NewAccumulator(limit, math.MaxInt32)
err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool {
return true
}, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))
}, nil, seriesvolume.Series, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))...)
if err != nil {
level.Error(logger).Log("msg", "error while fetching all the streams", "err", err)
return
}
volumes := accumulator.Volumes().GetVolumes()

streamToVolume := make(map[model.Fingerprint]float64, len(volumes))
streams := make([]stream_inspector.StreamWithVolume, 0, len(volumes))
for _, volume := range volumes {
fingerprint, exists := streamStringToFingerprint[volume.Name]
if !exists {
continue
//level.Error(logger).Log("msg", "can not find fingerprint", "volumeName", volume.Name)
//return
labelsString := strings.Trim(volume.Name, "{}")
pairs := strings.Split(labelsString, ", ")
lbls := make([]string, 0, len(pairs)*2)
for _, pair := range pairs {
lblVal := strings.Split(pair, "=")
lbls = append(lbls, lblVal[0])
lbls = append(lbls, strings.Trim(lblVal[1], "\""))
}
streamToVolume[fingerprint] = float64(volume.Volume)
streams = append(streams, stream_inspector.StreamWithVolume{
Labels: labels.FromStrings(lbls...),
Volume: float64(volume.Volume),
})
}
level.Info(logger).Log("msg", "completed extracting volumes")

slices.SortStableFunc(streams, func(a, b tsdb.Series) bool {
return streamToVolume[a.Fingerprint] > streamToVolume[b.Fingerprint]
})
level.Info(logger).Log("msg", "completed extracting volumes")

//streams = streams[:5000]
level.Info(logger).Log("msg", "starting building trees")
inspector := stream_inspector.Inspector{}
trees, err := inspector.BuildTrees(streams, streamToVolume, streamMatchers)
trees, err := inspector.BuildTrees(streams, streamMatchers)
if err != nil {
level.Error(logger).Log("msg", "error while building trees", "err", err)
return
Expand Down
25 changes: 8 additions & 17 deletions pkg/stream-inspector/inspector.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package stream_inspector

import (
"github.com/grafana/loki/pkg/storage/stores/tsdb"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/exp/slices"
"math"
Expand All @@ -13,7 +10,12 @@ import (
// Inspector groups the stream-inspection methods (such as BuildTrees).
// It has no fields, so its zero value is ready to use.
type Inspector struct{}

func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Fingerprint]float64, matchers []*labels.Matcher) ([]*Tree, error) {
// StreamWithVolume pairs a stream's label set with the volume attributed to
// that stream. It is the input element type consumed by Inspector.BuildTrees.
type StreamWithVolume struct {
// Labels is the label set identifying the stream.
Labels labels.Labels
// Volume is the stream's volume as a float64.
// NOTE(review): the unit is not visible here (presumably bytes) — confirm against the producer.
Volume float64
}

func (i *Inspector) BuildTrees(streams []StreamWithVolume, matchers []*labels.Matcher) ([]*Tree, error) {
labelNamesOrder := i.sortLabelNamesByPopularity(streams, matchers)

rootLabelNameToThreeMap := make(map[string]*Tree)
Expand All @@ -38,10 +40,7 @@ func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Finge
rootLabelNameToThreeMap[rootLabelName] = rootThree
threes = append(threes, rootThree)
}
streamVolume, err := i.getStreamVolume(stream, volumesMap)
if err != nil {
return nil, errors.Wrap(err, "can not build tree due to error")
}
streamVolume := stream.Volume
currentNode := rootThree.Root
for i, label := range streamLabels {
var labelNameNode *Node
Expand All @@ -63,14 +62,6 @@ func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Finge
return threes, nil
}

// getStreamVolume looks up the volume stored in volumesMap under the stream's
// fingerprint. It returns an error when the map holds no entry for that
// fingerprint.
func (i *Inspector) getStreamVolume(stream tsdb.Series, volumesMap map[model.Fingerprint]float64) (float64, error) {
	if volume, found := volumesMap[stream.Fingerprint]; found {
		return volume, nil
	}
	return 0, errors.New("stream volume not found")
}

type Iterator[T any] struct {
values []T
position int
Expand Down Expand Up @@ -98,7 +89,7 @@ var labelsToSkip = map[string]any{
"__stream_shard__": nil,
}

func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series, matchers []*labels.Matcher) map[string]int {
func (i *Inspector) sortLabelNamesByPopularity(streams []StreamWithVolume, matchers []*labels.Matcher) map[string]int {
labelNameCounts := make(map[string]uint32)
uniqueLabelNames := make([]string, 0, 1000)
for _, stream := range streams {
Expand Down

0 comments on commit 1b25e42

Please sign in to comment.