diff --git a/go.mod b/go.mod
index 7fdcc65bfa..6d8ee2dbeb 100644
--- a/go.mod
+++ b/go.mod
@@ -21,10 +21,10 @@ require (
 	k8s.io/client-go v0.26.5
 	k8s.io/code-generator v0.26.5
 	k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280
-	knative.dev/eventing v0.37.1-0.20230725143141-855fbedc00d0
+	knative.dev/eventing v0.38.0
 	knative.dev/hack v0.0.0-20230712131415-ddae80293c43
 	knative.dev/pkg v0.0.0-20230718152110-aef227e72ead
-	knative.dev/reconciler-test v0.0.0-20230720092812-7286e0a369dc
+	knative.dev/reconciler-test v0.0.0-20230726074640-26cee79ad63d
 	sigs.k8s.io/controller-runtime v0.14.6
 )
diff --git a/go.sum b/go.sum
index 693c0c1be4..32504a101d 100644
--- a/go.sum
+++ b/go.sum
@@ -883,14 +883,14 @@ k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+O
 k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4=
 k8s.io/utils v0.0.0-20230505201702-9f6742963106 h1:EObNQ3TW2D+WptiYXlApGNLVy0zm/JIBVY9i+M4wpAU=
 k8s.io/utils v0.0.0-20230505201702-9f6742963106/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
-knative.dev/eventing v0.37.1-0.20230725143141-855fbedc00d0 h1:Et24SoigIUlapBOHOMK3gy8+84ZF+JXfAgAJp/2TaMY=
-knative.dev/eventing v0.37.1-0.20230725143141-855fbedc00d0/go.mod h1:nd7MZ/O5nyNKlZ1m9XAxI9eSq2bYWJQfYogSXflRpqc=
+knative.dev/eventing v0.38.0 h1:n6/k9IJ1kOvpZx4CMLqa1FG7g2iBiyKXwBu1Fy/81q4=
+knative.dev/eventing v0.38.0/go.mod h1:JUqEC0zoyfYqhRHFz8VUxjkxH9G1cQ/Y+UvhXTxUXgI=
 knative.dev/hack v0.0.0-20230712131415-ddae80293c43 h1:3SE06uNfSFGm/5XS+0trbyCUpgsOaBeyhPQU8FPNFz8=
 knative.dev/hack v0.0.0-20230712131415-ddae80293c43/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
 knative.dev/pkg v0.0.0-20230718152110-aef227e72ead h1:2dDzorpKuVZW3Qp7TbirMMq16FbId8f6bacQFX8jXLw=
 knative.dev/pkg v0.0.0-20230718152110-aef227e72ead/go.mod h1:WmrwRV/P+hGHoMraAEfwg6ec+fBTf+Obu41v354Iabc=
-knative.dev/reconciler-test v0.0.0-20230720092812-7286e0a369dc h1:uzrOfQ30FKaynWRWUqEnKk6lP7SEl0ikC6jOfxFhf6A=
-knative.dev/reconciler-test v0.0.0-20230720092812-7286e0a369dc/go.mod h1:i+/PWK/n3HPgjXMoj5U7CA6WRW/C3c3EfHCQ0FmrhNM=
+knative.dev/reconciler-test v0.0.0-20230726074640-26cee79ad63d h1:B7s1+wFSkQF3oJFjMK3WGPiKvBYMatjgxqei0CX0BoA=
+knative.dev/reconciler-test v0.0.0-20230726074640-26cee79ad63d/go.mod h1:i+/PWK/n3HPgjXMoj5U7CA6WRW/C3c3EfHCQ0FmrhNM=
 pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U=
 pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU=
 rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
diff --git a/vendor/knative.dev/reconciler-test/pkg/eventshub/sender/sender.go b/vendor/knative.dev/reconciler-test/pkg/eventshub/sender/sender.go
index 50ccfd3cd7..d26f793d7c 100644
--- a/vendor/knative.dev/reconciler-test/pkg/eventshub/sender/sender.go
+++ b/vendor/knative.dev/reconciler-test/pkg/eventshub/sender/sender.go
@@ -39,6 +39,7 @@ import (
 	"github.com/cloudevents/sdk-go/v2/types"
 	"github.com/kelseyhightower/envconfig"
 	"go.opencensus.io/trace"
+	"go.uber.org/atomic"
 	"go.uber.org/zap"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"knative.dev/pkg/logging"
@@ -116,6 +117,10 @@ type generator struct {
 	eventQueue []conformanceevent.Event
 }
 
+var (
+	verifyConnectionCounter = atomic.NewUint64(0)
+)
+
 func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventshub.ClientOption) error {
 	var env generator
 	if err := envconfig.Process("", &env); err != nil {
@@ -143,27 +148,9 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
 		logging.FromContext(ctx).Info("awake, continuing")
 	}
 
-	httpClient := nethttp.DefaultClient
-
-	if env.EnforceTLS {
-		caCertPool, err := x509.SystemCertPool()
-		if err != nil {
-			return fmt.Errorf("failed to create cert pool %s: %w", env.Sink, err)
-		}
-		caCertPool.AppendCertsFromPEM([]byte(env.CACerts))
-
-		transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
-		transport.TLSClientConfig = &tls.Config{
-			RootCAs:    caCertPool,
-			MinVersion: tls.VersionTLS12,
-			VerifyConnection: func(state tls.ConnectionState) error {
-				if err := logs.Vent(env.peerCertificatesReceived(state)); err != nil {
-					return err
-				}
-				return nil
-			},
-		}
-		httpClient = &nethttp.Client{Transport: transport}
+	httpClient, _, err := createClient(ctx, env, logs)
+	if err != nil {
+		return err
 	}
 
 	if env.ProbeSink {
@@ -185,12 +172,6 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
 		}
 	}
 
-	for _, opt := range clientOpts {
-		if err := opt(httpClient); err != nil {
-			return fmt.Errorf("unable to apply option: %w", err)
-		}
-	}
-
 	switch env.EventEncoding {
 	case "binary":
 		ctx = cloudevents.WithEncodingBinary(ctx)
@@ -203,6 +184,19 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
 
 	ticker := time.NewTicker(period)
 	for {
+		// when enforcing TLS we want to create multiple transports to force multiple TLS handshakes
+		// on each request sent so that VerifyConnection is called multiple times.
+		httpClient, _, err = createClient(ctx, env, logs)
+		if err != nil {
+			return err
+		}
+
+		for _, opt := range clientOpts {
+			if err := opt(httpClient); err != nil {
+				return fmt.Errorf("unable to apply option: %w", err)
+			}
+		}
+
 		ctx, span := trace.StartSpan(ctx, "eventshub-sender")
 
 		req, event, err := env.next(ctx)
@@ -251,13 +245,46 @@ func Start(ctx context.Context, logs *eventshub.EventLogs, clientOpts ...eventsh
 	}
 }
 
-func (g *generator) peerCertificatesReceived(state tls.ConnectionState) eventshub.EventInfo {
+func createClient(ctx context.Context, env generator, logs *eventshub.EventLogs) (*nethttp.Client, *nethttp.Transport, error) {
+	if env.EnforceTLS {
+		caCertPool, err := x509.SystemCertPool()
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to create cert pool %s: %w", env.Sink, err)
+		}
+		caCertPool.AppendCertsFromPEM([]byte(env.CACerts))
+
+		transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
+
+		// Force multiple TLS handshakes
+		transport.DisableKeepAlives = true
+		transport.IdleConnTimeout = 500 * time.Millisecond
+
+		transport.TLSClientConfig = &tls.Config{
+			RootCAs:    caCertPool,
+			MinVersion: tls.VersionTLS12,
+			VerifyConnection: func(state tls.ConnectionState) error {
+				logging.FromContext(ctx).Infow("VerifyConnection")
+
+				if err := logs.Vent(env.peerCertificatesReceived(verifyConnectionCounter.Inc(), state)); err != nil {
+					return err
+				}
+				return nil
+			},
+		}
+		return &nethttp.Client{Transport: transport}, transport, nil
+	}
+
+	return nethttp.DefaultClient, nethttp.DefaultTransport.(*nethttp.Transport), nil
+}
+
+func (g *generator) peerCertificatesReceived(counter uint64, state tls.ConnectionState) eventshub.EventInfo {
 	return eventshub.EventInfo{
 		Kind:       eventshub.PeerCertificatesReceived,
 		Connection: eventshub.TLSConnectionStateToConnection(&state),
 		Origin:     g.SenderName,
 		Observer:   g.SenderName,
 		Time:       time.Now(),
+		Sequence:   counter,
 	}
 }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 54431ce2b9..dec7fda1e0 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1053,7 +1053,7 @@ k8s.io/utils/net
 k8s.io/utils/pointer
 k8s.io/utils/strings/slices
 k8s.io/utils/trace
-# knative.dev/eventing v0.37.1-0.20230725143141-855fbedc00d0
+# knative.dev/eventing v0.38.0
 ## explicit; go 1.19
 knative.dev/eventing/cmd/heartbeats
 knative.dev/eventing/pkg/adapter/v2
@@ -1269,7 +1269,7 @@ knative.dev/pkg/webhook/json
 knative.dev/pkg/webhook/resourcesemantics
 knative.dev/pkg/webhook/resourcesemantics/defaulting
 knative.dev/pkg/webhook/resourcesemantics/validation
-# knative.dev/reconciler-test v0.0.0-20230720092812-7286e0a369dc
+# knative.dev/reconciler-test v0.0.0-20230726074640-26cee79ad63d
 ## explicit; go 1.18
 knative.dev/reconciler-test/cmd/eventshub
 knative.dev/reconciler-test/pkg/environment