Skip to content

Commit

Permalink
Checks that the expected services are running on bootstrap / join
Browse files Browse the repository at this point in the history
Currently, we're doing only a few checks that would ensure a successful
installation of the k8s-snap (the containerd path is free for us to use,
the ports meant for Kubernetes services are free, binaries are sane).

However, due to various reasons, the bootstrap / join process can still
succeed, even if some of the services are not running (e.g.: invalid
extra service arguments given in the bootstrap config, leading to a
service to fail due to an unknown or invalid argument). At the moment,
we're only verifying that kube-apiserver is accessible.

We're adding a few checks that the Kubernetes services are not stopped /
failed after we've started them.

Additionally, if the bootstrap / join process fails, we're not doing any
cleanup, meaning that the services keep running, and the next
bootstrap / join attempt will automatically fail because of the
previously mentioned checks. This commit also addresses that issue.
  • Loading branch information
claudiubelu committed Dec 10, 2024
1 parent 60f9276 commit 58f3985
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 5 deletions.
2 changes: 1 addition & 1 deletion k8s/lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,10 @@ k8s::common::execute_service() {

k8s::common::setup_env

set -xe
# Source arguments and substitute environment variables. Will fail if we cannot read the file.
declare -a args="($(cat "${SNAP_COMMON}/args/${service_name}"))"

set -xe
exec "${SNAP}/bin/${service_name}" "${args[@]}"
}

Expand Down
16 changes: 16 additions & 0 deletions src/k8s/pkg/k8sd/app/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"fmt"
"net"
"time"

"github.com/canonical/k8s/pkg/k8sd/setup"
"github.com/canonical/k8s/pkg/snap"
snaputil "github.com/canonical/k8s/pkg/snap/util"
"github.com/canonical/k8s/pkg/utils/control"
mctypes "github.com/canonical/microcluster/v2/rest/types"
)

Expand Down Expand Up @@ -61,6 +63,20 @@ func waitApiServerReady(ctx context.Context, snap snap.Snap) error {
return nil
}

// waitControlPlaneServices verifies that the control plane services keep reporting the
// "active" state over several consecutive checks (3 attempts with a 5 second delay).
// It returns a wrapped error as soon as one check fails or the context is done.
func waitControlPlaneServices(ctx context.Context, snap snap.Snap) error {
	// A service can look "active" right after being started and still fail (and be
	// restarted) shortly afterwards, so a single probe is not enough.
	checkActive := func() error {
		err := snaputil.CheckControlPlaneServicesStates(ctx, snap, "active")
		if err == nil {
			return nil
		}
		return fmt.Errorf("failed to ensure all control plane services are active: %w", err)
	}

	err := control.Consistently(ctx, 3, 5*time.Second, checkActive)
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed after retry: %w", err)
}

func DetermineLocalhostAddress(clusterMembers []mctypes.ClusterMember) (string, error) {
// Check if any of the cluster members have an IPv6 address, if so return "::1"
// if one member has an IPv6 address, other members should also have IPv6 interfaces
Expand Down
56 changes: 55 additions & 1 deletion src/k8s/pkg/k8sd/app/hooks_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/control"
"github.com/canonical/k8s/pkg/utils/experimental/snapdconfig"
"github.com/canonical/lxd/shared/revert"
"github.com/canonical/microcluster/v2/state"
)

Expand Down Expand Up @@ -250,10 +252,20 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT
return fmt.Errorf("database transaction to set cluster configuration failed: %w", err)
}

reverter := revert.New()
defer reverter.Fail()

// Worker node services
if err := setup.Containerd(snap, joinConfig.ExtraNodeContainerdConfig, joinConfig.ExtraNodeContainerdArgs); err != nil {
return fmt.Errorf("failed to configure containerd: %w", err)
}
reverter.Add(func() {
// We need to clean this up, so the second join attempt won't fail because this folder exists.
if err := os.RemoveAll(snap.ContainerdSocketDir()); err != nil {
log.Error(err, "Could not remove containerd socket path")
}
})

if err := setup.KubeletWorker(snap, s.Name(), nodeIP, response.ClusterDNS, response.ClusterDomain, response.CloudProvider, joinConfig.ExtraNodeKubeletArgs); err != nil {
return fmt.Errorf("failed to configure kubelet: %w", err)
}
Expand Down Expand Up @@ -283,6 +295,26 @@ func (a *App) onBootstrapWorkerNode(ctx context.Context, s state.State, encodedT
return fmt.Errorf("failed after retry: %w", err)
}

log.Info("Checking worker node services")
// The services may be able to start, appearing to be "active", but they might eventually fail due to
// various reasons, and they may be restarted. We're checking their activity a few times.
if err := control.Consistently(ctx, 3, 5*time.Second, func() error {
if err := snaputil.CheckWorkerServicesStates(ctx, snap, "active"); err != nil {
return fmt.Errorf("failed to ensure all worker services are active: %w", err)
}
return nil
}); err != nil {
log.Error(err, "Not all worker node services entered an active state. Stopping worker node services.")
reverter.Add(func() {
if err := snaputil.StopWorkerServices(ctx, snap); err != nil {
log.Error(err, "Could not stop all worker node services")
}
})
return err
}

reverter.Success()
log.Info("Worker node services are ready")
return nil
}

Expand Down Expand Up @@ -453,10 +485,20 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
return fmt.Errorf("unsupported datastore %s, must be one of %v", cfg.Datastore.GetType(), setup.SupportedDatastores)
}

reverter := revert.New()
defer reverter.Fail()

// Configure services
if err := setup.Containerd(snap, bootstrapConfig.ExtraNodeContainerdConfig, bootstrapConfig.ExtraNodeContainerdArgs); err != nil {
return fmt.Errorf("failed to configure containerd: %w", err)
}
reverter.Add(func() {
// We need to clean this up, so the second bootstrap attempt won't fail because this folder exists.
if err := os.RemoveAll(snap.ContainerdSocketDir()); err != nil {
log.Error(err, "Could not remove containerd socket path")
}
})

if err := setup.KubeletControlPlane(snap, s.Name(), nodeIP, cfg.Kubelet.GetClusterDNS(), cfg.Kubelet.GetClusterDomain(), cfg.Kubelet.GetCloudProvider(), cfg.Kubelet.GetControlPlaneTaints(), bootstrapConfig.ExtraNodeKubeletArgs); err != nil {
return fmt.Errorf("failed to configure kubelet: %w", err)
}
Expand Down Expand Up @@ -508,8 +550,19 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
if err := waitApiServerReady(ctx, snap); err != nil {
return fmt.Errorf("kube-apiserver did not become ready in time: %w", err)
}
log.Info("API server is ready - notify controllers")

log.Info("API server is ready - waiting for control plane services")
if err := waitControlPlaneServices(ctx, snap); err != nil {
log.Error(err, "Not all control plane services entered an active state. Stopping control plane services.")
reverter.Add(func() {
if err := stopControlPlaneServices(ctx, snap, cfg.Datastore.GetType()); err != nil {
log.Error(err, "Could not stop all control plane node services")
}
})
return err
}

log.Info("Control plane services are ready - notify controllers")
a.NotifyFeatureController(
cfg.Network.GetEnabled(),
cfg.Gateway.GetEnabled(),
Expand All @@ -520,5 +573,6 @@ func (a *App) onBootstrapControlPlane(ctx context.Context, s state.State, bootst
cfg.DNS.GetEnabled(),
)
a.NotifyUpdateNodeConfigController()
reverter.Success()
return nil
}
24 changes: 24 additions & 0 deletions src/k8s/pkg/k8sd/app/hooks_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"os"
"time"

databaseutil "github.com/canonical/k8s/pkg/k8sd/database/util"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/canonical/k8s/pkg/utils"
"github.com/canonical/k8s/pkg/utils/control"
"github.com/canonical/k8s/pkg/utils/experimental/snapdconfig"
"github.com/canonical/lxd/shared/revert"
"github.com/canonical/microcluster/v2/state"
)

Expand Down Expand Up @@ -191,10 +193,19 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri
return fmt.Errorf("unsupported datastore %s, must be one of %v", cfg.Datastore.GetType(), setup.SupportedDatastores)
}

reverter := revert.New()
defer reverter.Fail()

// Configure services
if err := setup.Containerd(snap, joinConfig.ExtraNodeContainerdConfig, joinConfig.ExtraNodeContainerdArgs); err != nil {
return fmt.Errorf("failed to configure containerd: %w", err)
}
reverter.Add(func() {
// We need to clean this up, so the second join attempt won't fail because this folder exists.
if err := os.RemoveAll(snap.ContainerdSocketDir()); err != nil {
log.Error(err, "Could not remove containerd socket path")
}
})
if err := setup.KubeletControlPlane(snap, s.Name(), nodeIP, cfg.Kubelet.GetClusterDNS(), cfg.Kubelet.GetClusterDomain(), cfg.Kubelet.GetCloudProvider(), cfg.Kubelet.GetControlPlaneTaints(), joinConfig.ExtraNodeKubeletArgs); err != nil {
return fmt.Errorf("failed to configure kubelet: %w", err)
}
Expand Down Expand Up @@ -236,5 +247,18 @@ func (a *App) onPostJoin(ctx context.Context, s state.State, initConfig map[stri
return fmt.Errorf("failed to wait for kube-apiserver to become ready: %w", err)
}

log.Info("API server is ready - waiting for control plane services")
if err := waitControlPlaneServices(ctx, snap); err != nil {
log.Error(err, "Not all control plane services entered an active state. Stopping control plane services.")
reverter.Add(func() {
if err := stopControlPlaneServices(ctx, snap, cfg.Datastore.GetType()); err != nil {
log.Error(err, "Could not stop all control plane node services")
}
})
return err
}

reverter.Success()
log.Info("Control plane node services are ready")
return nil
}
7 changes: 4 additions & 3 deletions src/k8s/pkg/snap/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ type Snap interface {
GID() int // GID is the group ID to set on config files.
Hostname() string // Hostname is the name of the node.

StartService(ctx context.Context, serviceName string) error // snapctl start $service
StopService(ctx context.Context, serviceName string) error // snapctl stop $service
RestartService(ctx context.Context, serviceName string) error // snapctl restart $service
StartService(ctx context.Context, serviceName string) error // snapctl start $service
StopService(ctx context.Context, serviceName string) error // snapctl stop $service
RestartService(ctx context.Context, serviceName string) error // snapctl restart $service
GetServiceState(ctx context.Context, serviceName string) (string, error) // systemctl is-active $service

SnapctlGet(ctx context.Context, args ...string) ([]byte, error) // snapctl get $args...
SnapctlSet(ctx context.Context, args ...string) error // snapctl set $args...
Expand Down
4 changes: 4 additions & 0 deletions src/k8s/pkg/snap/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (s *Snap) RestartService(ctx context.Context, name string) error {
return s.RestartServiceErr
}

// GetServiceState is the mock service state lookup. It ignores its arguments and
// always reports the "active" state with a nil error.
func (s *Snap) GetServiceState(ctx context.Context, name string) (string, error) {
	const mockedState = "active"
	return mockedState, nil
}

func (s *Snap) Refresh(ctx context.Context, opts types.RefreshOpts) (string, error) {
if len(s.RefreshCalledWith) == 0 {
s.RefreshCalledWith = []types.RefreshOpts{opts}
Expand Down
32 changes: 32 additions & 0 deletions src/k8s/pkg/snap/pebble.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package snap

import (
"bytes"
"context"
"fmt"
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/canonical/k8s/pkg/k8sd/types"
"github.com/canonical/k8s/pkg/log"
"github.com/canonical/k8s/pkg/utils"
)

// Service states that the pebble GetServiceState implementation accepts when parsing
// the "Current" column of the `pebble services` output.
const (
stateActive = "active"
stateInactive = "inactive"
)

type PebbleOpts struct {
SnapDir string
SnapCommonDir string
Expand Down Expand Up @@ -66,6 +73,31 @@ func (s *pebble) RestartService(ctx context.Context, name string) error {
return s.runCommand(ctx, []string{filepath.Join(s.snapDir, "bin", "pebble"), "restart", name})
}

// GetServiceState returns the current state of a k8s service, as printed in the
// "Current" column of `pebble services <name>`. The name is forwarded to pebble as given.
//
// The returned state is one of "active", "inactive", "backoff" or "error", in the casing
// printed by pebble. "backoff" and "error" are what pebble reports for a service that
// crashed or keeps restarting; they are returned as states so that callers can report
// the real condition instead of a generic parsing failure.
//
// An error is returned if the pebble command fails or if its output cannot be parsed.
func (s *pebble) GetServiceState(ctx context.Context, name string) (string, error) {
	var stdout bytes.Buffer
	cmd := []string{filepath.Join(s.snapDir, "bin", "pebble"), "services", name}
	if err := s.runCommand(ctx, cmd, func(c *exec.Cmd) { c.Stdout = &stdout }); err != nil {
		return "", err
	}

	// We're expecting output like this:
	// Service  Startup  Current   Since
	// kubelet  enabled  inactive  -
	lines := strings.Split(stdout.String(), "\n")
	if len(lines) < 2 {
		return "", fmt.Errorf("Unexpected output when checking service %s state", name)
	}

	// The first line is the header; the second one describes the requested service.
	fields := strings.Fields(lines[1])
	if len(fields) < 3 {
		return "", fmt.Errorf("Unexpected output when checking service %s state", name)
	}

	current := fields[2]
	for _, known := range []string{stateActive, stateInactive, "backoff", "error"} {
		if strings.EqualFold(known, current) {
			return current, nil
		}
	}
	return "", fmt.Errorf("Unexpected output when checking service %s state", name)
}

func (s *pebble) Refresh(ctx context.Context, to types.RefreshOpts) (string, error) {
switch {
case to.Revision != "":
Expand Down
8 changes: 8 additions & 0 deletions src/k8s/pkg/snap/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ func serviceName(serviceName string) string {
}
return fmt.Sprintf("k8s.%s", serviceName)
}

// systemdServiceName infers the name of the systemd unit from the service name.
//
// Three input forms are accepted:
//   - already fully qualified ("snap.k8s.kubelet"): returned unchanged;
//   - prefixed with the snap name ("k8s.kubelet"): only "snap." is prepended, so the
//     snap name is not duplicated in the result;
//   - bare ("kubelet"): "snap.k8s." is prepended.
//
// Names that merely start with "k8s" without the dot separator (e.g. "k8s-dqlite",
// "k8sd") are treated as bare names.
func systemdServiceName(serviceName string) string {
	switch {
	case strings.HasPrefix(serviceName, "snap.k8s."):
		return serviceName
	case strings.HasPrefix(serviceName, "k8s."):
		return fmt.Sprintf("snap.%s", serviceName)
	default:
		return fmt.Sprintf("snap.k8s.%s", serviceName)
	}
}
18 changes: 18 additions & 0 deletions src/k8s/pkg/snap/snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ func (s *snap) RestartService(ctx context.Context, name string) error {
return s.runCommand(ctx, []string{"snapctl", "restart", serviceName(name)})
}

// GetServiceState returns a k8s service state, as reported by `systemctl is-active`.
// The name can be either prefixed or not.
//
// On success the trimmed command output is returned. When systemctl exits with code 3
// (the service is not active) the state "failed" is returned with a nil error. Any other
// command failure is returned as an error with an empty state.
func (s *snap) GetServiceState(ctx context.Context, name string) (string, error) {
	log.FromContext(ctx).V(2).WithCallDepth(1).Info("Getting service state", "service", name)

	var stdout bytes.Buffer
	cmd := []string{"systemctl", "is-active", systemdServiceName(name)}
	err := s.runCommand(ctx, cmd, func(c *exec.Cmd) { c.Stdout = &stdout })
	if err == nil {
		return strings.TrimSpace(stdout.String()), nil
	}

	// systemctl is-active has exit code 3 if the given service is not active.
	// errors.As walks the whole error chain, so the exit code is found no matter how
	// many times the command error has been wrapped.
	var exitErr *exec.ExitError
	if errors.As(err, &exitErr) && exitErr.ExitCode() == 3 {
		return "failed", nil
	}
	return "", err
}

// Refresh refreshes the snap to a different track, revision or custom snap.
func (s *snap) Refresh(ctx context.Context, to types.RefreshOpts) (string, error) {
if s.Strict() {
Expand Down
29 changes: 29 additions & 0 deletions src/k8s/pkg/snap/util/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package snaputil
import (
"context"
"fmt"
"strings"

"github.com/canonical/k8s/pkg/snap"
)
Expand Down Expand Up @@ -98,6 +99,34 @@ func StopK8sDqliteServices(ctx context.Context, snap snap.Snap) error {
return nil
}

// CheckWorkerServicesStates verifies that every worker service is in the given state.
// It stops at the first service whose state cannot be read or does not match, and
// returns that failure.
func CheckWorkerServicesStates(ctx context.Context, snap snap.Snap, state string) error {
	services := workerServices
	return checkServicesStates(ctx, snap, services, state)
}

// CheckControlPlaneServicesStates verifies that every control plane service is in the
// given state. It stops at the first service whose state cannot be read or does not
// match, and returns that failure.
func CheckControlPlaneServicesStates(ctx context.Context, snap snap.Snap, state string) error {
	services := controlPlaneServices
	return checkServicesStates(ctx, snap, services, state)
}

// CheckK8sDqliteServices verifies that the k8s-dqlite datastore service is in the
// given state.
func CheckK8sDqliteServices(ctx context.Context, snap snap.Snap, state string) error {
	datastoreServices := []string{"k8s-dqlite"}
	return checkServicesStates(ctx, snap, datastoreServices, state)
}

// checkServicesStates queries the state of each listed service in order and compares it,
// ignoring case, against the expected state. It returns an error for the first service
// whose state cannot be retrieved or differs, and nil when all of them match.
func checkServicesStates(ctx context.Context, snap snap.Snap, services []string, state string) error {
	for i := range services {
		name := services[i]

		current, err := snap.GetServiceState(ctx, name)
		if err != nil {
			return fmt.Errorf("failed to check service %s: %w", name, err)
		}
		if strings.EqualFold(state, current) {
			continue
		}
		return fmt.Errorf("expected %s to be in state %s, but it is %s", name, state, current)
	}
	return nil
}

// ServiceArgsFromMap processes a map of string pointers and categorizes them into update and delete lists.
// - If the value pointer is nil, it adds the argument name to the delete list.
// - If the value pointer is not nil, it adds the argument and its value to the update map.
Expand Down
17 changes: 17 additions & 0 deletions src/k8s/pkg/utils/control/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,20 @@ func RetryFor(ctx context.Context, retryCount int, delayBetweenRetry time.Durati
}
return err
}

// Consistently calls retryFunc up to retryCount times and expects every call to succeed.
// It waits for delayBetweenRetry between consecutive calls; there is no wait after the
// final call.
//
// It returns:
//   - the error of the first failing call, without making further calls;
//   - ctx.Err() (context.Canceled or context.DeadlineExceeded) if the context is done
//     while waiting between calls;
//   - nil if all calls succeeded, or if retryCount is not positive.
func Consistently(ctx context.Context, retryCount int, delayBetweenRetry time.Duration, retryFunc func() error) error {
	for i := 0; i < retryCount; i++ {
		if err := retryFunc(); err != nil {
			return err
		}

		// The delay only separates attempts; sleeping after the last one would just
		// postpone the successful result.
		if i == retryCount-1 {
			break
		}

		select {
		case <-ctx.Done():
			// Report the actual reason, which may be a deadline rather than a cancellation.
			return ctx.Err()
		case <-time.After(delayBetweenRetry):
		}
	}
	return nil
}

0 comments on commit 58f3985

Please sign in to comment.