Revert "Add tracing for incoming requests (#101)" (#113)
This reverts commit e026294.
andyasp authored Jan 10, 2024
1 parent e026294 commit c89f316
Showing 12 changed files with 17 additions and 330 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
@@ -5,7 +5,6 @@
* [FEATURE] Coordinate downscaling between zones with a ConfigMap instead of annotation, optionally, via the new zoneTracker for the prepare-downscale admission webhook. #107
* [ENHANCEMENT] Expose pprof endpoint for profiling. #109
* [ENHANCEMENT] Change Docker build image to `golang:1.21-bookworm` and update base image to `alpine:3.19`. #97
* [ENHANCEMENT] Add basic tracing support. #101

## v0.10.1

3 changes: 0 additions & 3 deletions cmd/rollout-operator/instrumentation.go
@@ -45,9 +45,6 @@ func newInstrumentedRouter(metrics *metrics) (*mux.Router, http.Handler) {
router := mux.NewRouter()

httpMiddleware := []middleware.Interface{
middleware.Tracer{
RouteMatcher: router,
},
middleware.Instrument{
RouteMatcher: router,
Duration: metrics.RequestDuration,
19 changes: 1 addition & 18 deletions cmd/rollout-operator/main.go
@@ -15,8 +15,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tracing"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -29,8 +27,8 @@ import (

"github.com/grafana/rollout-operator/pkg/admission"
"github.com/grafana/rollout-operator/pkg/controller"
"github.com/grafana/rollout-operator/pkg/instrumentation"
"github.com/grafana/rollout-operator/pkg/tlscert"

// Required to get the GCP auth provider working.
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)
@@ -120,17 +118,6 @@ func main() {
ready := atomic.NewBool(false)
restart := make(chan string)

name := os.Getenv("JAEGER_SERVICE_NAME")
if name == "" {
name = "rollout-operator"
}

if trace, err := tracing.NewFromEnv(name); err != nil {
level.Error(logger).Log("msg", "Failed to setup tracing", "err", err.Error())
} else {
defer trace.Close()
}

// Expose HTTP endpoints.
srv := newServer(cfg.serverPort, logger, metrics)
srv.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
@@ -142,10 +129,6 @@
kubeConfig, err := buildKubeConfig(cfg.kubeAPIURL, cfg.kubeConfigFile)
check(errors.Wrap(err, "failed to build Kubernetes client config"))

kubeConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return instrumentation.NewKubernetesAPIClientTracer(&nethttp.Transport{RoundTripper: rt})
})

kubeClient, err := kubernetes.NewForConfig(kubeConfig)
check(errors.Wrap(err, "failed to create Kubernetes client"))

1 change: 0 additions & 1 deletion development/rollout-operator-role.yaml
@@ -21,7 +21,6 @@ rules:
- list
- get
- watch
- patch
- apiGroups:
- apps
resources:
4 changes: 2 additions & 2 deletions go.mod
@@ -11,8 +11,6 @@ require (
github.com/grafana/dskit v0.0.0-20231213223053-84f5540a28dd
github.com/hashicorp/go-multierror v1.1.1
github.com/k3d-io/k3d/v5 v5.6.0
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.17.0
github.com/prometheus/common v0.45.0
@@ -91,6 +89,8 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc3 // indirect
github.com/opencontainers/runc v1.1.7 // indirect
github.com/opentracing-contrib/go-stdlib v1.0.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 // indirect
19 changes: 0 additions & 19 deletions pkg/admission/no_downscale.go
@@ -7,8 +7,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/spanlogger"
"github.com/opentracing/opentracing-go"
v1 "k8s.io/api/admission/v1"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
@@ -26,27 +24,18 @@ const (

func NoDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api *kubernetes.Clientset) *v1.AdmissionResponse {
logger = log.With(logger, "name", ar.Request.Name, "resource", ar.Request.Resource.Resource, "namespace", ar.Request.Namespace)
spanLogger, ctx := spanlogger.New(ctx, logger, "No downscale", tenantResolver)
defer spanLogger.Span.Finish()
logger = spanLogger

spanLogger.SetTag("object.name", ar.Request.Name)
spanLogger.SetTag("object.resource", ar.Request.Resource.Resource)
spanLogger.SetTag("object.namespace", ar.Request.Namespace)

oldObj, oldGVK, err := codecs.UniversalDeserializer().Decode(ar.Request.OldObject.Raw, nil, nil)
if err != nil {
return allowErr(logger, "can't decode old object, allowing the change", err)
}
logger = log.With(logger, "request_gvk", oldGVK)
spanLogger.SetTag("request.gvk", oldGVK)

oldReplicas, err := replicas(oldObj, oldGVK)
if err != nil {
return allowErr(logger, "can't get old replicas, allowing the change", err)
}
logger = log.With(logger, "old_replicas", int32PtrStr(oldReplicas))
spanLogger.SetTag("old_replicas", int32PtrStr(oldReplicas))

newObj, newGVK, err := codecs.UniversalDeserializer().Decode(ar.Request.Object.Raw, nil, nil)
if err != nil {
@@ -58,7 +47,6 @@ func NoDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview,
return allowErr(logger, "can't get new replicas, allowing the change", err)
}
logger = log.With(logger, "new_replicas", int32PtrStr(newReplicas))
spanLogger.SetTag("new_replicas", int32PtrStr(newReplicas))

// Both replicas are nil, nothing to warn about.
if oldReplicas == nil && newReplicas == nil {
@@ -152,13 +140,6 @@ func allowErr(logger log.Logger, msg string, err error) *v1.AdmissionResponse {
}

func getResourceLabels(ctx context.Context, ar v1.AdmissionReview, api kubernetes.Interface) (map[string]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Get resource labels")
defer span.Finish()

span.SetTag("namespace", ar.Request.Namespace)
span.SetTag("name", ar.Request.Name)
span.SetTag("resource", ar.Request.Resource.Resource)

switch ar.Request.Resource.Resource {
case "statefulsets":
obj, err := api.AppsV1().StatefulSets(ar.Request.Namespace).Get(ctx, ar.Request.Name, metav1.GetOptions{})
102 changes: 13 additions & 89 deletions pkg/admission/prep_downscale.go
@@ -10,9 +10,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/spanlogger"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/admission/v1"
appsv1 "k8s.io/api/apps/v1"
@@ -36,8 +33,7 @@ const (

func PrepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api *kubernetes.Clientset, useZoneTracker bool, zoneTrackerConfigMapName string) *v1.AdmissionResponse {
client := &http.Client{
Timeout: 5 * time.Second,
Transport: &nethttp.Transport{RoundTripper: http.DefaultTransport},
Timeout: 5 * time.Second,
}

if useZoneTracker {
@@ -49,19 +45,11 @@
}

type httpClient interface {
Do(req *http.Request) (*http.Response, error)
Post(url, contentType string, body io.Reader) (resp *http.Response, err error)
}

func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionReview, api kubernetes.Interface, client httpClient) *v1.AdmissionResponse {
logger = log.With(logger, "name", ar.Request.Name, "resource", ar.Request.Resource.Resource, "namespace", ar.Request.Namespace)
spanLogger, ctx := spanlogger.New(ctx, logger, "Prepare downscale", tenantResolver)
defer spanLogger.Span.Finish()
logger = spanLogger

spanLogger.SetTag("object.name", ar.Request.Name)
spanLogger.SetTag("object.resource", ar.Request.Resource.Resource)
spanLogger.SetTag("object.namespace", ar.Request.Namespace)
spanLogger.SetTag("request.dry_run", *ar.Request.DryRun)

if *ar.Request.DryRun {
return &v1.AdmissionResponse{Allowed: true}
Expand All @@ -72,15 +60,12 @@ func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
return allowErr(logger, "can't decode old object, allowing the change", err)
}
logger = log.With(logger, "request_gvk", oldInfo.gvk, "old_replicas", int32PtrStr(oldInfo.replicas))
spanLogger.SetTag("request.gvk", oldInfo.gvk)
spanLogger.SetTag("old_replicas", int32PtrStr(oldInfo.replicas))

newInfo, err := decodeAndReplicas(ar.Request.Object.Raw)
if err != nil {
return allowErr(logger, "can't decode new object, allowing the change", err)
}
logger = log.With(logger, "new_replicas", int32PtrStr(newInfo.replicas))
spanLogger.SetTag("new_replicas", int32PtrStr(newInfo.replicas))

// Continue if it's a downscale
response := checkReplicasChange(logger, oldInfo, newInfo)
@@ -157,10 +142,11 @@ func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
}

// Since it's a downscale, check if the resource has the label that indicates it needs to be prepared to be downscaled.
// Create a slice of endpoint addresses for pods to send HTTP POST requests to and to fail if any don't return 200
// Create a slice of endpoint addresses for pods to send HTTP post requests to and to fail if any don't return 200
eps := createEndpoints(ar, oldInfo, newInfo, port, path)

if err := sendPrepareShutdownRequests(ctx, logger, client, eps); err != nil {
err = sendPrepareShutdownRequests(ctx, logger, client, eps)
if err != nil {
// Down-scale operation is disallowed because a pod failed to prepare for shutdown and cannot be deleted
level.Error(logger).Log("msg", "downscale not allowed due to error", "err", err)
return deny(
Expand All @@ -169,7 +155,8 @@ func prepareDownscale(ctx context.Context, logger log.Logger, ar v1.AdmissionRev
)
}

if err := addDownscaledAnnotationToStatefulSet(ctx, api, ar.Request.Namespace, ar.Request.Name); err != nil {
err = addDownscaledAnnotationToStatefulSet(ctx, api, ar.Request.Namespace, ar.Request.Name)
if err != nil {
level.Error(logger).Log("msg", "downscale not allowed due to error while adding annotation", "err", err)
return deny(
"downscale of %s/%s in %s from %d to %d replicas is not allowed because adding an annotation to the statefulset failed.",
@@ -198,12 +185,6 @@ func deny(msg string, args ...any) *v1.AdmissionResponse {
}

func getResourceAnnotations(ctx context.Context, ar v1.AdmissionReview, api kubernetes.Interface) (map[string]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Get resource annotations")
defer span.Finish()

span.SetTag("namespace", ar.Request.Namespace)
span.SetTag("name", ar.Request.Name)

switch ar.Request.Resource.Resource {
case "statefulsets":
obj, err := api.AppsV1().StatefulSets(ar.Request.Namespace).Get(ctx, ar.Request.Name, metav1.GetOptions{})
@@ -216,12 +197,6 @@ func getResourceAnnotations(ctx context.Context, ar v1.AdmissionReview, api kube
}

func addDownscaledAnnotationToStatefulSet(ctx context.Context, api kubernetes.Interface, namespace, stsName string) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "Add downscaled annotation to StatefulSet")
defer span.Finish()

span.SetTag("namespace", namespace)
span.SetTag("name", stsName)

client := api.AppsV1().StatefulSets(namespace)
patch := fmt.Sprintf(`{"metadata":{"annotations":{"%v":"%v"}}}`, config.LastDownscaleAnnotationKey, time.Now().UTC().Format(time.RFC3339))
_, err := client.Patch(ctx, stsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
@@ -287,11 +262,6 @@ func findDownscalesDoneMinTimeAgo(stsList *appsv1.StatefulSetList, excludeStsNam
//
// The StatefulSet whose name matches the input excludeStsName is not checked.
func findStatefulSetWithNonUpdatedReplicas(ctx context.Context, api kubernetes.Interface, namespace string, stsList *appsv1.StatefulSetList, excludeStsName string) (*statefulSetDownscale, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Find StatefulSet with non-updated replicas")
defer span.Finish()

span.SetTag("namespace", namespace)

for _, sts := range stsList.Items {
if sts.Name == excludeStsName {
continue
@@ -314,12 +284,6 @@

// countRunningAndReadyPods counts running and ready pods for a StatefulSet.
func countRunningAndReadyPods(ctx context.Context, api kubernetes.Interface, namespace string, sts *appsv1.StatefulSet) (int, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Count running and ready pods for StatefulSet")
defer span.Finish()

span.SetTag("namespace", namespace)
span.SetTag("name", sts.Name)

pods, err := findPodsForStatefulSet(ctx, api, namespace, sts)
if err != nil {
return 0, err
Expand All @@ -345,12 +309,6 @@ func findPodsForStatefulSet(ctx context.Context, api kubernetes.Interface, names
}

func findStatefulSetsForRolloutGroup(ctx context.Context, api kubernetes.Interface, namespace, rolloutGroup string) (*appsv1.StatefulSetList, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Find StatefulSets for rollout group")
defer span.Finish()

span.SetTag("namespace", namespace)
span.SetTag("rollout_group", rolloutGroup)

groupReq, err := labels.NewRequirement(config.RolloutGroupLabelKey, selection.Equals, []string{rolloutGroup})
if err != nil {
return nil, err
@@ -465,44 +423,22 @@ func createEndpoints(ar v1.AdmissionReview, oldInfo, newInfo *objectInfo, port,
}

func sendPrepareShutdownRequests(ctx context.Context, logger log.Logger, client httpClient, eps []endpoint) error {
if len(eps) == 0 {
return nil
}

span, ctx := opentracing.StartSpanFromContext(ctx, "Prepare pods for shutdown")
defer span.Finish()

g, _ := errgroup.WithContext(ctx)
for _, ep := range eps {
ep := ep // https://golang.org/doc/faq#closures_and_goroutines
g.Go(func() error {
logger, ctx := spanlogger.New(ctx, log.With(logger, "url", ep.url, "index", ep.index), "Prepare pod for shutdown", tenantResolver)
defer logger.Span.Finish()

logger.SetTag("url", ep.url)
logger.SetTag("index", ep.index)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+ep.url, nil)
if err != nil {
level.Error(logger).Log("msg", "error creating HTTP POST request", "err", err)
}

req.Header.Set("Content-Type", "application/json")
req, ht := nethttp.TraceRequest(opentracing.GlobalTracer(), req)
defer ht.Finish()
logger := log.With(logger, "url", ep.url, "index", ep.index)

resp, err := client.Do(req)
resp, err := client.Post("http://"+ep.url, "application/json", nil)
if err != nil {
level.Error(logger).Log("msg", "error sending HTTP POST request", "err", err)
level.Error(logger).Log("msg", "error sending HTTP post request", "err", err)
return err
}

defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err := errors.New("HTTP POST request returned non-2xx status code")
err := errors.New("HTTP post request returned non-2xx status code")
body, readError := io.ReadAll(resp.Body)
level.Error(logger).Log("msg", "error received from shutdown endpoint", "err", err, "status", resp.StatusCode, "response_body", string(body))
defer resp.Body.Close()
level.Error(logger).Log("msg", "error received from shutdown endpoint", "err", err, "status", resp.StatusCode, "response_body", body)
return errors.Join(err, readError)
}
level.Debug(logger).Log("msg", "pod prepared for shutdown")
Expand All @@ -511,15 +447,3 @@ func sendPrepareShutdownRequests(ctx context.Context, logger log.Logger, client
}
return g.Wait()
}

var tenantResolver spanlogger.TenantResolver = noTenantResolver{}

type noTenantResolver struct{}

func (n noTenantResolver) TenantID(ctx context.Context) (string, error) {
return "", nil
}

func (n noTenantResolver) TenantIDs(ctx context.Context) ([]string, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion pkg/admission/prep_downscale_test.go
@@ -125,7 +125,7 @@ type fakeHttpClient struct {
statusCode int
}

func (f *fakeHttpClient) Do(req *http.Request) (resp *http.Response, err error) {
func (f *fakeHttpClient) Post(url, contentType string, body io.Reader) (resp *http.Response, err error) {
return &http.Response{
StatusCode: f.statusCode,
Body: io.NopCloser(bytes.NewBuffer([]byte(""))),
4 changes: 0 additions & 4 deletions pkg/controller/controller.go
@@ -9,7 +9,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/hashicorp/go-multierror"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -190,9 +189,6 @@ func (c *RolloutController) enqueueReconcile() {
}

func (c *RolloutController) reconcile(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile")
defer span.Finish()

level.Info(c.logger).Log("msg", "reconcile started")

sets, err := c.listStatefulSetsWithRolloutGroup()