Skip to content

Commit

Permalink
addressing comments, Part1
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Wessendorf <[email protected]>
  • Loading branch information
matzew committed Oct 31, 2024
1 parent ee8d39d commit 05417aa
Show file tree
Hide file tree
Showing 16 changed files with 581 additions and 104 deletions.
48 changes: 2 additions & 46 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -7368,27 +7368,6 @@ and modifications of the event sent to the sink.</p>
</tr>
<tr>
<td>
<code>properties</code><br/>
<em>
map[string]string
</em>
</td>
<td>
<p>Properties are a key/value properties</p>
</td>
</tr>
<tr>
<td>
<code>type</code><br/>
<em>
string
</em>
</td>
<td>
</td>
</tr>
<tr>
<td>
<code>aws</code><br/>
<em>
<a href="#sources.knative.dev/v1alpha1.Aws">
Expand Down Expand Up @@ -7463,8 +7442,7 @@ string
</em>
</td>
<td>
<p>UseDefaultCredentials bool <code>json:&quot;useDefaultCredentials&quot; default:&quot;false&quot;</code> // Use default credentials provider
UseProfileCredentials bool <code>json:&quot;useProfileCredentials&quot; default:&quot;false&quot;</code> // Use profile credentials provider</p>
<p>AWS region</p>
</td>
</tr>
<tr>
Expand All @@ -7475,8 +7453,7 @@ string
</em>
</td>
<td>
<pre><code>UseSessionCredentials bool `json:&quot;useSessionCredentials&quot; default:&quot;false&quot;` // Use session credentials
</code></pre>
<p>Profile name for profile credentials provider</p>
</td>
</tr>
<tr>
Expand Down Expand Up @@ -8043,27 +8020,6 @@ and modifications of the event sent to the sink.</p>
</tr>
<tr>
<td>
<code>properties</code><br/>
<em>
map[string]string
</em>
</td>
<td>
<p>Properties are a key/value properties</p>
</td>
</tr>
<tr>
<td>
<code>type</code><br/>
<em>
string
</em>
</td>
<td>
</td>
</tr>
<tr>
<td>
<code>aws</code><br/>
<em>
<a href="#sources.knative.dev/v1alpha1.Aws">
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/sources/v1alpha1/integration_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (iss *IntegrationSourceStatus) IsReady() bool {
}

func (s *IntegrationSourceStatus) PropagateContainerSourceStatus(status *v1.ContainerSourceStatus) {
// Do not copy conditions nor observedGeneration
//// Do not copy conditions nor observedGeneration
s.SourceStatus = *status.SourceStatus.DeepCopy()

cond := status.GetCondition(apis.ConditionReady)
Expand All @@ -71,6 +71,6 @@ func (s *IntegrationSourceStatus) PropagateContainerSourceStatus(status *v1.Cont
IntegrationCondSet.Manage(s).MarkUnknown(IntegrationSourceConditionContainerSourceReady, cond.Reason, cond.Message)
}

// Propagate SinkBindings AuthStatus to containersources AuthStatus
// Propagate ContainerSources AuthStatus to IntegrationSources AuthStatus
s.Auth = status.Auth
}
15 changes: 4 additions & 11 deletions pkg/apis/sources/v1alpha1/integration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ type IntegrationSourceSpec struct {
// and modifications of the event sent to the sink.
duckv1.SourceSpec `json:",inline"`

// Properties are a key/value properties
Properties map[string]string `json:"properties"`
Type string `json:"type,omitempty"`

Aws *Aws `json:"aws,omitempty"` // AWS source configuration
Timer *Timer `json:"timer,omitempty"` // Timer configuration
}
Expand All @@ -74,14 +70,11 @@ type Timer struct {

type AWSCommon struct {
// Auth is the S3 authentication (accessKey/secretKey) configuration.
Region string `json:"region,omitempty"` // AWS region
//UseDefaultCredentials bool `json:"useDefaultCredentials" default:"false"` // Use default credentials provider
//UseProfileCredentials bool `json:"useProfileCredentials" default:"false"` // Use profile credentials provider
Region string `json:"region,omitempty"` // AWS region
ProfileCredentialsName string `json:"profileCredentialsName,omitempty"` // Profile name for profile credentials provider
// UseSessionCredentials bool `json:"useSessionCredentials" default:"false"` // Use session credentials
SessionToken string `json:"sessionToken,omitempty"` // Session token
URIEndpointOverride string `json:"uriEndpointOverride,omitempty"` // Override endpoint URI
OverrideEndpoint bool `json:"overrideEndpoint" default:"false"` // Override endpoint flag
SessionToken string `json:"sessionToken,omitempty"` // Session token
URIEndpointOverride string `json:"uriEndpointOverride,omitempty"` // Override endpoint URI
OverrideEndpoint bool `json:"overrideEndpoint" default:"false"` // Override endpoint flag
}

type AWSS3 struct {
Expand Down
7 changes: 0 additions & 7 deletions pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/reconciler/containersource/containersource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func TestAllCases(t *testing.T) {
},
Key: testNS + "/" + sourceName,
WantEvents: []string{
Eventf(corev1.EventTypeNormal, sourceReconciled, `ContainerSource reconciled: "%s/%s"`, testNS, sourceName),
Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "%s/%s"`, testNS, sourceName),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewContainerSource(sourceName, testNS,
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/integrationsource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
containersourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1/containersource"

configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/system"
Expand All @@ -33,7 +34,7 @@ import (
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis/feature"
v1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
integrationsourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/integrationsource"
v1integrationsource "knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha1/integrationsource"
Expand Down Expand Up @@ -67,7 +68,6 @@ func NewController(
eventingClientSet: eventingClient,
containerSourceLister: containerSourceInformer.Lister(),
integrationSourceLister: integrationsourceInformer.Lister(),
//trustBundleConfigMapLister: trustBundleConfigMapInformer.Lister(),
}

impl := v1integrationsource.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
Expand Down
60 changes: 60 additions & 0 deletions pkg/reconciler/integrationsource/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright 2024 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integrationsource

import (
"context"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
"knative.dev/eventing/pkg/apis/feature"
_ "knative.dev/eventing/pkg/client/injection/informers/sources/v1/containersource/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/integrationsource/fake"
"knative.dev/eventing/pkg/eventingtls"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake"
_ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake"
_ "knative.dev/pkg/injection/clients/dynamicclient/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t, SetUpInformerSelector)

c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
},
},
))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
}
}

func SetUpInformerSelector(ctx context.Context) context.Context {
ctx = filteredFactory.WithSelectors(ctx, eventingtls.TrustBundleLabelSelector)
return ctx
}
19 changes: 13 additions & 6 deletions pkg/reconciler/integrationsource/integrationsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

const (
// Name of the corev1.Events emitted from the reconciliation process
sourceReconciled = "IntegrationSourceReconciled"
containerSourceCreated = "ContainerSourceCreated"
containerSourceUpdated = "ContainerSourceUpdated"
)
Expand All @@ -40,6 +41,12 @@ type Reconciler struct {
// Check that our Reconciler implements Interface
var _ integrationsource.Interface = (*Reconciler)(nil)

// newReconciledNormal makes a new reconciler event with event type Normal, and
// reason ContainerSourceReconciled.
func newReconciledNormal(namespace, name string) pkgreconciler.Event {
return pkgreconciler.NewEvent(corev1.EventTypeNormal, sourceReconciled, "IntegrationSource reconciled: \"%s/%s\"", namespace, name)
}

func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.IntegrationSource) pkgreconciler.Event {

_, err := r.reconcileContainerSource(ctx, source)
Expand All @@ -48,7 +55,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha1.Integra
return err
}

return nil
return newReconciledNormal(source.Namespace, source.Name)
}

func (r *Reconciler) reconcileContainerSource(ctx context.Context, source *v1alpha1.IntegrationSource) (*v1.ContainerSource, error) {
Expand All @@ -64,22 +71,22 @@ func (r *Reconciler) reconcileContainerSource(ctx context.Context, source *v1alp
} else if err != nil {
return nil, fmt.Errorf("getting ContainerSource: %v", err)
} else if !metav1.IsControlledBy(cs, source) {
return nil, fmt.Errorf("ContainerSource %q is not owned by KameletSource %q", cs.Name, source.Name)
} else if r.podSpecChanged(&cs.Spec.Template.Spec, &expected.Spec.Template.Spec) {
cs.Spec.Template.Spec = expected.Spec.Template.Spec
return nil, fmt.Errorf("ContainerSource %q is not owned by IntegrationSource %q", cs.Name, source.Name)
} else if r.containerSourceSpecChanged(&cs.Spec, &expected.Spec) {
cs.Spec = expected.Spec
cs, err = r.eventingClientSet.SourcesV1().ContainerSources(source.Namespace).Update(ctx, cs, metav1.UpdateOptions{})
if err != nil {
return nil, fmt.Errorf("updating ContainerSource: %v", err)
}
controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeNormal, containerSourceUpdated, "ContainerSource updated %q", cs.Name)
} else {
logging.FromContext(ctx).Debugw("Reusing existing ContainerSource", zap.Any("ContainerSource", cs))
logging.FromContext(ctx).Debugw("Reusing existing ContainerSource", zap.Any("ContainerSource", cs.ObjectMeta))
}

source.Status.PropagateContainerSourceStatus(&cs.Status)
return cs, nil
}

func (r *Reconciler) podSpecChanged(have *corev1.PodSpec, want *corev1.PodSpec) bool {
func (r *Reconciler) containerSourceSpecChanged(have *v1.ContainerSourceSpec, want *v1.ContainerSourceSpec) bool {
return !equality.Semantic.DeepDerivative(want, have)
}
Loading

0 comments on commit 05417aa

Please sign in to comment.