Skip to content

Commit

Permalink
Merge pull request #55 from radiofrance/fix-watch-panic
Browse files Browse the repository at this point in the history
fix(goss): fixed panic on pod failure sending to closed channel
  • Loading branch information
graillus authored Feb 4, 2022
2 parents e576e25 + 6695c33 commit 4ffbf03
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 28 deletions.
25 changes: 19 additions & 6 deletions goss/executor_kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,20 @@ func (e KubernetesExecutor) Execute(ctx context.Context, output io.Writer, opts
return err
}

labelSelector := fmt.Sprintf("app.kubernetes.io/instance=%s", pod.Name)
readyChan, errChan, err := k8sutils.WaitPodReady(ctx, e.clientSet, e.PodConfig.Namespace, labelSelector)
watcher, err := e.clientSet.CoreV1().Pods(e.PodConfig.Namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app.kubernetes.io/instance=%s", pod.Name),
Watch: true,
})
if err != nil {
return fmt.Errorf("failed to watch pod: %w", err)
}
defer watcher.Stop()

readyChan, watchErrChan := k8sutils.WaitPodReady(ctx, watcher)

errChan := make(chan error)
go func() {
defer close(errChan)
<-readyChan
go k8sutils.PrintPodLogs(ctx, output, e.clientSet, e.PodConfig.Namespace, podName, containerName)

Expand Down Expand Up @@ -187,10 +195,15 @@ func (e KubernetesExecutor) Execute(ctx context.Context, output io.Writer, opts
_ = e.clientSet.CoreV1().Pods(e.PodConfig.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
}()

err = <-errChan
close(errChan)
if err != nil {
return fmt.Errorf("error running goss tests: %w", err)
select {
case watchErr := <-watchErrChan:
if watchErr != nil {
return fmt.Errorf("error watching goss pod: %w", watchErr)
}
case err = <-errChan:
if err != nil {
return fmt.Errorf("error running goss tests: %w", err)
}
}
return nil
}
21 changes: 15 additions & 6 deletions kaniko/executor_kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,16 @@ func (e KubernetesExecutor) Execute(ctx context.Context, output io.Writer, args
return err
}

labelSelector := fmt.Sprintf("app.kubernetes.io/instance=%s", pod.Name)
readyChan, errChan, err := k8sutils.WaitPodReady(ctx, e.clientSet, e.PodConfig.Namespace, labelSelector)
watcher, err := e.clientSet.CoreV1().Pods(e.PodConfig.Namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("app.kubernetes.io/instance=%s", pod.Name),
Watch: true,
})
if err != nil {
return fmt.Errorf("failed to watch pod: %w", err)
}
defer watcher.Stop()

readyChan, errChan := k8sutils.WaitPodReady(ctx, watcher)
go func() {
<-readyChan
k8sutils.PrintPodLogs(ctx, output, e.clientSet, e.PodConfig.Namespace, podName, containerName)
Expand All @@ -197,13 +201,18 @@ func (e KubernetesExecutor) Execute(ctx context.Context, output io.Writer, args
if err != nil {
return fmt.Errorf("failed to create kaniko pod: %w", err)
}
defer func() {
err := e.clientSet.CoreV1().Pods(e.PodConfig.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
logrus.Warnf("Failed to delete kaniko pod %s, ignoring: %v", pod.Name, err)
}
}()

err = <-errChan
delErr := e.clientSet.CoreV1().Pods(e.PodConfig.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if delErr != nil {
logrus.Warnf("failed to delete kaniko pod %s, ignoring: %v", pod.Name, delErr)
if err != nil {
return fmt.Errorf("error watching kaniko pod: %w", err)
}
return err
return nil
}

// UniquePodName generates a unique pod name with random characters.
Expand Down
23 changes: 7 additions & 16 deletions kubernetes/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,30 @@ import (

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)

// WaitPodReady waits for a pod to be in running state.
// The function is non-blocking, it returns 2 channels that will used as event dispatchers:
// The function is non-blocking, it returns 2 channels that will be used as event dispatchers:
// - When the pod reaches the running state, an empty struct is sent to readyChan.
// - When the pod reaches completion, nil is sent to errChan on success, or an error if the pod failed.
// - If the 1-hour timeout is reached, an error is sent to errChan.
// - If the passed context is cancelled or times out, an error is sent to errChan.
func WaitPodReady(ctx context.Context, clientSet kubernetes.Interface, namespace string, labelSelector string,
) (readyChan chan struct{}, errChan chan error, err error) {
watcher, err := clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
Watch: true,
})
if err != nil {
return readyChan, errChan, fmt.Errorf("failed to watch pod: %w", err)
}

func WaitPodReady(ctx context.Context, watcher watch.Interface) (readyChan chan struct{}, errChan chan error) {
readyChan = make(chan struct{})
errChan = make(chan error)
running := false

go func() {
defer watcher.Stop()
defer close(errChan)
defer close(readyChan)
for {
select {
case event, chanOk := <-watcher.ResultChan():
if !chanOk {
errChan <- fmt.Errorf("watcher channel was closed")
return
}
pod, ok := event.Object.(*corev1.Pod)
Expand Down Expand Up @@ -73,13 +64,13 @@ func WaitPodReady(ctx context.Context, clientSet kubernetes.Interface, namespace
errChan <- fmt.Errorf("timeout waiting for pod to run to completion")
return
case <-ctx.Done():
errChan <- fmt.Errorf("context was cancelled: %w", ctx.Err())
errChan <- fmt.Errorf("stop wating for pod: %w", ctx.Err())
return
}
}
}()

return readyChan, errChan, nil
return readyChan, errChan
}

// PrintPodLogs watches the logs of a container in a pod and writes them to the given io.Writer.
Expand Down

0 comments on commit 4ffbf03

Please sign in to comment.