streams flamegraph #11614

Closed
231 changes: 231 additions & 0 deletions cmd/streams-compactor/streams-compactor.go
@@ -0,0 +1,231 @@
package main

import (
	"context"
	"encoding/json"
	"math"
	"os"
	"sort"
	"time"

	"github.com/grafana/dskit/concurrency"
	"github.com/pkg/errors"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

var logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

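// resultsFileLocation should point at a JSON-encoded IndexStats file (per the
// placeholder below, the results produced by the streams-inspector);
// desiredChunkSizeKB is the target size for merged chunks: 6*1024 KB, i.e. 6 MiB.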
const (
	resultsFileLocation = "path to json file with results from streams-inspector"
	desiredChunkSizeKB  = 6 * 1024
)

func main() {
	stats := IndexStats{}
	level.Info(logger).Log("msg", "reading file content")
	content, err := os.ReadFile(resultsFileLocation)
	if err != nil {
		level.Error(logger).Log("msg", "error while reading the file", "err", err)
		return
	}
	level.Info(logger).Log("msg", "unmarshalling the json")
	err = json.Unmarshal(content, &stats)
	if err != nil {
		level.Error(logger).Log("msg", "error while unmarshalling", "err", err)
		return
	}

	err = assertCorrect(stats)
	if err != nil {
		level.Error(logger).Log("msg", "results are incorrect. can not continue", "err", err)
		return
	}

	level.Info(logger).Log("msg", "results are correct")
	computeMetrics(&stats)
	printStats(stats, "before compaction")
	level.Info(logger).Log("msg", "start compaction")
	compacted, err := compact(stats, desiredChunkSizeKB)
	if err != nil {
		level.Error(logger).Log("msg", "error while compacting", "err", err)
		return
	}
	level.Info(logger).Log("msg", "complete compaction")
	computeMetrics(&compacted)
	printStats(compacted, "after compaction")
}
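// A minimal sketch of running this tool, assuming resultsFileLocation above has
// been pointed at a real streams-inspector output file (the path in the constant
// is only a placeholder):
//
//	go run ./cmd/streams-compactor/streams-compactor.go
//
// It validates the inspector results, then logs the chunk statistics before and
// after the simulated compaction.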

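// assertCorrect re-computes the total size and chunk count from the per-stream
// data and returns an error if they do not match the aggregate counters stored
// in the results file.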
func assertCorrect(stats IndexStats) error {
	if len(stats.Streams) != int(stats.StreamsCount) {
		level.Error(logger).Log("msg", "streams count mismatch", "streams_count", stats.StreamsCount, "actual_streams_count", len(stats.Streams))
		return errors.New("streams count mismatch")
	}
	actualSizeKB := uint32(0)
	actualChunksCount := uint32(0)
	for _, stream := range stats.Streams {
		actualChunksCount += uint32(len(stream.Chunks))
		for _, chunk := range stream.Chunks {
			actualSizeKB += chunk.SizeKB
		}
	}
	if actualSizeKB != stats.SizeKB {
		level.Error(logger).Log("msg", "sizeKB mismatch", "size_kb", stats.SizeKB, "actual_size_kb", actualSizeKB)
		return errors.New("sizeKB mismatch")
	}

	if actualChunksCount != stats.ChunksCount {
		level.Error(logger).Log("msg", "chunks count mismatch", "chunks_count", stats.ChunksCount, "actual_chunks_count", actualChunksCount)
		return errors.New("chunks count mismatch")
	}
	return nil
}

//func truncate(stats IndexStats, content []byte, err error) {
//	updated := stats.Streams[0:20]
//	sizeKb := uint32(0)
//	chunksCount := uint32(0)
//	for _, streamStats := range updated {
//		chunksCount += streamStats.ChunksCount
//		for _, chunk := range streamStats.Chunks {
//			sizeKb += chunk.SizeKB
//		}
//	}
//	truncated := IndexStats{
//		Streams:      updated,
//		StreamsCount: 20,
//		SizeKB:       sizeKb,
//		ChunksCount:  chunksCount,
//	}
//	content, err = json.MarshalIndent(truncated, " ", " ")
//	if err != nil {
//		panic(err)
//		return
//	}
//	err = os.WriteFile("path to truncated file", content, 0644)
//	if err != nil {
//		panic(err)
//		return
//	}
//}

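// printStats logs the aggregate counters together with the derived
// average/median/min/max chunk sizes under the given message.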
func printStats(stats IndexStats, msg string) {
	level.Info(logger).Log(
		"msg", msg,
		"streams_count", stats.StreamsCount,
		"chunks_count", stats.ChunksCount,
		"size_kb", stats.SizeKB,
		"chunk_avg_size_kb", stats.ChunkAvgSizeKB,
		"chunk_med_size_kb", stats.ChunkMedSizeKB,
		"min_chunk_size_kb", stats.MinChunkSizeKB,
		"max_chunk_size_kb", stats.MaxChunkSizeKB,
	)
}

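// compact simulates merging each stream's consecutive chunks: sizes are
// accumulated until the current chunk would push the running total over
// desiredChunkSizeKB (or the stream ends), and the group, including that
// chunk, is flushed as one compacted chunk. Streams are processed with up to
// 100 concurrent workers.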
func compact(stats IndexStats, desiredChunkSizeKB uint32) (IndexStats, error) {
	compactedStreamStats := make([]StreamStats, len(stats.Streams))
	err := concurrency.ForEachJob(context.Background(), len(stats.Streams), 100, func(ctx context.Context, idx int) error {
		stream := stats.Streams[idx]
		compactedStream := StreamStats{Stream: stream.Stream, Chunks: make([]ChunkStats, 0, len(stream.Chunks))}
		lastNotCompactedChunkIdx := -1
		currentCompactedChunkSizeKb := uint32(0)
		for i := 0; i < len(stream.Chunks); i++ {
			currentChunk := stream.Chunks[i]
			// if we reach the desired size
			if currentChunk.SizeKB+currentCompactedChunkSizeKb > desiredChunkSizeKB ||
				// or if it's the last chunk
				i == len(stream.Chunks)-1 ||
				// or if the size of the next chunk is greater than the desired size
				stream.Chunks[i+1].SizeKB > desiredChunkSizeKB {
				compactedChunkStart := currentChunk.Start
				compactedChunkEnd := currentChunk.End
				if lastNotCompactedChunkIdx > -1 {
					if stream.Chunks[lastNotCompactedChunkIdx].Start.Before(currentChunk.Start) {
						compactedChunkStart = stream.Chunks[lastNotCompactedChunkIdx].Start
					}
					if stream.Chunks[lastNotCompactedChunkIdx].End.After(currentChunk.End) {
						compactedChunkEnd = stream.Chunks[lastNotCompactedChunkIdx].End
					}
				}
				compactedStream.Chunks = append(compactedStream.Chunks, ChunkStats{
					Start:  compactedChunkStart,
					End:    compactedChunkEnd,
					SizeKB: currentChunk.SizeKB + currentCompactedChunkSizeKb,
				})
				compactedStream.ChunksCount++
				compactedStream.SizeKB += currentChunk.SizeKB + currentCompactedChunkSizeKb
				// reset
				lastNotCompactedChunkIdx = -1
				currentCompactedChunkSizeKb = 0
			} else {
				currentCompactedChunkSizeKb += currentChunk.SizeKB
				lastNotCompactedChunkIdx = i
			}
		}
		compactedStreamStats[idx] = compactedStream
		return nil
	})
	if err != nil {
		return IndexStats{}, errors.Wrap(err, "error while compacting the streams")
	}
	compacted := IndexStats{Streams: compactedStreamStats, StreamsCount: uint32(len(compactedStreamStats))}
	for _, sst := range compactedStreamStats {
		compacted.ChunksCount += sst.ChunksCount
		compacted.SizeKB += sst.SizeKB
	}
	return compacted, nil
}
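// Example of the merge behaviour above (hypothetical numbers): with
// desiredChunkSizeKB=5 and a stream whose chunk sizes are 2, 2, 3 and 1 KB,
// the first two chunks accumulate to 4KB, the 3KB chunk pushes the total over
// the target so chunks 1-3 are flushed as a single 7KB compacted chunk (a
// merged chunk can therefore exceed the desired size), and the trailing 1KB
// chunk is flushed on its own because it is the last one.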

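// computeMetrics derives the average, median, minimum and maximum chunk sizes
// from the per-chunk data and logs how many chunks report a size of zero.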
func computeMetrics(stats *IndexStats) {
	if stats.ChunksCount == 0 {
		// nothing to compute; also guards against dividing or indexing by zero below
		return
	}
	chunksCountZeroKB := 0
	chunksCount := int(stats.ChunksCount)
	stats.ChunkAvgSizeKB = stats.SizeKB / stats.ChunksCount
	sizesKB := make([]int, 0, chunksCount)
	min := uint32(math.MaxUint32)
	max := uint32(0)
	for _, stream := range stats.Streams {
		for _, chunk := range stream.Chunks {
			if chunk.SizeKB < min {
				min = chunk.SizeKB
			}
			if chunk.SizeKB > max {
				max = chunk.SizeKB
			}
			if chunk.SizeKB == 0 {
				chunksCountZeroKB++
			}
			sizesKB = append(sizesKB, int(chunk.SizeKB))
		}
	}
	level.Info(logger).Log("msg", "chunks with ZERO size", "count", chunksCountZeroKB)
	sort.Ints(sizesKB)
	stats.ChunkMedSizeKB = uint32(sizesKB[len(sizesKB)/2])
	stats.MinChunkSizeKB = min
	stats.MaxChunkSizeKB = max
}

type ChunkStats struct {
	Start  time.Time
	End    time.Time
	SizeKB uint32
}

type StreamStats struct {
	Stream      string
	Chunks      []ChunkStats
	ChunksCount uint32
	SizeKB      uint32
}

type IndexStats struct {
	Streams      []StreamStats
	StreamsCount uint32
	ChunksCount  uint32
	SizeKB       uint32
	// computed
	ChunkAvgSizeKB uint32
	ChunkMedSizeKB uint32
	MinChunkSizeKB uint32
	MaxChunkSizeKB uint32
}
132 changes: 132 additions & 0 deletions cmd/streams-inspect/streams-inspector.go
@@ -0,0 +1,132 @@
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"math"
	"os"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
	"github.com/grafana/loki/pkg/storage/stores/tsdb"
	"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
	stream_inspector "github.com/grafana/loki/pkg/stream-inspector"
)

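// daySinceUnixEpoc selects the day to inspect: 19610 days after 1970-01-01 is
// 2023-09-10 UTC. The two path constants are placeholders that must be filled
// in before running.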
const (
	daySinceUnixEpoc = 19610
	indexLocation    = "<PATH_TO_TSDB_FILE>"
	resultFilePath   = "<PATH_TO_THE_FILE_TO_WRITE_THE_RESULTS_JSON>"
)

var logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))

func main() {
	ctx := context.Background()

	idx, _, err := tsdb.NewTSDBIndexFromFile(indexLocation, tsdb.IndexOpts{})
	if err != nil {
		level.Error(logger).Log("msg", "can not read index file", "err", err)
		return
	}
	// first 6 hours of the day
	start := time.Unix(0, 0).UTC().AddDate(0, 0, daySinceUnixEpoc)
	end := start.AddDate(0, 0, 1).Add(-18 * time.Hour)
	var streamMatchers []*labels.Matcher
	//streamMatchers = []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "job", "hosted-grafana/grafana"), labels.MustNewMatcher(labels.MatchEqual, "cluster", "prod-us-central-0"), labels.MustNewMatcher(labels.MatchEqual, "container", "grafana")}
	limit := int32(5000)

	leftTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers)
	if err != nil {
		fmt.Println("err", err)
		return
	}
	// last 6 hours of the day
	start = start.Add(18 * time.Hour)
	end = start.Add(6 * time.Hour)
	rightTrees, err := buildTrees(start, end, limit, idx, ctx, streamMatchers)
	if err != nil {
		fmt.Println("err", err)
		return
	}

	level.Info(logger).Log("msg", "starting building flamegraph model")
	converter := stream_inspector.FlamegraphConverter{Left: leftTrees, Right: rightTrees, Mode: stream_inspector.Diff}
	flameBearer := converter.CovertTrees()
	level.Info(logger).Log("msg", "completed building flamegraph model")

	level.Info(logger).Log("msg", "starting writing json")
	content, err := json.Marshal(flameBearer)
	if err != nil {
		panic(err)
	}
	_, err = os.Stat(resultFilePath)
	if err == nil {
		level.Info(logger).Log("msg", "results file already exists. deleting previous one.")
		err := os.Remove(resultFilePath)
		if err != nil {
			panic(err)
		}
	}
	err = os.WriteFile(resultFilePath, content, 0644)
	if err != nil {
		panic(err)
	}
}
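// A minimal sketch of running the inspector, assuming indexLocation and
// resultFilePath above have been replaced with real paths:
//
//	go run ./cmd/streams-inspect/streams-inspector.go
//
// The output is a flamegraph ("flamebearer") JSON model produced by the
// stream-inspector package, which can then presumably be loaded into a
// flamegraph viewer.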

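// buildTrees fetches per-stream volumes from the TSDB index for the given time
// range and converts them into the tree structures consumed by the flamegraph
// converter.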
func buildTrees(start time.Time, end time.Time, limit int32, idx tsdb.Index, ctx context.Context, streamMatchers []*labels.Matcher) ([]*stream_inspector.Tree, error) {
	fromUnix := model.TimeFromUnix(start.Unix())
	toUnix := model.TimeFromUnix(end.Unix())
	level.Info(logger).Log("msg", "starting extracting volumes")
	level.Info(logger).Log("from", fromUnix.Time().UTC(), "to", toUnix.Time().UTC())
	accumulator := seriesvolume.NewAccumulator(limit, math.MaxInt32)
	err := idx.Volume(ctx, "", fromUnix, toUnix, accumulator, nil, func(meta index.ChunkMeta) bool {
		return true
	}, nil, seriesvolume.Series, append(streamMatchers, labels.MustNewMatcher(labels.MatchNotEqual, "", "non-existent"))...)
	if err != nil {
		level.Error(logger).Log("msg", "error while fetching all the streams", "err", err)
		return nil, fmt.Errorf("error while fetching all the streams: %w", err)
	}
	volumes := accumulator.Volumes().GetVolumes()
	streams := make([]stream_inspector.StreamWithVolume, 0, len(volumes))
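	// The volume name is a rendered label set such as {cluster="dev", job="loki"};
	// the naive split below turns it into the flat key/value list
	// ["cluster", "dev", "job", "loki"] expected by labels.FromStrings, and it
	// assumes label values contain neither ", " nor "=".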
	for _, volume := range volumes {
		labelsString := strings.Trim(volume.Name, "{}")
		pairs := strings.Split(labelsString, ", ")
		lbls := make([]string, 0, len(pairs)*2)
		for _, pair := range pairs {
			lblVal := strings.Split(pair, "=")
			lbls = append(lbls, lblVal[0])
			lbls = append(lbls, strings.Trim(lblVal[1], "\""))
		}
		streams = append(streams, stream_inspector.StreamWithVolume{
			Labels: labels.FromStrings(lbls...),
			Volume: float64(volume.Volume),
		})
	}

	level.Info(logger).Log("msg", "completed extracting volumes")

	level.Info(logger).Log("msg", "starting building trees")
	inspector := stream_inspector.Inspector{}
	trees, err := inspector.BuildTrees(streams, streamMatchers)
	if err != nil {
		level.Error(logger).Log("msg", "error while building trees", "err", err)
		return nil, fmt.Errorf("error while building trees: %w", err)
	}
	level.Info(logger).Log("msg", "completed building trees")
	return trees, nil
}