Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Test context #72

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func main() {

// We sometimes startup faster than we can reach kube-api. Poll on failure to prevent us terminating
var err error
if perr := wait.PollImmediate(time.Second, 60*time.Second, func() (bool, error) {
if perr := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(context.Context) (bool, error) {
if err = version.CheckMinimumVersion(kubeClient.Discovery()); err != nil {
log.Print("Failed to get k8s version ", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func main() {

// We sometimes startup faster than we can reach kube-api. Poll on failure to prevent us terminating
var err error
if perr := wait.PollImmediate(time.Second, 60*time.Second, func() (bool, error) {
if perr := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(context.Context) (bool, error) {
if err = version.CheckMinimumVersion(kubeClient.Discovery()); err != nil {
log.Print("Failed to get k8s version ", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/default-domain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func findGatewayAddress(ctx context.Context, kubeclient kubernetes.Interface, cl
defer client.NetworkingV1alpha1().Ingresses(system.Namespace()).Delete(ctx, ing.Name, metav1.DeleteOptions{})

// Wait for the Ingress to be Ready.
if err := wait.PollImmediate(pollInterval, waitTimeout, func() (done bool, err error) {
if err := wait.PollUntilContextTimeout(ctx, pollInterval, waitTimeout, true, func(context.Context) (done bool, err error) {
ing, err = client.NetworkingV1alpha1().Ingresses(system.Namespace()).Get(
ctx, ing.Name, metav1.GetOptions{})
if err != nil {
Expand All @@ -149,7 +149,7 @@ func findGatewayAddress(ctx context.Context, kubeclient kubernetes.Interface, cl

// Wait for the Ingress Service to have an external IP.
var svc *corev1.Service
if err := wait.PollImmediate(pollInterval, waitTimeout, func() (done bool, err error) {
if err := wait.PollUntilContextTimeout(ctx, pollInterval, waitTimeout, true, func(context.Context) (done bool, err error) {
svc, err = kubeclient.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return true, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/activator/certificate/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestReconcile(t *testing.T) {
fakesecretinformer.Get(ctx).Informer().GetIndexer().Add(secret)

// Wait for the resources to be created and the handler is called.
if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
// To access cert.Certificate, take a lock.
cr.certificatesMux.RLock()
defer cr.certificatesMux.RUnlock()
Expand All @@ -108,7 +108,7 @@ func TestReconcile(t *testing.T) {
newCert, _ := tls.X509KeyPair(newTLSCrt, newTLSKey)

fakekubeclient.Get(ctx).CoreV1().Secrets(system.Namespace()).Update(ctx, secret, metav1.UpdateOptions{})
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
// To access cert.Certificate, take a lock.
cr.certificatesMux.RLock()
defer cr.certificatesMux.RUnlock()
Expand All @@ -132,7 +132,7 @@ func TestReconcile(t *testing.T) {
pool.AddCert(ca)

fakekubeclient.Get(ctx).CoreV1().Secrets(system.Namespace()).Update(ctx, secret, metav1.UpdateOptions{})
if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) {
// To access cr.TLSConf.RootCAs, take a lock.
cr.certificatesMux.RLock()
defer cr.certificatesMux.RUnlock()
Expand Down
8 changes: 4 additions & 4 deletions pkg/activator/net/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func TestThrottlerSuccesses(t *testing.T) {
if *cc != 0 {
wantCapacity = dests * int(*cc)
}
if err := wait.PollImmediate(10*time.Millisecond, 3*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) {
rt.mux.RLock()
defer rt.mux.RUnlock()
if *cc != 0 {
Expand Down Expand Up @@ -770,7 +770,7 @@ func TestActivatorsIndexUpdate(t *testing.T) {

// Verify capacity gets updated. This is the very last thing we update
// so we now know that the rest is set statically.
if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) {
// Capacity doesn't exceed 1 in this test.
return rt.breaker.Capacity() == 1, nil
}); err != nil {
Expand All @@ -795,7 +795,7 @@ func TestActivatorsIndexUpdate(t *testing.T) {
endpoints.Informer().GetIndexer().Update(publicEp)

// Verify the index was computed.
if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) {
return rt.numActivators.Load() == 1 &&
rt.activatorIndex.Load() == 0, nil
}); err != nil {
Expand Down Expand Up @@ -867,7 +867,7 @@ func TestMultipleActivators(t *testing.T) {
// Verify capacity gets updated. This is the very last thing we update
// so we now know that we got and processed both the activator endpoints
// and the application endpoints.
if err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Second, true, func(context.Context) (bool, error) {
return rt.breaker.Capacity() == 1, nil
}); err != nil {
t.Fatal("Timed out waiting for the capacity to be updated")
Expand Down
7 changes: 4 additions & 3 deletions pkg/autoscaler/metrics/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package metrics

import (
"context"
"errors"
"math"
"testing"
Expand Down Expand Up @@ -185,7 +186,7 @@ func TestMetricCollectorScraperMovingTime(t *testing.T) {
}
var gotRPS, gotConcurrency, panicRPS, panicConcurrency float64
// Poll to see that the async loop completed.
wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
gotConcurrency, panicConcurrency, _ = coll.StableAndPanicConcurrency(metricKey, now)
gotRPS, panicRPS, _ = coll.StableAndPanicRPS(metricKey, now)
return gotConcurrency == wantConcurrency &&
Expand Down Expand Up @@ -256,7 +257,7 @@ func TestMetricCollectorScraper(t *testing.T) {
}
var gotRPS, gotConcurrency, panicRPS, panicConcurrency float64
// Poll to see that the async loop completed.
wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
gotConcurrency, panicConcurrency, _ = coll.StableAndPanicConcurrency(metricKey, now)
gotRPS, panicRPS, _ = coll.StableAndPanicRPS(metricKey, now)
return gotConcurrency == wantConcurrency &&
Expand Down Expand Up @@ -289,7 +290,7 @@ func TestMetricCollectorScraper(t *testing.T) {
mtp.Channel <- now

// Wait for async loop to finish.
if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
gotConcurrency, _, _ = coll.StableAndPanicConcurrency(metricKey, now.Add(defaultMetric.Spec.StableWindow).Add(-5*time.Second))
gotRPS, _, _ = coll.StableAndPanicRPS(metricKey, now.Add(defaultMetric.Spec.StableWindow).Add(-5*time.Second))
return gotConcurrency == reportConcurrency*5 && gotRPS == reportRPS*5, nil
Expand Down
9 changes: 5 additions & 4 deletions pkg/autoscaler/statforwarder/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package statforwarder

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -117,7 +118,7 @@ func TestForwarderReconcile(t *testing.T) {

var lastErr error
// Wait for the resources to be created.
if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
_, lastErr = service.Lister().Services(testNs).Get(bucket1)
return lastErr == nil, nil
}); err != nil {
Expand All @@ -137,7 +138,7 @@ func TestForwarderReconcile(t *testing.T) {

// Check the endpoints got updated.
el := endpoints.Lister().Endpoints(testNs)
if err := wait.PollImmediate(10*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
got, err := el.Get(bucket1)
if err != nil {
lastErr = err
Expand All @@ -161,7 +162,7 @@ func TestForwarderReconcile(t *testing.T) {

// Check that the endpoints got updated.
wantSubsets[0].Addresses[0].IP = testIP2
if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) {
// Check the endpoints get updated.
got, err := el.Get(bucket1)
if err != nil {
Expand Down Expand Up @@ -445,7 +446,7 @@ func TestProcess(t *testing.T) {
kubeClient.CoordinationV1().Leases(testNs).Create(ctx, anotherLease, metav1.CreateOptions{})
lease.Informer().GetIndexer().Add(anotherLease)

if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(context.Context) (bool, error) {
_, p1owned := f.getProcessor(bucket1).(*localProcessor)
_, p2notowned := f.getProcessor(bucket2).(*remoteProcessor)
return p1owned && p2notowned, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/autoscaler/statforwarder/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (f *leaseTracker) leaseUpdated(obj interface{}) {

func (f *leaseTracker) createService(ctx context.Context, ns, n string) error {
var lastErr error
if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
_, lastErr = f.kc.CoreV1().Services(ns).Create(ctx, &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Expand Down Expand Up @@ -247,7 +247,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string

exists := true
var lastErr error
if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
e, err := f.endpointsLister.Endpoints(ns).Get(n)
if apierrs.IsNotFound(err) {
exists = false
Expand Down Expand Up @@ -281,7 +281,7 @@ func (f *leaseTracker) createOrUpdateEndpoints(ctx context.Context, ns, n string
return nil
}

if err := wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, retryInterval, retryTimeout, true, func(context.Context) (bool, error) {
_, lastErr = f.kc.CoreV1().Endpoints(ns).Create(ctx, &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: n,
Expand Down
3 changes: 2 additions & 1 deletion pkg/queue/readiness/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package readiness

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -163,7 +164,7 @@ func (p *Probe) doProbe(probe func(time.Duration) error) error {

var failCount int
var lastProbeErr error
pollErr := wait.PollImmediate(retryInterval, p.pollTimeout, func() (bool, error) {
pollErr := wait.PollUntilContextTimeout(context.Background(), retryInterval, p.pollTimeout, true, func(context.Context) (bool, error) {
if err := probe(aggressiveProbeTimeout); err != nil {
// Reset count of consecutive successes to zero.
p.count = 0
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/accessor/networking/certificate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestReconcileCertificateCreate(t *testing.T) {
ReconcileCertificate(ctx, ownerObj, desired, accessor)

lister := fakecertinformer.Get(ctx).Lister()
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
cert, err := lister.Certificates(desired.Namespace).Get(desired.Name)
if errors.IsNotFound(err) {
return false, nil
Expand All @@ -121,7 +121,7 @@ func TestReconcileCertificateUpdate(t *testing.T) {
ReconcileCertificate(ctx, ownerObj, desired, accessor)

lister := fakecertinformer.Get(ctx).Lister()
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
cert, err := lister.Certificates(desired.Namespace).Get(desired.Name)
if errors.IsNotFound(err) {
return false, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/reconciler/autoscaling/kpa/kpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ func TestReconcileDeciderCreatesAndDeletes(t *testing.T) {
fakenetworkingclient.Get(ctx).NetworkingV1alpha1().ServerlessServices(testNamespace).Create(ctx, sks, metav1.CreateOptions{})
fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(testNamespace).Create(ctx, kpa, metav1.CreateOptions{})

wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
_, err := fakepainformer.Get(ctx).Lister().PodAutoscalers(testNamespace).Get(kpa.Name)
if err != nil && apierrors.IsNotFound(err) {
return false, nil
Expand All @@ -1399,7 +1399,7 @@ func TestReconcileDeciderCreatesAndDeletes(t *testing.T) {

// The ReconcileKind call hasn't finished yet at the point where the Decider is created,
// so give it more time to finish before checking the PA for IsReady().
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
newKPA, err := fakeservingclient.Get(ctx).AutoscalingV1alpha1().PodAutoscalers(kpa.Namespace).Get(
ctx, kpa.Name, metav1.GetOptions{})
if err != nil && apierrors.IsNotFound(err) {
Expand Down Expand Up @@ -1665,7 +1665,7 @@ func TestScaleFailure(t *testing.T) {
}

func pollDeciders(deciders *testDeciders, namespace, name string, cond func(*scaling.Decider) bool) (decider *scaling.Decider, err error) {
wait.PollImmediate(10*time.Millisecond, 3*time.Second, func() (bool, error) {
wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) {
decider, err = deciders.Get(context.Background(), namespace, name)
if err != nil {
return false, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ func TestReconcileWithCollector(t *testing.T) {

scs.AutoscalingV1alpha1().Metrics(m.Namespace).Create(ctx, m, metav1.CreateOptions{})

if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
return collector.createOrUpdateCalls.Load() > 0, nil
}); err != nil {
t.Fatal("CreateOrUpdate() called 0 times, want non-zero times")
}

scs.AutoscalingV1alpha1().Metrics(m.Namespace).Delete(ctx, m.Name, metav1.DeleteOptions{})
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
return collector.deleteCalls.Load() > 0, nil
}); err != nil {
t.Fatal("Delete() called 0 times, want non-zero times")
Expand Down
18 changes: 9 additions & 9 deletions pkg/reconciler/revision/revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func TestGlobalResyncOnDefaultCMChange(t *testing.T) {

revClient.Create(ctx, rev, metav1.CreateOptions{})
revL := fakerevisioninformer.Get(ctx).Lister()
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
// The only error we're getting in the test reasonably is NotFound.
r, _ := revL.Revisions(rev.Namespace).Get(rev.Name)
return r != nil && r.Status.ObservedGeneration == r.Generation, nil
Expand All @@ -513,7 +513,7 @@ func TestGlobalResyncOnDefaultCMChange(t *testing.T) {

// Ensure initial PA is in the informers.
paL := fakepainformer.Get(ctx).Lister().PodAutoscalers(rev.Namespace)
if ierr := wait.PollImmediate(50*time.Millisecond, 6*time.Second, func() (bool, error) {
if ierr := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 6*time.Second, true, func(context.Context) (bool, error) {
_, err = paL.Get(rev.Name)
return err == nil, nil
}); ierr != nil {
Expand Down Expand Up @@ -549,7 +549,7 @@ func TestGlobalResyncOnDefaultCMChange(t *testing.T) {

pa, err := paL.Get(rev.Name)
t.Logf("Initial PA: %#v GetErr: %v", pa, err)
if ierr := wait.PollImmediate(50*time.Millisecond, 2*time.Second, func() (bool, error) {
if ierr := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(context.Context) (bool, error) {
pa, err = paL.Get(rev.Name)
return pa != nil && pa.Spec.ContainerConcurrency == pos, nil
}); ierr == nil { // err==nil!
Expand Down Expand Up @@ -586,7 +586,7 @@ func TestGlobalResyncOnConfigMapUpdateRevision(t *testing.T) {

revClient.Create(ctx, rev, metav1.CreateOptions{})
revL := fakerevisioninformer.Get(ctx).Lister()
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
// The only error we're getting in the test reasonably is NotFound.
r, _ := revL.Revisions(rev.Namespace).Get(rev.Name)
// We only create a single revision, but make sure it is reconciled.
Expand All @@ -608,7 +608,7 @@ func TestGlobalResyncOnConfigMapUpdateRevision(t *testing.T) {
})

want := "http://new-logging.test.com?filter=" + string(rev.UID)
if ierr := wait.PollImmediate(50*time.Millisecond, 5*time.Second, func() (bool, error) {
if ierr := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
r, _ := revL.Revisions(rev.Namespace).Get(rev.Name)
return r != nil && r.Status.LogURL == want, nil
}); ierr != nil {
Expand Down Expand Up @@ -664,7 +664,7 @@ func TestGlobalResyncOnConfigMapUpdateDeployment(t *testing.T) {

revClient.Create(ctx, rev, metav1.CreateOptions{})
revL := fakerevisioninformer.Get(ctx).Lister().Revisions(rev.Namespace)
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
// The only error we're getting in the test reasonably is NotFound.
r, _ := revL.Get(rev.Name)
// We only create a single revision, but make sure it is reconciled.
Expand All @@ -677,7 +677,7 @@ func TestGlobalResyncOnConfigMapUpdateDeployment(t *testing.T) {
watcher.OnChange(configMapToUpdate)

depL := fakedeploymentinformer.Get(ctx).Lister().Deployments(rev.Namespace)
if err := wait.PollImmediate(10*time.Millisecond, 5*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(context.Context) (bool, error) {
dep, _ := depL.Get(names.Deployment(rev))
return dep != nil && checkF(dep), nil
}); err != nil {
Expand Down Expand Up @@ -713,7 +713,7 @@ func TestNewRevisionCallsSyncHandler(t *testing.T) {
}

// Poll to see PA object to be created.
if err := wait.PollImmediate(25*time.Millisecond, 3*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 3*time.Second, true, func(context.Context) (bool, error) {
pa, _ := servingClient.AutoscalingV1alpha1().PodAutoscalers(rev.Namespace).Get(
ctx, rev.Name, metav1.GetOptions{})
return pa != nil, nil
Expand All @@ -723,7 +723,7 @@ func TestNewRevisionCallsSyncHandler(t *testing.T) {

// Poll to see if the deployment is created. This should _already_ be there.
depL := fakedeploymentinformer.Get(ctx).Lister().Deployments(rev.Namespace)
if err := wait.PollImmediate(10*time.Millisecond, 1*time.Second, func() (bool, error) {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 1*time.Second, true, func(context.Context) (bool, error) {
dep, _ := depL.Get(names.Deployment(rev))
return dep != nil, nil
}); err != nil {
Expand Down
Loading
Loading