diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bbd9e6482..4c9c664f3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ ### Improvements - Update to use client-go v11.0.0 (https://github.com/pulumi/pulumi-kubernetes/pull/549) +- Deduplicate provider logs (https://github.com/pulumi/pulumi-kubernetes/pull/558) ### Bug fixes diff --git a/pkg/await/apps_deployment.go b/pkg/await/apps_deployment.go index f320c2775a..89da620405 100644 --- a/pkg/await/apps_deployment.go +++ b/pkg/await/apps_deployment.go @@ -178,11 +178,12 @@ func (dia *deploymentInitAwaiter) Await() error { } defer pvcWatcher.Stop() - period := time.NewTicker(10 * time.Second) - defer period.Stop() + aggregateErrorTicker := time.NewTicker(10 * time.Second) + defer aggregateErrorTicker.Stop() timeout := time.Duration(metadata.TimeoutSeconds(dia.config.currentInputs, 5*60)) * time.Second - return dia.await(deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), period.C) + return dia.await( + deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher, time.After(timeout), aggregateErrorTicker.C) } func (dia *deploymentInitAwaiter) Read() error { @@ -275,7 +276,8 @@ func (dia *deploymentInitAwaiter) read( // await is a helper companion to `Await` designed to make it easy to test this module. func (dia *deploymentInitAwaiter) await( - deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface, timeout, period <-chan time.Time, + deploymentWatcher, replicaSetWatcher, podWatcher, pvcWatcher watch.Interface, + timeout, aggregateErrorTicker <-chan time.Time, ) error { dia.config.logStatus(diag.Info, "[1/2] Waiting for app ReplicaSet be marked available") @@ -296,7 +298,7 @@ func (dia *deploymentInitAwaiter) await( object: dia.deployment, subErrors: dia.errorMessages(), } - case <-period: + case <-aggregateErrorTicker: scheduleErrors, containerErrors := dia.aggregatePodErrors() for _, message := range scheduleErrors { dia.config.logStatus(diag.Warning, message) @@ -586,7 +588,8 @@ func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() { // defined, or when all PVCs have a status of 'Bound' if phase != statusBound { allPVCsReady = false - message := fmt.Sprintf("PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase) + message := fmt.Sprintf( + "PersistentVolumeClaim: [%s] is not ready. status.phase currently at: %s", pvc.GetName(), phase) dia.config.logStatus(diag.Warning, message) } } diff --git a/pkg/await/apps_deployment_test.go b/pkg/await/apps_deployment_test.go index 0556314bfb..0882ff6268 100644 --- a/pkg/await/apps_deployment_test.go +++ b/pkg/await/apps_deployment_test.go @@ -557,8 +557,9 @@ func Test_Apps_Deployment(t *testing.T) { period := make(chan time.Time) go test.do(deployments, replicaSets, pods, timeout) - err := awaiter.await(&chanWatcher{results: deployments}, &chanWatcher{results: replicaSets}, - &chanWatcher{results: pods}, &chanWatcher{}, timeout, period) + err := awaiter.await( + &chanWatcher{results: deployments}, &chanWatcher{results: replicaSets}, &chanWatcher{results: pods}, + &chanWatcher{}, timeout, period) assert.Equal(t, test.expectedError, err, test.description) } } diff --git a/pkg/await/apps_statefulset.go b/pkg/await/apps_statefulset.go index 418ed06bde..2572d31a1b 100644 --- a/pkg/await/apps_statefulset.go +++ b/pkg/await/apps_statefulset.go @@ -153,11 +153,11 @@ func (sia *statefulsetInitAwaiter) Await() error { } defer podWatcher.Stop() - period := time.NewTicker(10 * time.Second) - defer period.Stop() + aggregateErrorTicker := time.NewTicker(10 * time.Second) + defer aggregateErrorTicker.Stop() timeout := time.Duration(metadata.TimeoutSeconds(sia.config.currentInputs, 5*60)) * time.Second - return sia.await(statefulSetWatcher, podWatcher, time.After(timeout), period.C) + return sia.await(statefulSetWatcher, podWatcher, time.After(timeout), aggregateErrorTicker.C) } func (sia *statefulsetInitAwaiter) Read() error { @@ -219,7 +219,8 @@ func (sia *statefulsetInitAwaiter) read( // await is a helper companion to `Await` designed to make it easy to test this module. func (sia *statefulsetInitAwaiter) await( - statefulsetWatcher, podWatcher watch.Interface, timeout, period <-chan time.Time, + statefulsetWatcher, podWatcher watch.Interface, + timeout, aggregateErrorTicker <-chan time.Time, ) error { for { if sia.checkAndLogStatus() { @@ -238,7 +239,7 @@ func (sia *statefulsetInitAwaiter) await( object: sia.statefulset, subErrors: sia.errorMessages(), } - case <-period: + case <-aggregateErrorTicker: scheduleErrors, containerErrors := sia.aggregatePodErrors() for _, message := range scheduleErrors { sia.config.logStatus(diag.Warning, message) @@ -275,7 +276,8 @@ func (sia *statefulsetInitAwaiter) checkAndLogStatus() bool { sia.config.logStatus(diag.Info, fmt.Sprintf("[1/3] Waiting for StatefulSet update to roll out (%d/%d Pods ready)", sia.currentReplicas, sia.targetReplicas)) case !sia.revisionReady: - sia.config.logStatus(diag.Info, "[2/3] Waiting for StatefulSet to update .status.currentRevision") + sia.config.logStatus(diag.Info, + "[2/3] Waiting for StatefulSet to update .status.currentRevision") } } diff --git a/pkg/await/apps_statefulset_test.go b/pkg/await/apps_statefulset_test.go index 68abf8296a..00973e0b41 100644 --- a/pkg/await/apps_statefulset_test.go +++ b/pkg/await/apps_statefulset_test.go @@ -204,7 +204,8 @@ func Test_Apps_StatefulSet(t *testing.T) { period := make(chan time.Time) go test.do(statefulsets, pods, timeout) - err := awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period) + err := awaiter.await( + &chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period) assert.Equal(t, test.expectedError, err, test.description) } } @@ -262,7 +263,8 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) { awaiter.config.lastInputs = obj }) - err := awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period) + err := awaiter.await( + &chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period) assert.Equal(t, test.firstExpectedError, err, test.description) statefulsets = make(chan watch.Event) @@ -272,7 +274,8 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) { period = make(chan time.Time) go test.secondUpdate(statefulsets, pods, timeout) - err = awaiter.await(&chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period) + err = awaiter.await( + &chanWatcher{results: statefulsets}, &chanWatcher{results: pods}, timeout, period) assert.Equal(t, test.secondExpectedError, err, test.description) } } diff --git a/pkg/await/await.go b/pkg/await/await.go index dbe448b9d2..e10d024935 100644 --- a/pkg/await/await.go +++ b/pkg/await/await.go @@ -20,6 +20,7 @@ import ( "github.com/golang/glog" "github.com/pulumi/pulumi-kubernetes/pkg/clients" + "github.com/pulumi/pulumi-kubernetes/pkg/logging" "github.com/pulumi/pulumi-kubernetes/pkg/metadata" "github.com/pulumi/pulumi-kubernetes/pkg/openapi" "github.com/pulumi/pulumi-kubernetes/pkg/retry" @@ -52,7 +53,8 @@ type ProviderConfig struct { Host *pulumiprovider.HostClient URN resource.URN - ClientSet *clients.DynamicClientSet + ClientSet *clients.DynamicClientSet + DedupLogger *logging.DedupLogger } type CreateConfig struct { @@ -146,6 +148,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) { clientSet: c.ClientSet, currentInputs: c.Inputs, currentOutputs: outputs, + logger: c.DedupLogger, } waitErr := awaiter.awaitCreation(conf) if waitErr != nil { @@ -191,6 +194,7 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) { clientSet: c.ClientSet, currentInputs: c.Inputs, currentOutputs: outputs, + logger: c.DedupLogger, } waitErr := awaiter.awaitRead(conf) if waitErr != nil { @@ -307,6 +311,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) { clientSet: c.ClientSet, currentInputs: c.Inputs, currentOutputs: currentOutputs, + logger: c.DedupLogger, }, lastInputs: c.Previous, lastOutputs: liveOldObj, diff --git a/pkg/await/awaiters.go b/pkg/await/awaiters.go index 7f89300d7a..66a0038170 100644 --- a/pkg/await/awaiters.go +++ b/pkg/await/awaiters.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "github.com/pulumi/pulumi-kubernetes/pkg/clients" + "github.com/pulumi/pulumi-kubernetes/pkg/logging" "github.com/pulumi/pulumi-kubernetes/pkg/openapi" "github.com/pulumi/pulumi-kubernetes/pkg/watcher" "github.com/pulumi/pulumi/pkg/diag" @@ -44,15 +45,14 @@ type createAwaitConfig struct { host *provider.HostClient ctx context.Context urn resource.URN + logger *logging.DedupLogger clientSet *clients.DynamicClientSet currentInputs *unstructured.Unstructured currentOutputs *unstructured.Unstructured } func (cac *createAwaitConfig) logStatus(sev diag.Severity, message string) { - if cac.host != nil { - _ = cac.host.LogStatus(cac.ctx, sev, cac.urn, message) - } + cac.logger.LogMessage(sev, message) } // updateAwaitConfig specifies on which conditions we are to consider a resource "fully updated", diff --git a/pkg/await/core_service.go b/pkg/await/core_service.go index fbb0d775b6..482080f48a 100644 --- a/pkg/await/core_service.go +++ b/pkg/await/core_service.go @@ -206,8 +206,10 @@ func (sia *serviceInitAwaiter) read( // await is a helper companion to `Await` designed to make it easy to test this module. func (sia *serviceInitAwaiter) await( - serviceWatcher, endpointWatcher watch.Interface, timeout <-chan time.Time, - settled chan struct{}, version serverVersion, + serviceWatcher, endpointWatcher watch.Interface, + timeout <-chan time.Time, + settled chan struct{}, + version serverVersion, ) error { sia.config.logStatus(diag.Info, "[1/3] Finding Pods to direct traffic to") diff --git a/pkg/await/core_service_test.go b/pkg/await/core_service_test.go index 2c98c28930..f83c3da8b9 100644 --- a/pkg/await/core_service_test.go +++ b/pkg/await/core_service_test.go @@ -226,7 +226,8 @@ func Test_Core_Service(t *testing.T) { timeout := make(chan time.Time) go test.do(services, endpoints, settled, timeout) - err := awaiter.await(&chanWatcher{results: services}, &chanWatcher{results: endpoints}, + err := awaiter.await( + &chanWatcher{results: services}, &chanWatcher{results: endpoints}, timeout, settled, test.version) assert.Equal(t, test.expectedError, err, test.description) } diff --git a/pkg/await/extensions_ingress.go b/pkg/await/extensions_ingress.go index 3c8122b7ba..8b35ce2c51 100644 --- a/pkg/await/extensions_ingress.go +++ b/pkg/await/extensions_ingress.go @@ -186,8 +186,11 @@ func (iia *ingressInitAwaiter) read(ingress *unstructured.Unstructured, endpoint } // await is a helper companion to `Await` designed to make it easy to test this module. -func (iia *ingressInitAwaiter) await(ingressWatcher, serviceWatcher, endpointWatcher watch.Interface, - settled chan struct{}, timeout <-chan time.Time) error { +func (iia *ingressInitAwaiter) await( + ingressWatcher, serviceWatcher, endpointWatcher watch.Interface, + settled chan struct{}, + timeout <-chan time.Time, +) error { iia.config.logStatus(diag.Info, "[1/3] Finding a matching service for each Ingress path") for { diff --git a/pkg/await/extensions_ingress_test.go b/pkg/await/extensions_ingress_test.go index 2c59c897b7..a377f4f1ff 100644 --- a/pkg/await/extensions_ingress_test.go +++ b/pkg/await/extensions_ingress_test.go @@ -127,7 +127,9 @@ func Test_Extensions_Ingress(t *testing.T) { timeout := make(chan time.Time) go test.do(ingresses, services, endpoints, settled, timeout) - err := awaiter.await(&chanWatcher{results: ingresses}, &chanWatcher{results: services}, &chanWatcher{results: endpoints}, settled, timeout) + err := awaiter.await( + &chanWatcher{results: ingresses}, &chanWatcher{results: services}, &chanWatcher{results: endpoints}, + settled, timeout) assert.Equal(t, test.expectedError, err, test.description) } } diff --git a/pkg/await/util_test.go b/pkg/await/util_test.go index 79bfa44b1b..1ca1218cc7 100644 --- a/pkg/await/util_test.go +++ b/pkg/await/util_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/pulumi/pulumi-kubernetes/pkg/logging" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -13,6 +14,7 @@ func mockAwaitConfig(inputs *unstructured.Unstructured) createAwaitConfig { //TODO: complete this mock if needed currentInputs: inputs, currentOutputs: inputs, + logger: logging.NewLogger(context.Background(), nil, ""), } } diff --git a/pkg/logging/dedup_logger.go b/pkg/logging/dedup_logger.go new file mode 100644 index 0000000000..52cbaea0c2 --- /dev/null +++ b/pkg/logging/dedup_logger.go @@ -0,0 +1,78 @@ +// Copyright 2016-2019, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logging + +import ( + "context" + "sync" + + "github.com/pulumi/pulumi/pkg/diag" + "github.com/pulumi/pulumi/pkg/resource" + "github.com/pulumi/pulumi/pkg/resource/provider" +) + +// DedupLogger wraps a time-ordered log set to allow batched logging of unique messages. +// Operations on DedupLogger are safe to use concurrently. +type DedupLogger struct { + messages *TimeOrderedLogSet + index int + ctx context.Context + host *provider.HostClient + urn resource.URN + mux sync.Mutex +} + +// NewLogger returns an initialized DedupLogger. +func NewLogger(ctx context.Context, host *provider.HostClient, urn resource.URN) *DedupLogger { + return &DedupLogger{ + messages: &TimeOrderedLogSet{}, + ctx: ctx, + host: host, + urn: urn, + } +} + +// LogMessage adds a message to the log set and flushes the queue to the host. +func (l *DedupLogger) LogMessage(severity diag.Severity, s string) { + l.EnqueueMessage(severity, s) + l.LogNewMessages() +} + +// EnqueueMessage adds a message to the log set but does not log it to the host. +func (l *DedupLogger) EnqueueMessage(severity diag.Severity, s string) { + l.mux.Lock() + defer l.mux.Unlock() + + l.messages.Add(Message{s, severity}) +} + +// GetNewMessages returns the list of new messages since last calling GetNewMessages. +func (l *DedupLogger) GetNewMessages() []Message { + l.mux.Lock() + defer l.mux.Unlock() + + idx := l.index + l.index = len(l.messages.Messages) + return l.messages.Messages[idx:] +} + +// LogNewMessages logs any new messages to the host. +func (l *DedupLogger) LogNewMessages() { + if l.host != nil { + for _, msg := range l.GetNewMessages() { + _ = l.host.LogStatus(l.ctx, msg.severity, l.urn, msg.s) + } + } +} diff --git a/pkg/logging/time_ordered_log_set.go b/pkg/logging/time_ordered_log_set.go new file mode 100644 index 0000000000..c36f07dcf2 --- /dev/null +++ b/pkg/logging/time_ordered_log_set.go @@ -0,0 +1,53 @@ +// Copyright 2016-2019, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logging + +import ( + "fmt" + + "github.com/pulumi/pulumi/pkg/diag" +) + +// Message stores a log string and the severity for the log message. +type Message struct { + s string + severity diag.Severity +} + +func (m Message) String() string { + return fmt.Sprintf("[%s] %s", m.severity, m.s) +} + +// TimeOrderedLogSet stores a temporally-ordered set of log messages. +type TimeOrderedLogSet struct { + exists map[Message]bool + Messages []Message +} + +// Add appends a message to the time-ordered set. +func (o *TimeOrderedLogSet) Add(msg Message) { + // Ensure memory has been allocated. + if o.exists == nil { + o.exists = make(map[Message]bool) + } + if o.Messages == nil { + o.Messages = []Message{} + } + + if !o.exists[msg] { + o.Messages = append(o.Messages, msg) + o.exists[msg] = true + } +} diff --git a/pkg/logging/time_ordered_log_set_test.go b/pkg/logging/time_ordered_log_set_test.go new file mode 100644 index 0000000000..b665ae7d5c --- /dev/null +++ b/pkg/logging/time_ordered_log_set_test.go @@ -0,0 +1,83 @@ +// Copyright 2016-2019, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package logging + +import ( + "testing" + + "github.com/pulumi/pulumi/pkg/diag" + "github.com/stretchr/testify/assert" +) + +func TestOrderedStringSet_Add(t *testing.T) { + status1 := Message{"foo", diag.Info} + status2 := Message{"bar", diag.Info} + warn1 := Message{"boom", diag.Warning} + + type fields struct { + exists map[Message]bool + Messages []Message + } + type args struct { + msg Message + } + tests := []struct { + name string + fields fields + args args + expect []Message + }{ + { + "add a message to uninitialized struct", + fields{}, + args{status1}, + []Message{status1}, + }, + { + "add a new message to empty list", + fields{map[Message]bool{}, []Message{}}, + args{status1}, + []Message{status1}, + }, + { + "add a new info message to existing list", + fields{map[Message]bool{status1: true}, []Message{status1}}, + args{status2}, + []Message{status1, status2}, + }, + { + "add a new warning message to existing list", + fields{map[Message]bool{status1: true}, []Message{status1}}, + args{warn1}, + []Message{status1, warn1}, + }, + { + "add a duplicate string", + fields{map[Message]bool{status1: true}, []Message{status1}}, + args{status1}, + []Message{status1}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + o := &TimeOrderedLogSet{ + exists: tt.fields.exists, + Messages: tt.fields.Messages, + } + o.Add(tt.args.msg) + assert.ObjectsAreEqual(o.Messages, tt.expect) + }) + } +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 7fe1121c8b..259eb83ff2 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -27,6 +27,7 @@ import ( "github.com/grpc/grpc-go/status" "github.com/pulumi/pulumi-kubernetes/pkg/await" "github.com/pulumi/pulumi-kubernetes/pkg/clients" + "github.com/pulumi/pulumi-kubernetes/pkg/logging" "github.com/pulumi/pulumi-kubernetes/pkg/metadata" "github.com/pulumi/pulumi-kubernetes/pkg/openapi" "github.com/pulumi/pulumi/pkg/resource" @@ -480,10 +481,11 @@ func (k *kubeProvider) Create( config := await.CreateConfig{ ProviderConfig: await.ProviderConfig{ - Context: k.canceler.context, - Host: k.host, - URN: resource.URN(req.GetUrn()), - ClientSet: k.clientSet, + Context: k.canceler.context, + Host: k.host, + URN: urn, + ClientSet: k.clientSet, + DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn), }, Inputs: newInputs, } @@ -581,10 +583,11 @@ func (k *kubeProvider) Read(ctx context.Context, req *pulumirpc.ReadRequest) (*p config := await.ReadConfig{ ProviderConfig: await.ProviderConfig{ - Context: k.canceler.context, - Host: k.host, - URN: resource.URN(req.GetUrn()), - ClientSet: k.clientSet, + Context: k.canceler.context, + Host: k.host, + URN: urn, + ClientSet: k.clientSet, + DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn), }, Inputs: oldInputs, Name: name, @@ -736,10 +739,11 @@ func (k *kubeProvider) Update( config := await.UpdateConfig{ ProviderConfig: await.ProviderConfig{ - Context: k.canceler.context, - Host: k.host, - URN: resource.URN(req.GetUrn()), - ClientSet: k.clientSet, + Context: k.canceler.context, + Host: k.host, + URN: urn, + ClientSet: k.clientSet, + DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn), }, Previous: oldInputs, Inputs: newInputs, @@ -803,14 +807,15 @@ func (k *kubeProvider) Delete( return nil, err } _, current := parseCheckpointObject(oldState) - _, name := ParseFqName(req.GetId()) + config := await.DeleteConfig{ ProviderConfig: await.ProviderConfig{ - Context: k.canceler.context, - Host: k.host, - URN: resource.URN(req.GetUrn()), - ClientSet: k.clientSet, + Context: k.canceler.context, // TODO: should this just be ctx from the args? + Host: k.host, + URN: urn, + ClientSet: k.clientSet, + DedupLogger: logging.NewLogger(k.canceler.context, k.host, urn), }, Inputs: current, Name: name,