Skip to content

Commit

Permalink
enhance: [2.5] Use middleware to observe restful v2 in/out rpc stats (m…
Browse files Browse the repository at this point in the history
…ilvus-io#37223) (milvus-io#37240)

Cherry pick from master
pr: milvus-io#37223
Related to milvus-io#36102

Previous PR milvus-io#36107 add grpc inteceptor to observe rpc stats. Using same
strategy, this pr add gin middleware to observer restful v2 rpc stats.

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Oct 29, 2024
1 parent 313c69c commit bf8693d
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 29 deletions.
52 changes: 35 additions & 17 deletions internal/distributed/proxy/httpserver/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/crypto"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
Expand Down Expand Up @@ -61,41 +62,49 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {
router.POST(CollectionCategory+LoadAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.loadCollection)))))
router.POST(CollectionCategory+ReleaseAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.releaseCollection)))))

router.POST(EntityCategory+QueryAction, timeoutMiddleware(wrapperPost(func() any {
// Query
router.POST(EntityCategory+QueryAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any {
return &QueryReqV2{
Limit: 100,
OutputFields: []string{DefaultOutputFields},
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.query)))))
router.POST(EntityCategory+GetAction, timeoutMiddleware(wrapperPost(func() any {
}, wrapperTraceLog(h.wrapperCheckDatabase(h.query)))), true))
// Get
router.POST(EntityCategory+GetAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any {
return &CollectionIDReq{
OutputFields: []string{DefaultOutputFields},
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.get)))))
router.POST(EntityCategory+DeleteAction, timeoutMiddleware(wrapperPost(func() any {
}, wrapperTraceLog(h.wrapperCheckDatabase(h.get)))), true))
// Delete
router.POST(EntityCategory+DeleteAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any {
return &CollectionFilterReq{}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.delete)))))
router.POST(EntityCategory+InsertAction, timeoutMiddleware(wrapperPost(func() any {
}, wrapperTraceLog(h.wrapperCheckDatabase(h.delete)))), false))
// Insert
router.POST(EntityCategory+InsertAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any {
return &CollectionDataReq{}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.insert)))))
router.POST(EntityCategory+UpsertAction, timeoutMiddleware(wrapperPost(func() any {
}, wrapperTraceLog(h.wrapperCheckDatabase(h.insert)))), false))
// Upsert
router.POST(EntityCategory+UpsertAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any {
return &CollectionDataReq{}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.upsert)))))
router.POST(EntityCategory+SearchAction, timeoutMiddleware(wrapperPost(func() any {
}, wrapperTraceLog(h.wrapperCheckDatabase(h.upsert)))), false))
// Search
router.POST(EntityCategory+SearchAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any {
return &SearchReqV2{
Limit: 100,
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.search)))))
router.POST(EntityCategory+AdvancedSearchAction, timeoutMiddleware(wrapperPost(func() any {
}, wrapperTraceLog(h.wrapperCheckDatabase(h.search)))), true))
// advanced_search, backward compatible uri
router.POST(EntityCategory+AdvancedSearchAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any {
return &HybridSearchReq{
Limit: 100,
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch)))))
router.POST(EntityCategory+HybridSearchAction, timeoutMiddleware(wrapperPost(func() any {
}, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch)))), true))
// HybridSearch
router.POST(EntityCategory+HybridSearchAction, restfulSizeMiddleware(timeoutMiddleware(wrapperPost(func() any {
return &HybridSearchReq{
Limit: 100,
}
}, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch)))))
}, wrapperTraceLog(h.wrapperCheckDatabase(h.advancedSearch)))), true))

router.POST(PartitionCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listPartitions)))))
router.POST(PartitionCategory+HasAction, timeoutMiddleware(wrapperPost(func() any { return &PartitionReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.hasPartitions)))))
Expand Down Expand Up @@ -183,7 +192,7 @@ func wrapperPost(newReq newReqFunc, v2 handlerFuncV2) gin.HandlerFunc {
}
}
username, _ := c.Get(ContextUsername)
ctx, span := otel.Tracer(typeutil.ProxyRole).Start(context.Background(), c.Request.URL.Path)
ctx, span := otel.Tracer(typeutil.ProxyRole).Start(c, c.Request.URL.Path)
defer span.End()
ctx = proxy.NewContextWithMetadata(ctx, username.(string), dbName)
traceID := span.SpanContext().TraceID().String()
Expand All @@ -195,6 +204,15 @@ func wrapperPost(newReq newReqFunc, v2 handlerFuncV2) gin.HandlerFunc {
}
}

// restfulSizeMiddleware is the middleware fetchs metrics stats from gin struct.
func restfulSizeMiddleware(handler gin.HandlerFunc, observeOutbound bool) gin.HandlerFunc {
return func(ctx *gin.Context) {
h := metrics.WrapRestfulContext(ctx, ctx.Request.ContentLength)
handler(ctx)
metrics.RecordRestfulMetrics(h, int64(ctx.Writer.Size()), observeOutbound)
}
}

func wrapperTraceLog(v2 handlerFuncV2) handlerFuncV2 {
return func(ctx context.Context, c *gin.Context, req any, dbName string) (interface{}, error) {
switch proxy.Params.CommonCfg.TraceLogMode.GetAsInt() {
Expand Down
24 changes: 12 additions & 12 deletions pkg/metrics/grpc_stats_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,49 +25,49 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// milvusGrpcKey is context key type.
type milvusGrpcKey struct{}
// milvusStatsKey is context key type.
type milvusStatsKey struct{}

// GrpcStats stores the meta and payload size info
// RPCStats stores the meta and payload size info
// it should be attached to context so that request sizing could be avoided
type GrpcStats struct {
type RPCStats struct {
fullMethodName string
collectionName string
inboundPayloadSize int
inboundLabel string
nodeID int64
}

func (s *GrpcStats) SetCollectionName(collName string) *GrpcStats {
func (s *RPCStats) SetCollectionName(collName string) *RPCStats {
if s == nil {
return s
}
s.collectionName = collName
return s
}

func (s *GrpcStats) SetInboundLabel(label string) *GrpcStats {
func (s *RPCStats) SetInboundLabel(label string) *RPCStats {
if s == nil {
return s
}
s.inboundLabel = label
return s
}

func (s *GrpcStats) SetNodeID(nodeID int64) *GrpcStats {
func (s *RPCStats) SetNodeID(nodeID int64) *RPCStats {
if s == nil {
return s
}
s.nodeID = nodeID
return s
}

func attachStats(ctx context.Context, stats *GrpcStats) context.Context {
return context.WithValue(ctx, milvusGrpcKey{}, stats)
func attachStats(ctx context.Context, stats *RPCStats) context.Context {
return context.WithValue(ctx, milvusStatsKey{}, stats)
}

func GetStats(ctx context.Context) *GrpcStats {
stats, ok := ctx.Value(milvusGrpcKey{}).(*GrpcStats)
func GetStats(ctx context.Context) *RPCStats {
stats, ok := ctx.Value(milvusStatsKey{}).(*RPCStats)
if !ok {
return nil
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func (h *grpcSizeStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInf
return ctx
}
// attach stats
return attachStats(ctx, &GrpcStats{fullMethodName: info.FullMethodName})
return attachStats(ctx, &RPCStats{fullMethodName: info.FullMethodName})
}

// HandleRPC implements per-RPC stats instrumentation.
Expand Down
45 changes: 45 additions & 0 deletions pkg/metrics/restful_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"context"
"strconv"
)

func WrapRestfulContext(ctx context.Context, inputLength int64) context.Context {
return context.WithValue(ctx, milvusStatsKey{}, &RPCStats{
inboundPayloadSize: int(inputLength),
})
}

func RecordRestfulMetrics(ctx context.Context, outputLength int64, observeOutbound bool) {
if mstats := GetStats(ctx); mstats != nil {
// all info set
// set metrics with inbound size and related meta
nodeIDValue := strconv.FormatInt(mstats.nodeID, 10)
if mstats.inboundPayloadSize > 0 {
ProxyReceiveBytes.WithLabelValues(
nodeIDValue,
mstats.inboundLabel, mstats.collectionName).Add(float64(mstats.inboundPayloadSize))
}
// set outbound payload size metrics
if outputLength > 0 && observeOutbound {
ProxyReadReqSendBytes.WithLabelValues(nodeIDValue).Add(float64(outputLength))
}
}
}

0 comments on commit bf8693d

Please sign in to comment.