Skip to content

Commit

Permalink
Add KafkaChannel v1 API (#4222)
Browse files Browse the repository at this point in the history
* KafkaChannel v1 API

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Update codegen

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* v1beta1 remains the stored version

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix service name in CRD

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Update codegen

Signed-off-by: Pierangelo Di Pilato <[email protected]>

---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Jan 24, 2025
1 parent 3f9f40c commit ae8a755
Show file tree
Hide file tree
Showing 36 changed files with 2,680 additions and 6 deletions.
10 changes: 10 additions & 0 deletions control-plane/cmd/webhook-kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
eventingcorev1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"

messagingv1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1"
sourcesv1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1"
sourcesv1beta1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/sources/v1beta1"

Expand All @@ -57,6 +58,7 @@ var types = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
sourcesv1beta1.SchemeGroupVersion.WithKind("KafkaSource"): &sourcesv1beta1.KafkaSource{},
sourcesv1.SchemeGroupVersion.WithKind("KafkaSource"): &sourcesv1.KafkaSource{},
messagingv1beta1.SchemeGroupVersion.WithKind("KafkaChannel"): &messagingv1beta1.KafkaChannel{},
messagingv1.SchemeGroupVersion.WithKind("KafkaChannel"): &messagingv1.KafkaChannel{},
eventingcorev1.SchemeGroupVersion.WithKind("Broker"): &eventingv1.BrokerStub{},
kafkainternals.SchemeGroupVersion.WithKind("ConsumerGroup"): &kafkainternals.ConsumerGroup{},
kafkainternals.SchemeGroupVersion.WithKind("Consumer"): &kafkainternals.Consumer{},
Expand Down Expand Up @@ -168,6 +170,14 @@ func NewConversionController(ctx context.Context, _ configmap.Watcher) *controll
sourcesv1.SchemeGroupVersion.Version: &sourcesv1.KafkaSource{},
},
},
messagingv1.Kind("KafkaChannel"): {
DefinitionName: "kafkachannels.messaging.knative.dev",
HubVersion: messagingv1beta1.SchemeGroupVersion.Version,
Zygotes: map[string]conversion.ConvertibleObject{
messagingv1beta1.SchemeGroupVersion.Version: &messagingv1beta1.KafkaChannel{},
messagingv1.SchemeGroupVersion.Version: &messagingv1.KafkaChannel{},
},
},
},
// A function that infuses the context passed to ConvertTo/ConvertFrom/SetDefaults with custom metadata.
ctxFunc,
Expand Down

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions control-plane/pkg/apis/messaging/v1/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package v1 is the v1 version of the API.
// +k8s:deepcopy-gen=package
// +groupName=messaging.knative.dev
package v1
34 changes: 34 additions & 0 deletions control-plane/pkg/apis/messaging/v1/kafka_channel_conversion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import (
"context"
"fmt"

"knative.dev/pkg/apis"
)

// ConvertTo implements apis.Convertible
func (channel *KafkaChannel) ConvertTo(_ context.Context, sink apis.Convertible) error {
return fmt.Errorf("v1 is the highest known version, got: %T", sink)
}

// ConvertFrom implements apis.Convertible
func (sink *KafkaChannel) ConvertFrom(_ context.Context, channel apis.Convertible) error {
return fmt.Errorf("v1 is the highest known version, got: %T", channel)
}
73 changes: 73 additions & 0 deletions control-plane/pkg/apis/messaging/v1/kafka_channel_defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import (
"context"
"time"

"knative.dev/eventing/pkg/apis/messaging"
"knative.dev/pkg/apis"
)

const (
// DefaultNumPartitions is the KafkaChannel Spec default for the number of partitions
DefaultNumPartitions = 1

// DefaultReplicationFactor is the KafkaChannel Spec default for the replication factor
DefaultReplicationFactor = 1

// DefaultRetentionISO8601Duration is the KafkaChannel Spec default for the retention duration as an ISO-8601 string
DefaultRetentionISO8601Duration = "PT168H" // Precise 7 Days

// DefaultRetentionDuration is the time.Duration equivalent of the DefaultRetentionISO8601Duration
DefaultRetentionDuration = 7 * 24 * time.Hour // Precise 7 Days

// KafkaTopicConfigRetentionMs is the key in the Sarama TopicDetail ConfigEntries map for retention time (in ms)
KafkaTopicConfigRetentionMs = "retention.ms"
)

func (kc *KafkaChannel) SetDefaults(ctx context.Context) {
// Set the duck subscription to the stored version of the duck
// we support. Reason for this is that the stored version will
// not get a chance to get modified, but for newer versions
// conversion webhook will be able to take a crack at it and
// can modify it to match the duck shape.
if kc.Annotations == nil {
kc.Annotations = make(map[string]string)
}

if _, ok := kc.Annotations[messaging.SubscribableDuckVersionAnnotation]; !ok {
kc.Annotations[messaging.SubscribableDuckVersionAnnotation] = "v1"
}

ctx = apis.WithinParent(ctx, kc.ObjectMeta)
kc.Spec.SetDefaults(ctx)
}

func (kcs *KafkaChannelSpec) SetDefaults(ctx context.Context) {
if kcs.NumPartitions == 0 {
kcs.NumPartitions = DefaultNumPartitions
}
if kcs.ReplicationFactor == 0 {
kcs.ReplicationFactor = DefaultReplicationFactor
}
if len(kcs.RetentionDuration) <= 0 {
kcs.RetentionDuration = DefaultRetentionISO8601Duration
}
kcs.Delivery.SetDefaults(ctx)
}
149 changes: 149 additions & 0 deletions control-plane/pkg/apis/messaging/v1/kafka_channel_lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import (
"sync"

"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

// The consolidated and distributed KafkaChannel implementations require
// differentiated ConditionSets in order to accurately reflect their varied
// runtime architectures. One of the channel specific "Register..." functions
// in pkg/channel/<type>/apis/messaging/kafka_channel_lifecycle.go should be
// called via an init() in the main() of associated components.
var kc apis.ConditionSet
var channelCondSetLock = sync.RWMutex{}

// Shared / Common Conditions Used By All Channel Implementations
const (

// KafkaChannelConditionReady has status True when all sub-conditions below have been set to True.
KafkaChannelConditionReady = apis.ConditionReady

// KafkaChannelConditionAddressable has status true when this KafkaChannel meets
// the Addressable contract and has a non-empty URL.
KafkaChannelConditionAddressable apis.ConditionType = "Addressable"

// KafkaChannelConditionConfigReady has status True when the Kafka configuration to use by the channel
// exists and is valid (i.e. the connection has been established).
KafkaChannelConditionConfigReady apis.ConditionType = "ConfigurationReady"

// KafkaChannelConditionTopicReady has status True when the Kafka topic to use by the channel exists.
KafkaChannelConditionTopicReady apis.ConditionType = "TopicReady"

// KafkaChannelConditionChannelServiceReady has status True when the K8S Service representing the channel
// is ready. Because this uses ExternalName, there are no endpoints to check.
KafkaChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady"

ConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady"
)

// RegisterAlternateKafkaChannelConditionSet register a different apis.ConditionSet.
func RegisterAlternateKafkaChannelConditionSet(conditionSet apis.ConditionSet) {
channelCondSetLock.Lock()
defer channelCondSetLock.Unlock()

kc = conditionSet
}

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
func (*KafkaChannel) GetConditionSet() apis.ConditionSet {
channelCondSetLock.RLock()
defer channelCondSetLock.RUnlock()

return kc
}

// GetConditionSet retrieves the condition set for this resource.
func (*KafkaChannelStatus) GetConditionSet() apis.ConditionSet {
channelCondSetLock.RLock()
defer channelCondSetLock.RUnlock()

return kc
}

// GetCondition returns the condition currently associated with the given type, or nil.
func (kcs *KafkaChannelStatus) GetCondition(t apis.ConditionType) *apis.Condition {
return kcs.GetConditionSet().Manage(kcs).GetCondition(t)
}

// IsReady returns true if the resource is ready overall.
func (kcs *KafkaChannelStatus) IsReady() bool {
return kcs.GetConditionSet().Manage(kcs).IsHappy()
}

// InitializeConditions sets relevant unset conditions to Unknown state.
func (kcs *KafkaChannelStatus) InitializeConditions() {
kcs.GetConditionSet().Manage(kcs).InitializeConditions()
}

// SetAddress sets the address (as part of Addressable contract) and marks the correct condition.
func (kcs *KafkaChannelStatus) SetAddress(addr *duckv1.Addressable) {
if kcs.Address == nil {
kcs.Address = &duckv1.Addressable{}
}
if addr != nil {
kcs.Address = addr
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionAddressable)
} else {
kcs.Address.URL = nil
kcs.GetConditionSet().Manage(kcs).MarkFalse(KafkaChannelConditionAddressable, "EmptyURL", "URL is nil")
}
}

func (kcs *KafkaChannelStatus) MarkConfigTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionConfigReady)
}

func (kcs *KafkaChannelStatus) MarkConfigFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(KafkaChannelConditionConfigReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkTopicTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionTopicReady)
}

func (kcs *KafkaChannelStatus) MarkTopicFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(KafkaChannelConditionTopicReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkChannelServiceFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(KafkaChannelConditionChannelServiceReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkChannelServiceTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(KafkaChannelConditionChannelServiceReady)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesTrue() {
kcs.GetConditionSet().Manage(kcs).MarkTrue(ConditionEventPoliciesReady)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkTrueWithReason(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkFalse(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}

func (kcs *KafkaChannelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) {
kcs.GetConditionSet().Manage(kcs).MarkUnknown(ConditionEventPoliciesReady, reason, messageFormat, messageA...)
}
Loading

0 comments on commit ae8a755

Please sign in to comment.