Skip to content

Commit

Permalink
Add tracing for incoming requests (#101)
Browse files Browse the repository at this point in the history
* Add tracing for incoming requests

* Add tracing for Kubernetes API calls.

* Add trace span for reconciliation.

Note that Kubernetes API calls made here are not traced because they
are not made using the context with the span attached.

* Trace requests made while preparing a StatefulSet for downscaling

* Attach tags to created spans

* Fix issue where calls to prepare-downscale endpoints aren't captured correctly

* Use correct casing for 'POST'

* Add tracing for Kubernetes API calls and add spans for main operations in prepare downscale webhook

* Add missing operation to test environment role

* Nest all "prepare pod for shutdown" spans under a single parent span.

* Add instrumentation to no downscale webhook as well.

* Add changelog entry.
  • Loading branch information
charleskorn authored Jan 10, 2024
1 parent a522cc4 commit e026294
Show file tree
Hide file tree
Showing 12 changed files with 330 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [FEATURE] Coordinate downscaling between zones with a ConfigMap instead of annotation, optionally, via the new zoneTracker for the prepare-downscale admission webhook. #107
* [ENHANCEMENT] Expose pprof endpoint for profiling. #109
* [ENHANCEMENT] Change Docker build image to `golang:1.21-bookworm` and update base image to `alpine:3.19`. #97
* [ENHANCEMENT] Add basic tracing support. #101

## v0.10.1

Expand Down
3 changes: 3 additions & 0 deletions cmd/rollout-operator/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func newInstrumentedRouter(metrics *metrics) (*mux.Router, http.Handler) {
router := mux.NewRouter()

httpMiddleware := []middleware.Interface{
middleware.Tracer{
RouteMatcher: router,
},
middleware.Instrument{
RouteMatcher: router,
Duration: metrics.RequestDuration,
Expand Down
19 changes: 18 additions & 1 deletion cmd/rollout-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tracing"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -27,8 +29,8 @@ import (

"github.com/grafana/rollout-operator/pkg/admission"
"github.com/grafana/rollout-operator/pkg/controller"
"github.com/grafana/rollout-operator/pkg/instrumentation"
"github.com/grafana/rollout-operator/pkg/tlscert"

// Required to get the GCP auth provider working.
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)
Expand Down Expand Up @@ -118,6 +120,17 @@ func main() {
ready := atomic.NewBool(false)
restart := make(chan string)

name := os.Getenv("JAEGER_SERVICE_NAME")
if name == "" {
name = "rollout-operator"
}

if trace, err := tracing.NewFromEnv(name); err != nil {
level.Error(logger).Log("msg", "Failed to setup tracing", "err", err.Error())
} else {
defer trace.Close()
}

// Expose HTTP endpoints.
srv := newServer(cfg.serverPort, logger, metrics)
srv.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
Expand All @@ -129,6 +142,10 @@ func main() {
kubeConfig, err := buildKubeConfig(cfg.kubeAPIURL, cfg.kubeConfigFile)
check(errors.Wrap(err, "failed to build Kubernetes client config"))

kubeConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return instrumentation.NewKubernetesAPIClientTracer(&nethttp.Transport{RoundTripper: rt})
})

kubeClient, err := kubernetes.NewForConfig(kubeConfig)
check(errors.Wrap(err, "failed to create Kubernetes client"))

Expand Down
1 change: 1 addition & 0 deletions development/rollout-operator-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ rules:
- list
- get
- watch
- patch
- apiGroups:
- apps
resources:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ require (
github.com/grafana/dskit v0.0.0-20231213223053-84f5540a28dd
github.com/hashicorp/go-multierror v1.1.1
github.com/k3d-io/k3d/v5 v5.6.0
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/common v0.45.0
Expand Down Expand Up @@ -89,8 +91,6 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc3 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/opentracing-contrib/go-stdlib v1.0.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
Expand Down
19 changes: 19 additions & 0 deletions pkg/admission/no_downscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/spanlogger"
"github.com/opentracing/opentracing-go"
v1 "k8s.io/api/admission/v1"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
Expand All @@ -24,18 +26,27 @@ const (

func NoDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api *kubernetes.Clientset) *v1.AdmissionResponse {
logger = log.With(logger, "name", ar.Request.Name, "resource", ar.Request.Resource.Resource, "namespace", ar.Request.Namespace)
spanLogger, ctx := spanlogger.New(ctx, logger, "No downscale", tenantResolver)
defer spanLogger.Span.Finish()
logger = spanLogger

spanLogger.SetTag("object.name", ar.Request.Name)
spanLogger.SetTag("object.resource", ar.Request.Resource.Resource)
spanLogger.SetTag("object.namespace", ar.Request.Namespace)

oldObj, oldGVK, err := codecs.UniversalDeserializer().Decode(ar.Request.OldObject.Raw, nil, nil)
if err != nil {
return allowErr(logger, "can't decode old object, allowing the change", err)
}
logger = log.With(logger, "request_gvk", oldGVK)
spanLogger.SetTag("request.gvk", oldGVK)

oldReplicas, err := replicas(oldObj, oldGVK)
if err != nil {
return allowErr(logger, "can't get old replicas, allowing the change", err)
}
logger = log.With(logger, "old_replicas", int32PtrStr(oldReplicas))
spanLogger.SetTag("old_replicas", int32PtrStr(oldReplicas))

newObj, newGVK, err := codecs.UniversalDeserializer().Decode(ar.Request.Object.Raw, nil, nil)
if err != nil {
Expand All @@ -47,6 +58,7 @@ func NoDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview,
return allowErr(logger, "can't get new replicas, allowing the change", err)
}
logger = log.With(logger, "new_replicas", int32PtrStr(newReplicas))
spanLogger.SetTag("new_replicas", int32PtrStr(newReplicas))

// Both replicas are nil, nothing to warn about.
if oldReplicas == nil && newReplicas == nil {
Expand Down Expand Up @@ -140,6 +152,13 @@ func allowErr(logger log.Logger, msg string, err error) *v1.AdmissionResponse {
}

func getResourceLabels(ctx context.Context, ar v1.AdmissionReview, api kubernetes.Interface) (map[string]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Get resource labels")
defer span.Finish()

span.SetTag("namespace", ar.Request.Namespace)
span.SetTag("name", ar.Request.Name)
span.SetTag("resource", ar.Request.Resource.Resource)

switch ar.Request.Resource.Resource {
case "statefulsets":
obj, err := api.AppsV1().StatefulSets(ar.Request.Namespace).Get(ctx, ar.Request.Name, metav1.GetOptions{})
Expand Down
102 changes: 89 additions & 13 deletions pkg/admission/prep_downscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/spanlogger"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/admission/v1"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -33,7 +36,8 @@ const (

func PrepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api *kubernetes.Clientset, useZoneTracker bool, zoneTrackerConfigMapName string) *v1.AdmissionResponse {
client := &http.Client{
Timeout: 5 * time.Second,
Timeout: 5 * time.Second,
Transport: &nethttp.Transport{RoundTripper: http.DefaultTransport},
}

if useZoneTracker {
Expand All @@ -45,11 +49,19 @@ func PrepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
}

type httpClient interface {
Post(url, contentType string, body io.Reader) (resp *http.Response, err error)
Do(req *http.Request) (*http.Response, error)
}

func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api kubernetes.Interface, client httpClient) *v1.AdmissionResponse {
logger = log.With(logger, "name", ar.Request.Name, "resource", ar.Request.Resource.Resource, "namespace", ar.Request.Namespace)
spanLogger, ctx := spanlogger.New(ctx, logger, "Prepare downscale", tenantResolver)
defer spanLogger.Span.Finish()
logger = spanLogger

spanLogger.SetTag("object.name", ar.Request.Name)
spanLogger.SetTag("object.resource", ar.Request.Resource.Resource)
spanLogger.SetTag("object.namespace", ar.Request.Namespace)
spanLogger.SetTag("request.dry_run", *ar.Request.DryRun)

if *ar.Request.DryRun {
return &v1.AdmissionResponse{Allowed: true}
Expand All @@ -60,12 +72,15 @@ func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
return allowErr(logger, "can't decode old object, allowing the change", err)
}
logger = log.With(logger, "request_gvk", oldInfo.gvk, "old_replicas", int32PtrStr(oldInfo.replicas))
spanLogger.SetTag("request.gvk", oldInfo.gvk)
spanLogger.SetTag("old_replicas", int32PtrStr(oldInfo.replicas))

newInfo, err := decodeAndReplicas(ar.Request.Object.Raw)
if err != nil {
return allowErr(logger, "can't decode new object, allowing the change", err)
}
logger = log.With(logger, "new_replicas", int32PtrStr(newInfo.replicas))
spanLogger.SetTag("new_replicas", int32PtrStr(newInfo.replicas))

// Continue if it's a downscale
response := checkReplicasChange(logger, oldInfo, newInfo)
Expand Down Expand Up @@ -142,11 +157,10 @@ func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
}

// Since it's a downscale, check if the resource has the label that indicates it needs to be prepared to be downscaled.
// Create a slice of endpoint addresses for pods to send HTTP post requests to and to fail if any don't return 200
// Create a slice of endpoint addresses for pods to send HTTP POST requests to and to fail if any don't return 200
eps := createEndpoints(ar, oldInfo, newInfo, port, path)

err = sendPrepareShutdownRequests(ctx, logger, client, eps)
if err != nil {
if err := sendPrepareShutdownRequests(ctx, logger, client, eps); err != nil {
// Down-scale operation is disallowed because a pod failed to prepare for shutdown and cannot be deleted
level.Error(logger).Log("msg", "downscale not allowed due to error", "err", err)
return deny(
Expand All @@ -155,8 +169,7 @@ func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
)
}

err = addDownscaledAnnotationToStatefulSet(ctx, api, ar.Request.Namespace, ar.Request.Name)
if err != nil {
if err := addDownscaledAnnotationToStatefulSet(ctx, api, ar.Request.Namespace, ar.Request.Name); err != nil {
level.Error(logger).Log("msg", "downscale not allowed due to error while adding annotation", "err", err)
return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because adding an annotation to the statefulset failed.",
Expand Down Expand Up @@ -185,6 +198,12 @@ func deny(msg string, args ...any) *v1.AdmissionResponse {
}

func getResourceAnnotations(ctx context.Context, ar v1.AdmissionReview, api kubernetes.Interface) (map[string]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Get resource annotations")
defer span.Finish()

span.SetTag("namespace", ar.Request.Namespace)
span.SetTag("name", ar.Request.Name)

switch ar.Request.Resource.Resource {
case "statefulsets":
obj, err := api.AppsV1().StatefulSets(ar.Request.Namespace).Get(ctx, ar.Request.Name, metav1.GetOptions{})
Expand All @@ -197,6 +216,12 @@ func getResourceAnnotations(ctx context.Context, ar v1.AdmissionReview, api kube
}

func addDownscaledAnnotationToStatefulSet(ctx context.Context, api kubernetes.Interface, namespace, stsName string) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "Add downscaled annotation to StatefulSet")
defer span.Finish()

span.SetTag("namespace", namespace)
span.SetTag("name", stsName)

client := api.AppsV1().StatefulSets(namespace)
patch := fmt.Sprintf(`{"metadata":{"annotations":{"%v":"%v"}}}`, config.LastDownscaleAnnotationKey, time.Now().UTC().Format(time.RFC3339))
_, err := client.Patch(ctx, stsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
Expand Down Expand Up @@ -262,6 +287,11 @@ func findDownscalesDoneMinTimeAgo(stsList *appsv1.StatefulSetList, excludeStsNam
//
// The StatefulSet whose name matches the input excludeStsName is not checked.
func findStatefulSetWithNonUpdatedReplicas(ctx context.Context, api kubernetes.Interface, namespace string, stsList *appsv1.StatefulSetList, excludeStsName string) (*statefulSetDownscale, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Find StatefulSet with non-updated replicas")
defer span.Finish()

span.SetTag("namespace", namespace)

for _, sts := range stsList.Items {
if sts.Name == excludeStsName {
continue
Expand All @@ -284,6 +314,12 @@ func findStatefulSetWithNonUpdatedReplicas(ctx context.Context, api kubernetes.I

// countRunningAndReadyPods counts running and ready pods for a StatefulSet.
func countRunningAndReadyPods(ctx context.Context, api kubernetes.Interface, namespace string, sts *appsv1.StatefulSet) (int, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Count running and ready pods for StatefulSet")
defer span.Finish()

span.SetTag("namespace", namespace)
span.SetTag("name", sts.Name)

pods, err := findPodsForStatefulSet(ctx, api, namespace, sts)
if err != nil {
return 0, err
Expand All @@ -309,6 +345,12 @@ func findPodsForStatefulSet(ctx context.Context, api kubernetes.Interface, names
}

func findStatefulSetsForRolloutGroup(ctx context.Context, api kubernetes.Interface, namespace, rolloutGroup string) (*appsv1.StatefulSetList, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Find StatefulSets for rollout group")
defer span.Finish()

span.SetTag("namespace", namespace)
span.SetTag("rollout_group", rolloutGroup)

groupReq, err := labels.NewRequirement(config.RolloutGroupLabelKey, selection.Equals, []string{rolloutGroup})
if err != nil {
return nil, err
Expand Down Expand Up @@ -423,22 +465,44 @@ func createEndpoints(ar v1.AdmissionReview, oldInfo, newInfo *objectInfo, port,
}

func sendPrepareShutdownRequests(ctx context.Context, logger log.Logger, client httpClient, eps []endpoint) error {
if len(eps) == 0 {
return nil
}

span, ctx := opentracing.StartSpanFromContext(ctx, "Prepare pods for shutdown")
defer span.Finish()

g, _ := errgroup.WithContext(ctx)
for _, ep := range eps {
ep := ep // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
logger := log.With(logger, "url", ep.url, "index", ep.index)
logger, ctx := spanlogger.New(ctx, log.With(logger, "url", ep.url, "index", ep.index), "Prepare pod for shutdown", tenantResolver)
defer logger.Span.Finish()

logger.SetTag("url", ep.url)
logger.SetTag("index", ep.index)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+ep.url, nil)
if err != nil {
level.Error(logger).Log("msg", "error creating HTTP POST request", "err", err)
}

req.Header.Set("Content-Type", "application/json")
req, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), req)
defer ht.Finish()

resp, err := client.Post("http://"+ep.url, "application/json", nil)
resp, err := client.Do(req)
if err != nil {
level.Error(logger).Log("msg", "error sending HTTP post request", "err", err)
level.Error(logger).Log("msg", "error sending HTTP POST request", "err", err)
return err
}

defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err := errors.New("HTTP post request returned non-2xx status code")
err := errors.New("HTTP POST request returned non-2xx status code")
body, readError := io.ReadAll(resp.Body)
defer resp.Body.Close()
level.Error(logger).Log("msg", "error received from shutdown endpoint", "err", err, "status", resp.StatusCode, "response_body", body)
level.Error(logger).Log("msg", "error received from shutdown endpoint", "err", err, "status", resp.StatusCode, "response_body", string(body))
return errors.Join(err, readError)
}
level.Debug(logger).Log("msg", "pod prepared for shutdown")
Expand All @@ -447,3 +511,15 @@ func sendPrepareShutdownRequests(ctx context.Context, logger log.Logger, client
}
return g.Wait()
}

var tenantResolver spanlogger.TenantResolver = noTenantResolver{}

type noTenantResolver struct{}

func (n noTenantResolver) TenantID(ctx context.Context) (string, error) {
return "", nil
}

func (n noTenantResolver) TenantIDs(ctx context.Context) ([]string, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion pkg/admission/prep_downscale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type fakeHttpClient struct {
statusCode int
}

func (f *fakeHttpClient) Post(url, contentType string, body io.Reader) (resp *http.Response, err error) {
func (f *fakeHttpClient) Do(req *http.Request) (resp *http.Response, err error) {
return &http.Response{
StatusCode: f.statusCode,
Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/go-multierror"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -189,6 +190,9 @@ func (c *RolloutController) enqueueReconcile() {
}

func (c *RolloutController) reconcile(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile")
defer span.Finish()

level.Info(c.logger).Log("msg", "reconcile started")

sets, err := c.listStatefulSetsWithRolloutGroup()
Expand Down
Loading

0 comments on commit e026294

Please sign in to comment.