Skip to content

Commit

Permalink
Merge pull request #9 from monzo/query-frontend-query-stats
Browse files Browse the repository at this point in the history
thanos-query-frontend: add --query-frontend.query-stats-enabled
  • Loading branch information
milesbxf authored Oct 24, 2024
2 parents 2f1f83f + caa13c6 commit 0577661
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 75 deletions.
4 changes: 3 additions & 1 deletion cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func registerQueryFrontend(app *extkingpin.App) {

cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)
cmd.Flag("query-frontend.query-stats-enabled", "True to enable query statistics tracking. "+
"When enabled, a message with some statistics is logged for every query.").Default("false").BoolVar(&cfg.CortexHandlerConfig.QueryStatsEnabled)

cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
" and both flags cannot be used at the same time. "+
Expand Down Expand Up @@ -311,7 +313,7 @@ func runQueryFrontend(
return err
}

roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper)
roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.CortexHandlerConfig.QueryStatsEnabled)
if err != nil {
return errors.Wrap(err, "setup downstream roundtripper")
}
Expand Down
16 changes: 12 additions & 4 deletions internal/cortex/frontend/downstream_roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@ import (

// RoundTripper that forwards requests to downstream URL.
type downstreamRoundTripper struct {
downstreamURL *url.URL
transport http.RoundTripper
downstreamURL *url.URL
transport http.RoundTripper
queryStatsEnabled bool
}

func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper, queryStatsEnabled bool) (http.RoundTripper, error) {
u, err := url.Parse(downstreamURL)
if err != nil {
return nil, err
}

return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
return &downstreamRoundTripper{downstreamURL: u, transport: transport, queryStatsEnabled: queryStatsEnabled}, nil
}

func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
Expand All @@ -36,6 +37,13 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro
}
}

if d.queryStatsEnabled {
// Add the stats=true query param so that thanos-query reports query statistics for this request.
q := r.URL.Query()
q.Set("stats", "true")
r.URL.RawQuery = q.Encode()
}

r.URL.Scheme = d.downstreamURL.Scheme
r.URL.Host = d.downstreamURL.Host
r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
Expand Down
142 changes: 72 additions & 70 deletions internal/cortex/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,24 @@ package transport
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/util/stats"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"syscall"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"

querier_stats "github.com/thanos-io/thanos/internal/cortex/querier/stats"
"github.com/thanos-io/thanos/internal/cortex/tenant"
"github.com/thanos-io/thanos/internal/cortex/util"
util_log "github.com/thanos-io/thanos/internal/cortex/util/log"
)
Expand Down Expand Up @@ -56,10 +55,9 @@ type Handler struct {
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryBytes *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
querySeconds *prometheus.HistogramVec
querySamplesTotal *prometheus.HistogramVec
activeUsers *util.ActiveUsersCleanupService
}

// NewHandler creates a new frontend handler.
Expand All @@ -71,25 +69,21 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
}

if cfg.QueryStatsEnabled {
h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_seconds_total",
Help: "Total amount of wall clock time spend processing queries.",
h.querySeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_query_frontend_query_seconds",
Help: "Total amount of wall clock time spend processing queries.",
Buckets: []float64{0.01, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 360},
}, []string{"user"})

h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_series_total",
Help: "Number of series fetched to execute a query.",
}, []string{"user"})

h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_chunks_bytes_total",
Help: "Size of all chunks fetched to execute a query in bytes.",
h.querySamplesTotal = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "thanos_query_frontend_query_total_fetched_samples",
Help: "Number of samples touched to execute a query.",
Buckets: []float64{1, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000},
}, []string{"user"})

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryBytes.DeleteLabelValues(user)
h.querySamplesTotal.DeleteLabelValues(user)
})
// If the cleaner stops or fails, we will simply not clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
Expand All @@ -98,25 +92,23 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
return h
}

// Envelope types used to pull the query statistics out of a JSON response
// body. Only the fields declared here are decoded; anything else in the
// response is ignored by encoding/json.
type (
	// ResponseDataWithStats mirrors the "data" object of a response and keeps
	// only its "stats" field. Stats stays nil when the field is absent.
	ResponseDataWithStats struct {
		Stats *stats.BuiltinStats `json:"stats"`
	}

	// ResponseWithStats mirrors the top-level response object and keeps only
	// its "data" field.
	ResponseWithStats struct {
		Data ResponseDataWithStats `json:"data"`
	}
)

func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
stats *querier_stats.Stats
queryString url.Values
)

// Initialise the stats in the context and make sure it's propagated
// down the request chain.
if f.cfg.QueryStatsEnabled {
var ctx context.Context
stats, ctx = querier_stats.ContextWithEmptyStats(r.Context())
r = r.WithContext(ctx)
}

defer func() {
_ = r.Body.Close()
}()

// Buffer the body for later use to track slow queries.
// Buffer the request body for later use to track slow queries.
var buf bytes.Buffer
r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
Expand All @@ -135,17 +127,37 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
hs[h] = vs
}

w.WriteHeader(resp.StatusCode)

var respBuf bytes.Buffer
if f.cfg.QueryStatsEnabled {
writeServiceTimingHeader(queryResponseTime, hs, stats)
// Buffer the response body for query stat tracking later
resp.Body = io.NopCloser(io.TeeReader(resp.Body, &respBuf))
}

w.WriteHeader(resp.StatusCode)
// Log errors from copying the response body so that they are visible even though a success status code was already returned.
bytesCopied, err := io.Copy(w, resp.Body)
if err != nil && !errors.Is(err, syscall.EPIPE) {
level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
}

if f.cfg.QueryStatsEnabled {
// Parse the stats field out of the response body
var statsResponse ResponseWithStats
if err := json.Unmarshal(respBuf.Bytes(), &statsResponse); err == nil {
if statsResponse.Data.Stats != nil {
queryString = f.parseRequestQueryString(r, buf)
f.reportQueryStats(r, queryString, queryResponseTime, statsResponse.Data.Stats)
} else {
// Don't fail the request if the stats are nil, just log a warning
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", errors.New("stats are nil"))
}
} else {
// Don't fail the request if the response body can't be parsed for stats, just log a warning
level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", err)
}
}

// Check whether we should parse the query string.
shouldReportSlowQuery := f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
Expand All @@ -155,9 +167,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if shouldReportSlowQuery {
f.reportSlowQuery(r, hs, queryString, queryResponseTime)
}
if f.cfg.QueryStatsEnabled {
f.reportQueryStats(r, queryString, queryResponseTime, stats)
}
}

// reportSlowQuery reports slow queries.
Expand Down Expand Up @@ -194,38 +203,45 @@ func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header,
level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
}

func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()
func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *stats.BuiltinStats) {
remoteUser, _, _ := r.BasicAuth()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

// Log stats.
logMessage := append([]interface{}{
logMessage := []interface{}{
"msg", "query stats",
"component", "query-frontend",
"method", r.Method,
"path", r.URL.Path,
"remote_user", remoteUser,
"remote_addr", r.RemoteAddr,
"response_time", queryResponseTime,
"query_wall_time_seconds", wallTime.Seconds(),
"fetched_series_count", numSeries,
"fetched_chunks_bytes", numBytes,
}, formatQueryString(queryString)...)
"query_timings_preparation_time", stats.Timings.QueryPreparationTime,
"query_timings_eval_total_time", stats.Timings.EvalTotalTime,
"query_timings_exec_total_time", stats.Timings.ExecTotalTime,
"query_timings_exec_queue_time", stats.Timings.ExecQueueTime,
"query_timings_inner_eval_time", stats.Timings.InnerEvalTime,
"query_timings_result_sort_time", stats.Timings.ResultSortTime,
}
if stats.Samples != nil {
samples := stats.Samples

logMessage = append(logMessage, []interface{}{
"total_queryable_samples", samples.TotalQueryableSamples,
"peak_samples", samples.PeakSamples,
}...)
}

logMessage = append(logMessage, formatQueryString(queryString)...)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)

// Record metrics.
if f.querySeconds != nil {
f.querySeconds.WithLabelValues(remoteUser).Observe(queryResponseTime.Seconds())
}
if f.querySamplesTotal != nil && stats.Samples != nil {
f.querySamplesTotal.WithLabelValues(remoteUser).Observe(float64(stats.Samples.TotalQueryableSamples))
}
}

func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
Expand Down Expand Up @@ -262,17 +278,3 @@ func writeError(w http.ResponseWriter, err error) {
}
server.WriteError(w, err)
}

// writeServiceTimingHeader sets the ServiceTimingHeaderName header on headers
// to a comma-separated list of two timing entries: the querier wall time read
// from stats, followed by the overall queryResponseTime. Each entry is
// formatted by statsValue. When stats is nil the headers are left untouched.
func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Header, stats *querier_stats.Stats) {
	if stats == nil {
		return
	}

	timings := []string{
		statsValue("querier_wall_time", stats.LoadWallTime()),
		statsValue("response_time", queryResponseTime),
	}
	headers.Set(ServiceTimingHeaderName, strings.Join(timings, ", "))
}

// statsValue renders a single timing entry as "<name>;dur=<milliseconds>".
// The duration is converted to (possibly fractional) milliseconds and printed
// in plain decimal notation using the fewest digits that represent it exactly.
func statsValue(name string, d time.Duration) string {
	millis := float64(d) / float64(time.Millisecond)
	return name + ";dur=" + strconv.FormatFloat(millis, 'f', -1, 64)
}

0 comments on commit 0577661

Please sign in to comment.