diff --git a/cmd/streams-inspect/streams-inspector.go b/cmd/streams-inspect/streams-inspector.go index aa12e3b57881d..39bac2d306731 100644 --- a/cmd/streams-inspect/streams-inspector.go +++ b/cmd/streams-inspect/streams-inspector.go @@ -5,9 +5,9 @@ import ( "encoding/json" "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" stream_inspector "github.com/grafana/loki/pkg/stream-inspector" - "golang.org/x/exp/slices" "math" "os" + "strings" "time" "github.com/grafana/loki/pkg/storage/stores/tsdb/index" @@ -45,51 +45,40 @@ func main() { toUnix := model.TimeFromUnix(end.Unix()) level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC()) //idx.Stats(ctx, "", 0, model.Time(math.MaxInt), ) - level.Info(logger).Log("msg", "starting extracting series") + level.Info(logger).Log("msg", "starting extracting volumes") var streamMatchers []*labels.Matcher streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")} - streams, err := idx.Series(ctx, "", 0, model.Time(math.MaxInt), nil, nil, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "anything", "not-existent"))...) - if err != nil { - level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) - return - } - level.Info(logger).Log("msg", "completed extracting series") - - level.Info(logger).Log("msg", "starting extracting volumes") - streamStringToFingerprint := make(map[string]model.Fingerprint, len(streams)) - for _, stream := range streams { - streamStringToFingerprint[stream.Labels.String()] = stream.Fingerprint - } - accumulator := seriesvolume.NewAccumulator(math.MaxInt32, math.MaxInt32) + limit := int32(math.MaxInt32) + accumulator := seriesvolume.NewAccumulator(limit, math.MaxInt32) err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool { return true - }, nil, seriesvolume.Series, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent")) + }, nil, seriesvolume.Series, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))...) if err != nil { level.Error(logger).Log("msg", "error while fetching all the streams", "err", err) return } volumes := accumulator.Volumes().GetVolumes() - - streamToVolume := make(map[model.Fingerprint]float64, len(volumes)) + streams := make([]stream_inspector.StreamWithVolume, 0, len(volumes)) for _, volume := range volumes { - fingerprint, exists := streamStringToFingerprint[volume.Name] - if !exists { - continue - //level.Error(logger).Log("msg", "can not find fingerprint", "volumeName", volume.Name) - //return + labelsString := strings.Trim(volume.Name, "{}") + pairs := strings.Split(labelsString, ", ") + lbls := make([]string, 0, len(pairs)*2) + for _, pair := range pairs { + lblVal := strings.Split(pair, "=") + lbls = append(lbls, lblVal[0]) + lbls = append(lbls, strings.Trim(lblVal[1], "\"")) } - streamToVolume[fingerprint] = float64(volume.Volume) + streams = append(streams, stream_inspector.StreamWithVolume{ + Labels: labels.FromStrings(lbls...), + Volume: float64(volume.Volume), + }) } - level.Info(logger).Log("msg", "completed extracting volumes") - slices.SortStableFunc(streams, func(a, b tsdb.Series) bool { - return streamToVolume[a.Fingerprint] > streamToVolume[b.Fingerprint] - }) + level.Info(logger).Log("msg", "completed extracting volumes") - //streams = streams[:5000] level.Info(logger).Log("msg", "starting building trees") inspector := stream_inspector.Inspector{} - trees, err := inspector.BuildTrees(streams, streamToVolume, streamMatchers) + trees, err := inspector.BuildTrees(streams, streamMatchers) if err != nil { level.Error(logger).Log("msg", "error while building trees", "err", err) return diff --git a/pkg/stream-inspector/inspector.go b/pkg/stream-inspector/inspector.go index f98f2f72e48ff..df73d37fa5e8c 100644 --- a/pkg/stream-inspector/inspector.go +++ b/pkg/stream-inspector/inspector.go @@ -1,9 +1,6 @@ package stream_inspector import ( - "github.com/grafana/loki/pkg/storage/stores/tsdb" - "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "golang.org/x/exp/slices" "math" @@ -13,7 +10,12 @@ import ( type Inspector struct { } -func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Fingerprint]float64, matchers []*labels.Matcher) ([]*Tree, error) { +type StreamWithVolume struct { + Labels labels.Labels + Volume float64 +} + +func (i *Inspector) BuildTrees(streams []StreamWithVolume, matchers []*labels.Matcher) ([]*Tree, error) { labelNamesOrder := i.sortLabelNamesByPopularity(streams, matchers) rootLabelNameToThreeMap := make(map[string]*Tree) @@ -38,10 +40,7 @@ func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Finge rootLabelNameToThreeMap[rootLabelName] = rootThree threes = append(threes, rootThree) } - streamVolume, err := i.getStreamVolume(stream, volumesMap) - if err != nil { - return nil, errors.Wrap(err, "can not build tree due to error") - } + streamVolume := stream.Volume currentNode := rootThree.Root for i, label := range streamLabels { var labelNameNode *Node @@ -63,14 +62,6 @@ func (i *Inspector) BuildTrees(streams []tsdb.Series, volumesMap map[model.Finge return threes, nil } -func (i *Inspector) getStreamVolume(stream tsdb.Series, volumesMap map[model.Fingerprint]float64) (float64, error) { - streamVolume, exists := volumesMap[stream.Fingerprint] - if !exists { - return 0, errors.New("stream volume not found") - } - return streamVolume, nil -} - type Iterator[T any] struct { values []T position int @@ -98,7 +89,7 @@ var labelsToSkip = map[string]any{ "__stream_shard__": nil, } -func (i *Inspector) sortLabelNamesByPopularity(streams []tsdb.Series, matchers []*labels.Matcher) map[string]int { +func (i *Inspector) sortLabelNamesByPopularity(streams []StreamWithVolume, matchers []*labels.Matcher) map[string]int { labelNameCounts := make(map[string]uint32) uniqueLabelNames := make([]string, 0, 1000) for _, stream := range streams {