Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: working on migratting off logrus in controller #13916

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions cmd/workflow-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"strings"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/argoproj/pkg/cli"
kubecli "github.com/argoproj/pkg/kube/cli"
"github.com/argoproj/pkg/stats"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
Expand All @@ -31,6 +31,7 @@ import (
wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
cmdutil "github.com/argoproj/argo-workflows/v3/util/cmd"
"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/util/logging"
"github.com/argoproj/argo-workflows/v3/util/logs"
pprofutil "github.com/argoproj/argo-workflows/v3/util/pprof"
"github.com/argoproj/argo-workflows/v3/workflow/common"
Expand Down Expand Up @@ -72,6 +73,8 @@ func NewRootCommand() *cobra.Command {
RunE: func(c *cobra.Command, args []string) error {
defer runtimeutil.HandleCrashWithContext(context.Background(), runtimeutil.PanicHandlers...)

log := logging.NewSlogLogger()

cli.SetLogLevel(logLevel)
cmdutil.SetGLogLevel(glogLevel)
cmdutil.SetLogFormatter(logFormat)
Expand Down Expand Up @@ -104,7 +107,7 @@ func NewRootCommand() *cobra.Command {
wfclientset := wfclientset.NewForConfigOrDie(config)

if !namespaced && managedNamespace != "" {
log.Warn("ignoring --managed-namespace because --namespaced is false")
log.Warn(ctx, "ignoring --managed-namespace because --namespaced is false")
managedNamespace = ""
}
if namespaced && managedNamespace == "" {
Expand All @@ -118,15 +121,15 @@ func NewRootCommand() *cobra.Command {

leaderElectionOff := os.Getenv("LEADER_ELECTION_DISABLE")
if leaderElectionOff == "true" {
log.Info("Leader election is turned off. Running in single-instance mode")
log.WithField("id", "single-instance").Info("starting leading")
log.Info(ctx, "Leader election is turned off. Running in single-instance mode")
log.WithField(ctx, "id", "single-instance").Info(ctx, "starting leading")

go wfController.Run(ctx, workflowWorkers, workflowTTLWorkers, podCleanupWorkers, cronWorkflowWorkers, workflowArchiveWorkers)
go wfController.RunPrometheusServer(ctx, false)
} else {
nodeID, ok := os.LookupEnv("LEADER_ELECTION_IDENTITY")
if !ok {
log.Fatal("LEADER_ELECTION_IDENTITY must be set so that the workflow controllers can elect a leader")
log.Fatal(ctx, "LEADER_ELECTION_IDENTITY must be set so that the workflow controllers can elect a leader")
}

leaderName := "workflow-controller"
Expand Down Expand Up @@ -166,13 +169,13 @@ func NewRootCommand() *cobra.Command {
}()
},
OnStoppedLeading: func() {
log.WithField("id", nodeID).Info("stopped leading")
log.WithField(ctx, "id", nodeID).Info(ctx, "stopped leading")
cancel()
wg.Wait()
go wfController.RunPrometheusServer(dummyCtx, true)
},
OnNewLeader: func(identity string) {
log.WithField("leader", identity).Info("new leader")
log.WithField(ctx, "leader", identity).Info(ctx, "new leader")
},
},
})
Expand All @@ -181,7 +184,7 @@ func NewRootCommand() *cobra.Command {
http.HandleFunc("/healthz", wfController.Healthz)

go func() {
log.Println(http.ListenAndServe(":6060", nil))
log.Println(ctx, http.ListenAndServe(":6060", nil).Error())
}()

<-ctx.Done()
Expand Down Expand Up @@ -234,10 +237,10 @@ func init() {
}

func initConfig() {
log.SetFormatter(&log.TextFormatter{
/* log.SetFormatter(&log.TextFormatter{
TimestampFormat: "2006-01-02T15:04:05.000Z",
FullTimestamp: true,
})
}) */
}

func main() {
Expand Down
40 changes: 40 additions & 0 deletions util/logging/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package logging

import "context"

const (
ErrorField string = "error"
)

type Fields map[string]interface{}

// Logger exports a logging interface
type Logger interface {
WithFields(ctx context.Context, fields Fields) Logger
WithField(ctx context.Context, name string, value interface{}) Logger
WithError(ctx context.Context, err error) Logger

Info(ctx context.Context, msg string)
Infof(ctx context.Context, format string, args ...interface{})

Warn(ctx context.Context, msg string)
Warnf(ctx context.Context, format string, args ...interface{})

Fatal(ctx context.Context, msg string)
Fatalf(ctx context.Context, format string, args ...interface{})

Error(ctx context.Context, msg string)
Errorf(ctx context.Context, format string, args ...interface{})

Debug(ctx context.Context, msg string)
Debugf(ctx context.Context, format string, args ...interface{})

Warning(ctx context.Context, msg string)
Warningf(ctx context.Context, format string, args ...interface{})

Println(ctx context.Context, msg string)
Printf(ctx context.Context, format string, args ...interface{})

Panic(ctx context.Context, msg string)
Panicf(ctx context.Context, format string, args ...interface{})
}
141 changes: 141 additions & 0 deletions util/logging/slog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package logging

import (
"context"
"fmt"
"log/slog"
"os"
)

type slogLogger struct {
fields Fields
logger *slog.Logger
}

func (s *slogLogger) WithFields(_ context.Context, fields Fields) Logger {
logger := s.logger

newFields := make(Fields)
for k, v := range s.fields {
newFields[k] = v
logger = logger.With(k, v)
}
for k, v := range fields {
newFields[k] = v
logger = logger.With(k, v)
}

return &slogLogger{
fields: newFields,
logger: logger,
}
}

func (s *slogLogger) WithField(_ context.Context, name string, value interface{}) Logger {
newFields := make(Fields)

logger := s.logger
for k, v := range s.fields {
newFields[k] = v
logger = s.logger.With(k, v)
}

logger = logger.With(name, value)
newFields[name] = value
return &slogLogger{
fields: newFields,
logger: logger,
}
}

func (s *slogLogger) WithError(ctx context.Context, err error) Logger {
return s.WithField(ctx, ErrorField, err)
}

func (s *slogLogger) Info(ctx context.Context, msg string) {
s.logger.InfoContext(ctx, msg)
}

func (s *slogLogger) Infof(ctx context.Context, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
s.logger.InfoContext(ctx, msg)
}

func (s *slogLogger) Warn(ctx context.Context, msg string) {
s.logger.WarnContext(ctx, msg)
}

func (s *slogLogger) Warnf(ctx context.Context, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
s.logger.WarnContext(ctx, msg)
}

func (s *slogLogger) Fatal(ctx context.Context, msg string) {
s.logger.ErrorContext(ctx, msg)
os.Exit(1)
}

func (s *slogLogger) Fatalf(ctx context.Context, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
s.logger.ErrorContext(ctx, msg)
os.Exit(1)
}

func (s *slogLogger) Error(ctx context.Context, msg string) {
s.logger.ErrorContext(ctx, msg)
}

func (s *slogLogger) Errorf(ctx context.Context, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
s.logger.ErrorContext(ctx, msg)
}

func (s *slogLogger) Debug(ctx context.Context, msg string) {
s.logger.DebugContext(ctx, msg)
}

func (s *slogLogger) Debugf(ctx context.Context, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
s.logger.DebugContext(ctx, msg)
}

func (s *slogLogger) Warning(ctx context.Context, msg string) {
s.logger.WarnContext(ctx, msg)
}

func (s *slogLogger) Warningf(ctx context.Context, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
s.logger.WarnContext(ctx, msg)
}

func (s *slogLogger) Println(ctx context.Context, msg string) {
s.logger.InfoContext(ctx, msg)
}

func (s *slogLogger) Printf(ctx context.Context, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
s.logger.InfoContext(ctx, msg)
}

func (s *slogLogger) Panic(ctx context.Context, msg string) {
s.logger.ErrorContext(ctx, msg)
panic(msg)
}

func (s *slogLogger) Panicf(ctx context.Context, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
s.logger.ErrorContext(ctx, msg)
panic(msg)
}

// NewSlogLogger returns a slog based logger
func NewSlogLogger() Logger {
f := make(Fields)
l := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
// l := slog.Default()
s := slogLogger{
fields: f,
logger: l,
}
return &s
}
8 changes: 5 additions & 3 deletions util/runtime/panic.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package runtime

import (
"context"
"runtime"

log "github.com/sirupsen/logrus"
"github.com/argoproj/argo-workflows/v3/util/logging"
)

func RecoverFromPanic(log *log.Entry) {
// RecoverFromPanic recovers from a panic and logs the panic and call stack
func RecoverFromPanic(ctx context.Context, log logging.Logger) {
if r := recover(); r != nil {
// Same as stdlib http server code. Manually allocate stack trace buffer size
// to prevent excessively large logs
Expand All @@ -15,7 +17,7 @@ func RecoverFromPanic(log *log.Entry) {
stackSize := runtime.Stack(stackTraceBuffer, false)
// Free up the unused spaces
stackTraceBuffer = stackTraceBuffer[:stackSize]
log.Errorf("recovered from panic %q. Call stack:\n%s",
log.Errorf(ctx, "recovered from panic %q. Call stack:\n%s",
r,
stackTraceBuffer)
}
Expand Down
23 changes: 12 additions & 11 deletions workflow/common/ancestry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package common

import (
"context"
"fmt"
"regexp"
"sort"
Expand All @@ -11,9 +12,9 @@ import (
)

type DagContext interface {
GetTask(taskName string) *wfv1.DAGTask
GetTaskDependencies(taskName string) []string
GetTaskFinishedAtTime(taskName string) time.Time
GetTask(ctx context.Context, taskName string) *wfv1.DAGTask
GetTaskDependencies(ctx context.Context, taskName string) []string
GetTaskFinishedAtTime(ctx context.Context, taskName string) time.Time
}

type TaskResult string
Expand Down Expand Up @@ -48,8 +49,8 @@ const (
DependencyTypeItems
)

func GetTaskDependencies(task *wfv1.DAGTask, ctx DagContext) (map[string]DependencyType, string) {
depends := getTaskDependsLogic(task, ctx)
func GetTaskDependencies(ctx context.Context, task *wfv1.DAGTask, dctx DagContext) (map[string]DependencyType, string) {
depends := getTaskDependsLogic(ctx, task, dctx)
matches := taskNameRegex.FindAllStringSubmatchIndex(depends, -1)
var expansionMatches []expansionMatch
dependencies := make(map[string]DependencyType)
Expand Down Expand Up @@ -79,7 +80,7 @@ func GetTaskDependencies(task *wfv1.DAGTask, ctx DagContext) (map[string]Depende
return expansionMatches[i].start > expansionMatches[j].start
})
for _, match := range expansionMatches {
matchTask := ctx.GetTask(match.taskName)
matchTask := dctx.GetTask(ctx, match.taskName)
depends = depends[:match.start] + expandDependency(match.taskName, matchTask) + depends[match.end:]
}

Expand All @@ -106,15 +107,15 @@ func ValidateTaskResults(dagTask *wfv1.DAGTask) error {
return nil
}

func getTaskDependsLogic(dagTask *wfv1.DAGTask, ctx DagContext) string {
func getTaskDependsLogic(ctx context.Context, dagTask *wfv1.DAGTask, dctx DagContext) string {
if dagTask.Depends != "" {
return dagTask.Depends
}

// For backwards compatibility, "dependencies: [A, B]" is equivalent to "depends: (A.Successful || A.Skipped || A.Daemoned)) && (B.Successful || B.Skipped || B.Daemoned)"
var dependencies []string
for _, dependency := range dagTask.Dependencies {
depTask := ctx.GetTask(dependency)
depTask := dctx.GetTask(ctx, dependency)
dependencies = append(dependencies, expandDependency(dependency, depTask))
}
return strings.Join(dependencies, " && ")
Expand All @@ -137,20 +138,20 @@ func expandDependency(depName string, depTask *wfv1.DAGTask) string {

// GetTaskAncestry returns a list of taskNames which are ancestors of this task.
// The list is ordered by the tasks finished time.
func GetTaskAncestry(ctx DagContext, taskName string) []string {
func GetTaskAncestry(ctx context.Context, dctx DagContext, taskName string) []string {
visited := make(map[string]time.Time)

var getAncestry func(currTask string)
getAncestry = func(currTask string) {
if _, seen := visited[currTask]; seen {
return
}
for _, depTask := range ctx.GetTaskDependencies(currTask) {
for _, depTask := range dctx.GetTaskDependencies(ctx, currTask) {
getAncestry(depTask)
}
if currTask != taskName {
if _, ok := visited[currTask]; !ok {
visited[currTask] = ctx.GetTaskFinishedAtTime(currTask)
visited[currTask] = dctx.GetTaskFinishedAtTime(ctx, currTask)
}
}
}
Expand Down
Loading
Loading