Skip to content

Commit

Permalink
fix lint errors
Browse files Browse the repository at this point in the history
Signed-off-by: rahulii <[email protected]>
  • Loading branch information
rahulii committed Jul 31, 2024
1 parent e9ac303 commit 0673abf
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 22 deletions.
5 changes: 3 additions & 2 deletions control-plane/pkg/contract/contract.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3071,7 +3071,7 @@ func makeTLSSecret() *corev1.Secret {
Name: brokerIngressTLSSecretName,
},
Data: map[string][]byte{
"ca.crt": []byte(eventingtlstesting.CA),
"ca.crt": eventingtlstesting.CA,
},
Type: corev1.SecretTypeTLS,
}
Expand Down
9 changes: 5 additions & 4 deletions test/e2e_sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
testlib "knative.dev/eventing/test/lib"

eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
Expand All @@ -37,7 +37,8 @@ import (
)

const (
sinkSecretName = "secret-test"
sinkSecretName = "secret-test"
numPartitions int32 = 10
)

func RunTestKafkaSink(t *testing.T, mode string, sp SecretProvider, opts ...func(kss *eventingv1alpha1.KafkaSinkSpec) error) {
Expand All @@ -59,10 +60,10 @@ func RunTestKafkaSink(t *testing.T, mode string, sp SecretProvider, opts ...func

kss := eventingv1alpha1.KafkaSinkSpec{
Topic: "kafka-sink-" + client.Namespace,
NumPartitions: pointer.Int32(10),
NumPartitions: ptr.To(numPartitions),
ReplicationFactor: func(rf int16) *int16 { return &rf }(1),
BootstrapServers: BootstrapServersPlaintextArr,
ContentMode: pointer.String(mode),
ContentMode: ptr.To(mode),
}
for _, opt := range opts {
require.Nil(t, opt(&kss))
Expand Down
4 changes: 2 additions & 2 deletions test/e2e_sink/kafka_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package e2e_sink
import (
"testing"

"k8s.io/utils/pointer"
"k8s.io/utils/ptr"

eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
. "knative.dev/eventing-kafka-broker/test/pkg"
Expand All @@ -31,7 +31,7 @@ import (

func TestKafkaSinkV1Alpha1DefaultContentMode(t *testing.T) {
RunTestKafkaSink(t, eventingv1alpha1.ModeStructured, nil, func(kss *eventingv1alpha1.KafkaSinkSpec) error {
kss.ContentMode = pointer.String("")
kss.ContentMode = ptr.To("")
return nil
})
}
Expand Down
1 change: 0 additions & 1 deletion test/e2e_source/helpers/kafka_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ const (

var (
topicGVR = schema.GroupVersionResource{Group: strimziApiGroup, Version: strimziApiVersion, Resource: strimziTopicResource}
userGVR = schema.GroupVersionResource{Group: strimziApiGroup, Version: strimziApiVersion, Resource: strimziUserResource}
ImcGVR = schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1", Resource: "inmemorychannels"}
)

Expand Down
4 changes: 2 additions & 2 deletions test/lib/resources/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"

v1 "knative.dev/eventing/pkg/apis/duck/v1"

Expand Down Expand Up @@ -130,7 +130,7 @@ func WithKafkaChannelEndpointsReady() KafkaChannelOption {
func WithKafkaChannelAddress(a string) KafkaChannelOption {
return func(nc *v1beta1.KafkaChannel) {
nc.Status.SetAddress(&duckv1.Addressable{
Name: pointer.String("http"),
Name: ptr.To("http"),
URL: apis.HTTP(a),
})
}
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/features/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func KafkaSourceLease() *feature.Feature {

func verifyLeaseAcquired(name string) feature.StepFn {
return func(ctx context.Context, t feature.T) {
err := wait.Poll(time.Second, time.Minute, func() (done bool, err error) {
err := wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
lease, err := kubeclient.Get(ctx).
CoordinationV1().
Leases(system.Namespace()).
Expand Down
2 changes: 1 addition & 1 deletion test/rekt/resources/kafkasource/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func VerifyScale(name string, replicas int32) feature.StepFn {
return func(ctx context.Context, t feature.T) {
interval, timeout := environment.PollTimingsFromContext(ctx)
last := &sources.KafkaSource{}
err := wait.PollImmediate(interval, timeout, func() (done bool, err error) {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
ks, err := kafkaclientset.Get(ctx).
SourcesV1beta1().
KafkaSources(environment.FromContext(ctx).Namespace()).
Expand Down
4 changes: 2 additions & 2 deletions test/rekt/resources/kafkatopic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func HasReplicationFactor(name string, replicationFactor int, timings ...time.Du
return func(ctx context.Context, t feature.T) {
interval, timeout := k8s.PollTimings(ctx, timings)

err := wait.PollImmediate(interval, timeout, func() (bool, error) {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
ut, err := dynamicclient.Get(ctx).
Resource(GVR()).
Namespace(kafkaNamespace).
Expand Down Expand Up @@ -119,7 +119,7 @@ func HasNumPartitions(name string, numPartitions int, timings ...time.Duration)
return func(ctx context.Context, t feature.T) {
interval, timeout := k8s.PollTimings(ctx, timings)

err := wait.PollImmediate(interval, timeout, func() (bool, error) {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
ut, err := dynamicclient.Get(ctx).
Resource(GVR()).
Namespace(kafkaNamespace).
Expand Down
10 changes: 6 additions & 4 deletions test/test_images/consumer-group-lag-provider-test/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/IBM/sarama"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/reconciler-test/pkg/k8s"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
Expand Down Expand Up @@ -76,6 +77,7 @@ func main() {

log.Println("Sending events to topic", topic)

interval, timeout := k8s.PollTimings(ctx, []time.Duration{})
for i := 0; i < n; i++ {
msg := &sarama.ProducerMessage{
Topic: topic,
Expand All @@ -84,7 +86,7 @@ func main() {
}
// Send message might fail with:
// "kafka server: Request was for a topic or partition that does not exist on this broker."
err := wait.PollImmediateUntil(time.Minute, func() (done bool, err error) {
err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
partition, offset, err := producer.SendMessage(msg)
if err != nil {
return false, nil
Expand All @@ -95,7 +97,7 @@ func main() {
}
lastOffset = offset
return true, nil
}, ctx.Done())
})
mustBeNil(err)
}
if int64(n) != lastOffset+1 { // Consistency check
Expand Down Expand Up @@ -137,7 +139,7 @@ func main() {
mustBeNil(err)

// Wait for propagation of the committed offset
err = wait.PollImmediateUntil(time.Minute, func() (done bool, err error) {
err = wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
log.Println("Starting consumer group lag provider")

consumerGroupLagProvider := kafka.NewConsumerGroupLagProvider(client, sarama.NewClusterAdminFromClient, sarama.OffsetOldest)
Expand Down Expand Up @@ -176,7 +178,7 @@ func main() {
return false, nil
}
return true, nil
}, ctx.Done())
})
mustBeNil(err)
}

Expand Down
4 changes: 2 additions & 2 deletions test/upgrade/postupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func verifyPostInstall(t *testing.T) {
defer testlib.TearDown(client)

var lastJob *batchv1.Job
err := wait.Poll(5*time.Second, 10*time.Minute, func() (done bool, err error) {
lastJob, err = client.Kube.
err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
lastJob, err := client.Kube.
BatchV1().
Jobs(system.Namespace()).
Get(context.Background(), name, metav1.GetOptions{})
Expand Down

0 comments on commit 0673abf

Please sign in to comment.