Skip to content

Commit

Permalink
Replace the rest deprecated func with wait.PollUntilContextTimeout (#…
Browse files Browse the repository at this point in the history
…14565)

* Replace the rest deprecated func with wait.PollUntilContextTimeout

Signed-off-by: pingjiang <[email protected]>

* Replace wait.PollInfinite() and wait.Poll()

Signed-off-by: pingjiang <[email protected]>

---------

Signed-off-by: pingjiang <[email protected]>
  • Loading branch information
xiangpingjiang authored Oct 27, 2023
1 parent 40ebfb6 commit 7c92928
Show file tree
Hide file tree
Showing 37 changed files with 83 additions and 79 deletions.
2 changes: 1 addition & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func main() {

// We sometimes startup faster than we can reach kube-api. Poll on failure to prevent us terminating
var err error
if perr := wait.PollImmediate(time.Second, 60*time.Second, func() (bool, error) {
if perr := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(context.Context) (bool, error) {
if err = version.CheckMinimumVersion(kubeClient.Discovery()); err != nil {
log.Print("Failed to get k8s version ", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/default-domain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func findGatewayAddress(ctx context.Context, kubeclient kubernetes.Interface, cl
defer client.NetworkingV1alpha1().Ingresses(system.Namespace()).Delete(ctx, ing.Name, metav1.DeleteOptions{})

// Wait for the Ingress to be Ready.
if err := wait.PollImmediate(pollInterval, waitTimeout, func() (done bool, err error) {
if err := wait.PollUntilContextTimeout(ctx, pollInterval, waitTimeout, true, func(context.Context) (done bool, err error) {
ing, err = client.NetworkingV1alpha1().Ingresses(system.Namespace()).Get(
ctx, ing.Name, metav1.GetOptions{})
if err != nil {
Expand All @@ -149,7 +149,7 @@ func findGatewayAddress(ctx context.Context, kubeclient kubernetes.Interface, cl

// Wait for the Ingress Service to have an external IP.
var svc *corev1.Service
if err := wait.PollImmediate(pollInterval, waitTimeout, func() (done bool, err error) {
if err := wait.PollUntilContextTimeout(ctx, pollInterval, waitTimeout, true, func(context.Context) (done bool, err error) {
svc, err = kubeclient.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return true, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/activator/certificate/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestReconcile(t *testing.T) {
fakesecretinformer.Get(ctx).Informer().GetIndexer().Add(secret)

// Wait for the resources to be created and the handler is called.
if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
// To access cert.Certificate, take a lock.
cr.certificatesMux.RLock()
defer cr.certificatesMux.RUnlock()
Expand All @@ -108,7 +108,7 @@ func TestReconcile(t *testing.T) {
newCert, _ := tls.X509KeyPair(newTLSCrt, newTLSKey)

fakekubeclient.Get(ctx).CoreV1().Secrets(system.Namespace()).Update(ctx, secret, metav1.UpdateOptions{})
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
// To access cert.Certificate, take a lock.
cr.certificatesMux.RLock()
defer cr.certificatesMux.RUnlock()
Expand All @@ -132,7 +132,7 @@ func TestReconcile(t *testing.T) {
pool.AddCert(ca)

fakekubeclient.Get(ctx).CoreV1().Secrets(system.Namespace()).Update(ctx, secret, metav1.UpdateOptions{})
if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) {
// To access cr.TLSConf.RootCAs, take a lock.
cr.certificatesMux.RLock()
defer cr.certificatesMux.RUnlock()
Expand Down
10 changes: 5 additions & 5 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestThrottlerErrorNoRevision(t *testing.T) {

// Eventually it should now fail.
var lastError error
wait.PollInfinite(10*time.Millisecond, func() (bool, error) {
wait.PollUntilContextCancel(ctx, 10*time.Millisecond, false, func(context.Context) (bool, error) {
lastError = throttler.Try(ctx, revID, func(string) error { return nil })
return lastError != nil, nil
})
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestThrottlerSuccesses(t *testing.T) {
if *cc != 0 {
wantCapacity = dests * int(*cc)
}
if err := wait.PollImmediate(10*time.Millisecond, 3*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) {
rt.mux.RLock()
defer rt.mux.RUnlock()
if *cc != 0 {
Expand Down Expand Up @@ -770,7 +770,7 @@ func TestActivatorsIndexUpdate(t *testing.T) {

// Verify capacity gets updated. This is the very last thing we update
// so we now know that the rest is set statically.
if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) {
// Capacity doesn't exceed 1 in this test.
return rt.breaker.Capacity() == 1, nil
}); err != nil {
Expand All @@ -795,7 +795,7 @@ func TestActivatorsIndexUpdate(t *testing.T) {
endpoints.Informer().GetIndexer().Update(publicEp)

// Verify the index was computed.
if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) {
return rt.numActivators.Load() == 1 &&
rt.activatorIndex.Load() == 0, nil
}); err != nil {
Expand Down Expand Up @@ -867,7 +867,7 @@ func TestMultipleActivators(t *testing.T) {
// Verify capacity gets updated. This is the very last thing we update
// so we now know that we got and processed both the activator endpoints
// and the application endpoints.
if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) {
return rt.breaker.Capacity() == 1, nil
}); err != nil {
t.Fatal("Timed out waiting for the capacity to be updated")
Expand Down
7 changes: 4 additions & 3 deletions pkg/autoscaler/metrics/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package metrics

import (
"context"
"errors"
"math"
"testing"
Expand Down Expand Up @@ -185,7 +186,7 @@ func TestMetricCollectorScraperMovingTime(t *testing.T) {
}
var gotRPS, gotConcurrency, panicRPS, panicConcurrency float64
// Poll to see that the async loop completed.
wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
gotConcurrency, panicConcurrency, _ = coll.StableAndPanicConcurrency(metricKey, now)
gotRPS, panicRPS, _ = coll.StableAndPanicRPS(metricKey, now)
return gotConcurrency == wantConcurrency &&
Expand Down Expand Up @@ -256,7 +257,7 @@ func TestMetricCollectorScraper(t *testing.T) {
}
var gotRPS, gotConcurrency, panicRPS, panicConcurrency float64
// Poll to see that the async loop completed.
wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
gotConcurrency, panicConcurrency, _ = coll.StableAndPanicConcurrency(metricKey, now)
gotRPS, panicRPS, _ = coll.StableAndPanicRPS(metricKey, now)
return gotConcurrency == wantConcurrency &&
Expand Down Expand Up @@ -289,7 +290,7 @@ func TestMetricCollectorScraper(t *testing.T) {
mtp.Channel <- now

// Wait for async loop to finish.
if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
gotConcurrency, _, _ = coll.StableAndPanicConcurrency(metricKey, now.Add(defaultMetric.Spec.StableWindow).Add(-5*time.Second))
gotRPS, _, _ = coll.StableAndPanicRPS(metricKey, now.Add(defaultMetric.Spec.StableWindow).Add(-5*time.Second))
return gotConcurrency == reportConcurrency*5 && gotRPS == reportRPS*5, nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/autoscaler/statforwarder/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package statforwarder

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -117,7 +118,7 @@ func TestForwarderReconcile(t *testing.T) {

var lastErr error
// Wait for the resources to be created.
if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
_, lastErr = service.Lister().Services(testNs).Get(bucket1)
return lastErr == nil, nil
}); err != nil {
Expand All @@ -137,7 +138,7 @@ func TestForwarderReconcile(t *testing.T) {

// Check the endpoints got updated.
el := endpoints.Lister().Endpoints(testNs)
if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
got, err := el.Get(bucket1)
if err != nil {
lastErr = err
Expand All @@ -161,7 +162,7 @@ func TestForwarderReconcile(t *testing.T) {

// Check that the endpoints got updated.
wantSubsets[0].Addresses[0].IP = testIP2
if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) {
// Check the endpoints get updated.
got, err := el.Get(bucket1)
if err != nil {
Expand Down Expand Up @@ -445,7 +446,7 @@ func TestProcess(t *testing.T) {
kubeClient.CoordinationV1().Leases(testNs).Create(ctx, anotherLease, metav1.CreateOptions{})
lease.Informer().GetIndexer().Add(anotherLease)

if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) {
_, p1owned := f.getProcessor(bucket1).(*localProcessor)
_, p2notowned := f.getProcessor(bucket2).(*remoteProcessor)
return p1owned && p2notowned, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/autoscaler/statforwarder/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (f *leaseTracker) leaseUpdated(obj interface{}) {

func (f *leaseTracker) createService(ctx context.Context, ns, n string) error {
var lastErr error
if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
_, lastErr = f.kc.CoreV1().Services(ns).Create(ctx, &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Expand Down Expand Up @@ -247,7 +247,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string

exists := true
var lastErr error
if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
e, err := f.endpointsLister.Endpoints(ns).Get(n)
if apierrs.IsNotFound(err) {
exists = false
Expand Down Expand Up @@ -281,7 +281,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string
return nil
}

if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
_, lastErr = f.kc.CoreV1().Endpoints(ns).Create(ctx, &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Expand Down
3 changes: 2 additions & 1 deletion pkg/queue/certificate/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package certificate

import (
"context"
"crypto/tls"
"crypto/x509"
"os"
Expand Down Expand Up @@ -70,7 +71,7 @@ func TestCertificateRotation(t *testing.T) {

// CertWatcher should return the new certificate
// Give CertWatcher some time to update the certificate
if err := wait.Poll(1*time.Second, 60*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(context.Background(), 1*time.Second, 60*time.Second, true, func(context.Context) (bool, error) {
c, err = cw.GetCertificate(nil)
if err != nil {
return false, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/queue/readiness/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package readiness

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -163,7 +164,7 @@ func (p *Probe) doProbe(probe func(time.Duration) error) error {

var failCount int
var lastProbeErr error
pollErr := wait.PollImmediate(retryInterval, p.pollTimeout, func() (bool, error) {
pollErr := wait.PollUntilContextTimeout(context.Background(), retryInterval, p.pollTimeout, true, func(context.Context) (bool, error) {
if err := probe(aggressiveProbeTimeout); err != nil {
// Reset count of consecutive successes to zero.
p.count = 0
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/accessor/networking/certificate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestReconcileCertificateCreate(t *testing.T) {
ReconcileCertificate(ctx, ownerObj, desired, accessor)

lister := fakecertinformer.Get(ctx).Lister()
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
cert, err := lister.Certificates(desired.Namespace).Get(desired.Name)
if errors.IsNotFound(err) {
return false, nil
Expand All @@ -121,7 +121,7 @@ func TestReconcileCertificateUpdate(t *testing.T) {
ReconcileCertificate(ctx, ownerObj, desired, accessor)

lister := fakecertinformer.Get(ctx).Lister()
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
cert, err := lister.Certificates(desired.Namespace).Get(desired.Name)
if errors.IsNotFound(err) {
return false, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/autoscaling/kpa/kpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ func TestReconcileDeciderCreatesAndDeletes(t *testing.T) {
fakenetworkingclient.Get(ctx).NetworkingV1alpha1().ServerlessServices(testNamespace).Create(ctx, sks, metav1.CreateOptions{})
fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, kpa, metav1.CreateOptions{})

wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
_, err := fakepainformer.Get(ctx).Lister().PodAutoscalers(testNamespace).Get(kpa.Name)
if err != nil && apierrors.IsNotFound(err) {
return false, nil
Expand All @@ -1399,7 +1399,7 @@ func TestReconcileDeciderCreatesAndDeletes(t *testing.T) {

// The ReconcileKind call hasn't finished yet at the point where the Decider is created,
// so give it more time to finish before checking the PA for IsReady().
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
newKPA, err := fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(kpa.Namespace).Get(
ctx, kpa.Name, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -1665,7 +1665,7 @@ func TestScaleFailure(t *testing.T) {
}

func pollDeciders(deciders *testDeciders, namespace, name string, cond func(*scaling.Decider) bool) (decider *scaling.Decider, err error) {
wait.PollImmediate(10*time.Millisecond, 3*time.Second, func() (bool, error) {
wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) {
decider, err = deciders.Get(context.Background(), namespace, name)
if err != nil {
return false, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ func TestReconcileWithCollector(t *testing.T) {

scs.AutoscalingV1alpha1().Metrics(m.Namespace).Create(ctx, m, metav1.CreateOptions{})

if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
return collector.createOrUpdateCalls.Load() > 0, nil
}); err != nil {
t.Fatal("CreateOrUpdate() called 0 times, want non-zero times")
}

scs.AutoscalingV1alpha1().Metrics(m.Namespace).Delete(ctx, m.Name, metav1.DeleteOptions{})
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
return collector.deleteCalls.Load() > 0, nil
}); err != nil {
t.Fatal("Delete() called 0 times, want non-zero times")
Expand Down
Loading

0 comments on commit 7c92928

Please sign in to comment.