Skip to content

Commit

Permalink
Major refactoring & implement standalone mode (#12)
Browse files Browse the repository at this point in the history
* Major refactoring & start work on standalone mode (part 1)

Overhaul the code from a PoC to a semi-productive state.
Major changes done to the Processors, mainly the UpdateWatchConfig handler,
which provisions k8s managers.
Simplify large parts of the codebase.
Handle context cancellation propagation properly.
Ensure resource cleanup on graceful shutdown - do not leave hanging goroutines.

* Start gRPC stream with proper context

* Refactoring part 2

Fix comments
Exit Pod with status 1 in case the Manager can't be started (in K8s mode)
Remove caching of auth header in HTTP executor
Improvement of error messages

* Fix client cert creation in standalone mode

* Update env vars names

* Improve logging and fix some issues

* Handle session auto-config better

* Delete redundant cache struct

* Add some more logs to task processors

* Fix k8s managers and controllers

* Renaming

* Fix some TODOs

* Fix build

* Refactoring final: Reimplement auth header caching properly

* Small cleanup

* Bump vulnerable dependencies

* Fix basic auth header

* Fix comments
  • Loading branch information
radito3 authored Mar 25, 2024
1 parent 933d67c commit 433c35f
Show file tree
Hide file tree
Showing 71 changed files with 1,550 additions and 2,227 deletions.
47 changes: 47 additions & 0 deletions cmd/remote-work-processor/cmd_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"crypto/sha256"
"encoding/hex"
"flag"
"io"
"log"
"os"
)

type Options struct {
DisplayVersion bool
StandaloneMode bool
InstanceId string
MaxConnRetries uint
}

const (
standaloneModeOpt = "standalone-mode"
instanceIdOpt = "instance-id"
connRetriesOpt = "conn-retries"
versionOpt = "version"
)

func (opts *Options) BindFlags(fs *flag.FlagSet) {
hostname := getHashedHostname()

fs.BoolVar(&opts.StandaloneMode, standaloneModeOpt, false,
"Whether to run the Remote Work Processor in Standalone mode")
fs.StringVar(&opts.InstanceId, instanceIdOpt, hostname,
"Instance Identifier for the Remote Work Processor (only applicable for Standalone mode)")
fs.UintVar(&opts.MaxConnRetries, connRetriesOpt, 3, "Number of retries for gRPC connection to AutoPi server")
fs.BoolVar(&opts.DisplayVersion, versionOpt, false, "Display binary version and exit")
}

func getHashedHostname() string {
hostname, err := os.Hostname()
if err != nil {
log.Printf("could not get hostname: %v\n", err)
} else {
hasher := sha256.New()
io.WriteString(hasher, hostname)
hostname = hex.EncodeToString(hasher.Sum(nil))
}
return hostname
}
169 changes: 118 additions & 51 deletions cmd/remote-work-processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,80 +17,147 @@ limitations under the License.
package main

import (
// "flag"
// "os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.

"context"
"flag"
"fmt"
"github.com/SAP/remote-work-processor/internal/grpc"
"github.com/SAP/remote-work-processor/internal/grpc/processors"
"github.com/SAP/remote-work-processor/internal/kubernetes/controller"
meta "github.com/SAP/remote-work-processor/internal/kubernetes/metadata"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"log"
"os"

"os/signal"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"syscall"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

// "sigs.k8s.io/controller-runtime/pkg/healthz"
// "sigs.k8s.io/controller-runtime/pkg/log/zap"
// "github.com/SAP/remote-work-processor/kubernetes/controllers"
"github.com/SAP/remote-work-processor/internal/grpc"
"github.com/SAP/remote-work-processor/internal/grpc/processors"
"github.com/SAP/remote-work-processor/internal/kubernetes/controller"
"github.com/SAP/remote-work-processor/internal/kubernetes/metadata"
//+kubebuilder:scaffold:imports
)

var (
scheme = runtime.NewScheme()
// Version of the Remote Work Processor.
// Injected at linking time via ldflags.
Version string
// BuildDate of the Remote Work Processor.
// Injected at linking time via ldflags.
BuildDate string
)

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}

func main() {
metadata.InitRemoteWorkProcessorMetadata()
config := getKubeConfig()
opts := setupFlagsAndLogger()

e := controller.CreateManagerEngine(scheme, config)
processors.InitProcessorFactory(e)
grpc.InitRemoteWorkProcessorGrpcClient()
if opts.DisplayVersion {
fmt.Printf("rwp-%s Built: %s\n", Version, BuildDate)
return
}

opc := grpc.Client.Receive()
rootCtx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

for {
op := <-opc
p, err := processors.Factory.CreateProcessor(op)
if err != nil {
log.Fatalf("Error occurred while creating operation processor: %v\n", err)
}
rwpMetadata := meta.LoadMetadata(opts.InstanceId, Version)
grpcClient := grpc.NewClient(rwpMetadata, opts.StandaloneMode)
var drainChan chan struct{}

res := <-p.Process()
if res.Err != nil {
log.Fatalf("Error occurred while processing operation: %v\n", err)
}
var factory processors.ProcessorFactory

if opts.StandaloneMode {
factory = processors.NewStandaloneProcessorFactory()
} else {
config := getKubeConfig()
scheme := runtime.NewScheme()
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme

if res.Result != nil {
grpc.Client.Send(res.Result)
drainChan = make(chan struct{}, 1)
engine := controller.CreateManagerEngine(scheme, config, grpcClient)
factory = processors.NewKubernetesProcessorFactory(engine, drainChan)
}

connAttemptChan := make(chan struct{}, 1)
connAttemptChan <- struct{}{}
var connAttempts uint = 0

Loop:
for connAttempts < opts.MaxConnRetries {
select {
case <-rootCtx.Done():
log.Println("Received cancellation signal. Stopping Remote Work Processor...")
break Loop
case <-connAttemptChan:
err := grpcClient.InitSession(rootCtx, rwpMetadata.SessionID())
if err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
}
default:
operation, err := grpcClient.ReceiveMsg()
if err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
continue
}
if operation == nil {
// this flow is when the gRPC connection is closed (either by the server or the context has been cancelled)
connAttemptChan <- struct{}{}
// do not increment the retries, as this isn't a failure
continue
}

log.Printf("Creating processor for operation: %T\n", operation.Body)
processor, err := factory.CreateProcessor(operation)
if err != nil {
log.Printf("error creating operation processor: %v\n", err)
continue
}

msg, err := processor.Process(rootCtx)
if err != nil {
signalRetry(&connAttempts, connAttemptChan, fmt.Errorf("error processing operation: %v", err))
continue
}
if msg == nil {
continue
}

if err = grpcClient.Send(msg); err != nil {
signalRetry(&connAttempts, connAttemptChan, err)
}
}
}

if !opts.StandaloneMode {
// wait for context cancellation to be propagated to the k8s manager
<-drainChan
}
}

func getKubeConfig() *rest.Config {
rules := clientcmd.NewDefaultClientConfigLoadingRules()
overrides := &clientcmd.ConfigOverrides{}
func setupFlagsAndLogger() *Options {

Check failure on line 137 in cmd/remote-work-processor/main.go

View workflow job for this annotation

GitHub Actions / release

undefined: Options
opts := &Options{}

Check failure on line 138 in cmd/remote-work-processor/main.go

View workflow job for this annotation

GitHub Actions / release

undefined: Options
opts.BindFlags(flag.CommandLine)

kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
zapOpts := zap.Options{}
zapOpts.BindFlags(flag.CommandLine)

config, err := kubeConfig.ClientConfig()
flag.Parse()
ctrl.SetLogger(zap.New(zap.UseFlagOptions(&zapOpts)))
return opts
}

func getKubeConfig() *rest.Config {
config, err := rest.InClusterConfig()
if err != nil {
os.Exit(1)
log.Fatalln("Could not create kubeconfig:", err)
}

return config
}

func signalRetry(attempts *uint, retryChan chan<- struct{}, err error) {
if err != nil {
log.Println(err)
}
retryChan <- struct{}{}
*attempts++
}
22 changes: 13 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ go 1.20

require (
github.com/itchyny/gojq v0.12.12
github.com/pkg/errors v0.9.1
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.31.0
k8s.io/apimachinery v0.26.1
k8s.io/client-go v0.26.1
sigs.k8s.io/controller-runtime v0.14.6
Expand All @@ -20,6 +19,7 @@ require (
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/zapr v1.2.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
Expand All @@ -39,20 +39,24 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/oauth2 v0.10.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/term v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit 433c35f

Please sign in to comment.