added diff mode
Signed-off-by: Vladyslav Diachenko <[email protected]>
vlad-diachenko committed Jan 9, 2024
1 parent 1b25e42 commit a243409
Showing 4 changed files with 308 additions and 95 deletions.
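The gist of the change: the inspector now builds two sets of trees over two time windows of the same day and feeds both into the flamegraph converter in diff mode, so the resulting flamegraph shows how stream volumes shifted between the windows. Below is a condensed sketch of that flow; the helper name diffFlamegraphJSON is hypothetical, the imports and types are the ones already used by the updated file below, and the 6-hour windows mirror the values in the diff.

// Hypothetical helper condensing the new diff-mode flow from this commit.
// buildTrees, daySinceUnixEpoc, stream_inspector.FlamegraphConverter and
// stream_inspector.Diff come from the file below; error handling is shortened.
func diffFlamegraphJSON(ctx context.Context, idx tsdb.Index, matchers []*labels.Matcher) ([]byte, error) {
	day := time.Unix(0, 0).UTC().AddDate(0, 0, daySinceUnixEpoc) // day 19610 = 2023-09-10 UTC
	limit := int32(5000)

	// Left side of the diff: first 6 hours of the day.
	left, err := buildTrees(day, day.Add(6*time.Hour), limit, idx, ctx, matchers)
	if err != nil {
		return nil, err
	}
	// Right side of the diff: last 6 hours of the same day.
	right, err := buildTrees(day.Add(18*time.Hour), day.Add(24*time.Hour), limit, idx, ctx, matchers)
	if err != nil {
		return nil, err
	}

	// Diff mode compares left vs right instead of rendering a single tree set.
	converter := stream_inspector.FlamegraphConverter{Left: left, Right: right, Mode: stream_inspector.Diff}
	return json.Marshal(converter.CovertTrees())
}

In main() the marshalled result is written to resultFilePath rather than returned, but the sequence of calls is the same.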
95 changes: 57 additions & 38 deletions cmd/streams-inspect/streams-inspector.go
@@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"fmt"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
stream_inspector "github.com/grafana/loki/pkg/stream-inspector"
"math"
@@ -23,9 +24,9 @@ import (
)

const (
daySinceUnixEpoc = 19400
indexLocation = "path to extracted index file .tsdb"
resultFilePath = "path to the file where the resulting json will be stored"
daySinceUnixEpoc = 19610
indexLocation = "<PATH_TO_TSDB_FILE>"
resultFilePath = "<PATH_TO_THE_FILE_TO_WRITE_THE_RESULTS_JSON>"
)

var logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
@@ -39,23 +40,66 @@ func main() {
level.Error(logger).Log("msg", "can not read index file", "err", err)
return
}
//first 6 hours
start := time.Unix(0, 0).UTC().AddDate(0, 0, daySinceUnixEpoc)
end := start.AddDate(0, 0, 1).Add(-1 * time.Millisecond)
end := start.AddDate(0, 0, 1).Add(-18 * time.Hour)
var streamMatchers []*labels.Matcher
//streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")}
limit := int32(5000)

leftTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers)
if err != nil {
fmt.Println("err", err)
return
}
//last 6 hours
start = start.Add(18 * time.Hour)
end = start.Add(6 * time.Hour)
rightTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers)
if err != nil {
fmt.Println("err", err)
return
}

level.Info(logger).Log("msg", "starting building flamegraph model")
converter := stream_inspector.FlamegraphConverter{Left: leftTrees, Right: rightTrees, Mode: stream_inspector.Diff}
flameBearer := converter.CovertTrees()
level.Info(logger).Log("msg", "completed building flamegraph model")

level.Info(logger).Log("msg", "starting writing json")
content, err := json.Marshal(flameBearer)
if err != nil {
panic(err)
return
}
_, err = os.Stat(resultFilePath)
if err == nil {
level.Info(logger).Log("msg", "results file already exists. deleting previous one.")
err := os.Remove(resultFilePath)
if err != nil {
panic(err)
return
}
}
err = os.WriteFile(resultFilePath, content, 0644)
if err != nil {
panic(err)
return
}
}

func buildTrees(start time.Time, end time.Time, limit int32, idx tsdb.Index, ctx context.Context, streamMatchers []*labels.Matcher) ([]*stream_inspector.Tree, error) {
fromUnix := model.TimeFromUnix(start.Unix())
toUnix := model.TimeFromUnix(end.Unix())
level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC())
//idx.Stats(ctx, "", 0, model.Time(math.MaxInt), )
level.Info(logger).Log("msg", "starting extracting volumes")
var streamMatchers []*labels.Matcher
streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")}
limit := int32(math.MaxInt32)
level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC())
accumulator := seriesvolume.NewAccumulator(limit, math.MaxInt32)
err = idx.Volume(ctx, "", 0, model.Time(math.MaxInt), accumulator, nil, func(meta index.ChunkMeta) bool {
err := idx.Volume(ctx, "", fromUnix, toUnix, accumulator, nil, func(meta index.ChunkMeta) bool {
return true
}, nil, seriesvolume.Series, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))...)
if err != nil {
level.Error(logger).Log("msg", "error while fetching all the streams", "err", err)
return
return nil, fmt.Errorf("error while fetching all the streams: %w", err)
}
volumes := accumulator.Volumes().GetVolumes()
streams := make([]stream_inspector.StreamWithVolume, 0, len(volumes))
@@ -81,33 +125,8 @@ func main()
trees, err := inspector.BuildTrees(streams, streamMatchers)
if err != nil {
level.Error(logger).Log("msg", "error while building trees", "err", err)
return
return nil, fmt.Errorf("error while building trees: %w", err)
}
level.Info(logger).Log("msg", "completed building trees")

level.Info(logger).Log("msg", "starting building flamegraph model")
converter := stream_inspector.FlamegraphConverter{}
flameBearer := converter.CovertTrees(trees)
level.Info(logger).Log("msg", "completed building flamegraph model")

level.Info(logger).Log("msg", "starting writing json")
content, err := json.Marshal(flameBearer)
if err != nil {
panic(err)
return
}
_, err = os.Stat(resultFilePath)
if err == nil {
level.Info(logger).Log("msg", "results file already exists. deleting previous one.")
err := os.Remove(resultFilePath)
if err != nil {
panic(err)
return
}
}
err = os.WriteFile(resultFilePath, content, 0644)
if err != nil {
panic(err)
return
}
return trees, nil
}
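A note on the Volume call in buildTrees above: a catch-all matcher is appended to streamMatchers, presumably so the query always carries at least one valid selector even when no stream matchers are configured. A matcher of this shape matches every series, because no stream has a label with an empty name set to "non-existent" — minimal illustration:

// Catch-all selector as used in buildTrees: the empty label name is never
// equal to "non-existent", so the not-equal condition holds for every series.
allStreams := labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent")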