Skip to content

Commit

Permalink
feat: add workload-demand algorithm
Browse files Browse the repository at this point in the history
This is the skeleton of the workload-demand algorithm. Currently, we are
able to define jobs in a matrix, apply the ensemble.yaml, and then our
ensemble member (a minicluster) is created. The sidecar boots up
and starts the gRPC server, and the operator connects to it first
to get a status (the queue/jobs). At first hit there will be no
jobs or anything running. We then check the CRD status and see
there are jobs in the matrix. Since this algorithm is retroactive
(meaning it just submits a ton of work and then responds to it
with scaling and then termination) we submit all the jobs from
the matrix, and in the order they are supplied (or extended out).
Arguably this could be modified with an option for randomization,
by label, etc. The jobs are then submitted on the MiniCluster, and
we check for subsequent queue status. This is as far as I have
implemented for this commit. Next I need to act on the MakeDecision
that the algorithm returns after receiving the queue, which will
typically be a request to scale up, down, or terminate. More
on this tomorrow.

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Mar 24, 2024
1 parent c91ec66 commit d732cd5
Show file tree
Hide file tree
Showing 40 changed files with 890 additions and 308 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ RUN go mod download
# Copy the go source
COPY cmd/main.go cmd/main.go
COPY api/ api/
COPY pkg ./pkg
COPY algorithm ./algorithm
COPY internal/ ./internal
COPY controllers/ ./controllers
COPY ./protos protos

# Build
Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ endif
python: python ## Generate python proto files in python
# pip install grpcio-tools
# pip freeze | grep grpcio-tools
# We will put rainbow plus the memory protos here
mkdir -p python/ensemble_operator/protos
cd python/ensemble_operator/protos
python -m grpc_tools.protoc -I./protos --python_out=./python/ensemble_operator/protos --pyi_out=./python/ensemble_operator/protos --grpc_python_out=./python/ensemble_operator/protos ./protos/ensemble-service.proto
Expand Down
34 changes: 28 additions & 6 deletions algorithm/workload/demand/demand.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package demand

import (
"encoding/json"
"fmt"

api "github.com/converged-computing/ensemble-operator/api/v1alpha1"
"github.com/converged-computing/ensemble-operator/internal/algorithm"
"github.com/converged-computing/ensemble-operator/pkg/algorithm"
"k8s.io/utils/set"
)

Expand Down Expand Up @@ -50,12 +51,34 @@ func (e WorkloadDemand) MakeDecision(
) (algorithm.AlgorithmDecision, error) {

fmt.Println(member.Jobs)

// This will determine if we need to patch the crd
decision := algorithm.AlgorithmDecision{}
//file, err := os.ReadFile(yamlFile)
//if err != nil {
// return &js, err
//}
updated := false

// For the algorithm here, we always submit all jobs that still have counts.
// We don't consider the queue status (payload) here because it does not matter
// This is a lookup
req := SubmitRequest{Jobs: []Job{}}
for _, job := range member.Jobs {
if job.Count > 0 {
req.AddJob(job.Name, job.Command, job.Count, job.Nodes)
job.Count = 0
updated = true
}
}

// Serialize the json into a payload
response, err := json.Marshal(&req)
if err != nil {
return decision, err
}
decision = algorithm.AlgorithmDecision{Updated: updated, Payload: string(response)}

// If we have updates, ask the queue to submit them
if updated {
decision.Action = algorithm.SubmitAction
}
return decision, nil
}

Expand All @@ -64,7 +87,6 @@ func (e WorkloadDemand) Validate(options algorithm.AlgorithmOptions) bool {
return true
}

// Add the selection algorithm to be known to rainbow
func init() {
a := WorkloadDemand{}
algorithm.Register(a)
Expand Down
18 changes: 18 additions & 0 deletions algorithm/workload/demand/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package demand

// A SubmitRequest includes jobs and counts to submit.
// It will be serialized into the json payload to the gRPC sidecar.
type SubmitRequest struct {
	Jobs []Job `json:"jobs,omitempty"`
}

// Job describes one job group in a submit request: the command to run,
// how many submissions to make (Count), and the node count (Nodes)
// requested for each submission.
type Job struct {
	Name    string `json:"name,omitempty"`
	Command string `json:"command,omitempty"`
	Count   int32  `json:"count,omitempty"`
	Nodes   int32  `json:"nodes,omitempty"`
}

// AddJob appends a job group with the given name, command, count,
// and node count to the request.
func (r *SubmitRequest) AddJob(name, command string, count, nodes int32) {
	r.Jobs = append(r.Jobs, Job{Name: name, Command: command, Count: count, Nodes: nodes})
}
81 changes: 56 additions & 25 deletions api/v1alpha1/ensemble_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
)

var (
defaultSidecarbase = "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9"
defaultSidecarbase = "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9"
defaultAlgorithmName = "workload-demand"
)

// EnsembleSpec defines the desired state of Ensemble
Expand Down Expand Up @@ -57,33 +58,40 @@ type Member struct {
// +optional
MiniCluster minicluster.MiniCluster `json:"minicluster,omitempty"`

// Definition and customization of the sidecar
//+optional
Sidecar Sidecar `json:"sidecar,omitempty"`

// A member is required to define one or more jobs
// Jobs
Jobs []Job `json:"jobs"`

// Member specific algorithm to use
// If not defined, defaults to workload-demand
//+optional
Algorithm Algorithm `json:"algorithm"`
}

type Sidecar struct {

// Baseimage for the sidecar that will monitor the queue.
// Ensure that the operating systems match!
// +kubebuilder:default="ghcr.io/converged-computing/ensemble-operator-api:rockylinux9"
// +default="ghcr.io/converged-computing/ensemble-operator-api:rockylinux9"
// +optional
SidecarBase string `json:"sidecarBase"`
Image string `json:"image"`

// Always pull the sidecar container (useful for development)
// +optional
SidecarPullAlways bool `json:"sidecarPullAlways"`
PullAlways bool `json:"pullAlways"`

// +kubebuilder:default="50051"
// +default="50051"
SidecarPort string `json:"sidecarPort"`
Port string `json:"port"`

// +kubebuilder:default=10
// +default=10
SidecarWorkers int32 `json:"sidecarWorkers"`

// A member is required to define one or more jobs
// Jobs
Jobs []Job `json:"jobs"`

// Member specific algorithm to use
// If not defined, defaults to workload-demand
//+optional
Algorithm Algorithm `json:"algorithm"`
Workers int32 `json:"workers"`
}

type Algorithm struct {
Expand All @@ -106,9 +114,16 @@ type Job struct {
// Number of jobs to run
// This can be set to 0 depending on the algorithm
// E.g., some algorithms decide on the number to submit
// +kubebuilder:default=1
// +default=1
//+optional
Count int32 `json:"count"`

// +kubebuilder:default=1
// +default=1
//+optional
Nodes int32 `json:"nodes"`

// TODO add label here for ML model category

}
Expand All @@ -127,12 +142,23 @@ func (m *Member) Type() string {
return "unknown"
}

// getDefaultAlgorithm returns the algorithm configured at the ensemble
// level, falling back to the workload-demand default when none is set.
func (e *Ensemble) getDefaultAlgorithm() Algorithm {
	// An empty struct means no ensemble-level algorithm was provided
	if reflect.DeepEqual(e.Spec.Algorithm, Algorithm{}) {
		return Algorithm{Name: defaultAlgorithmName}
	}
	return e.Spec.Algorithm
}

// Validate ensures we have data that is needed, and sets defaults if needed
func (e *Ensemble) Validate() error {
fmt.Println()

// These are the allowed sidecars
bases := set.New(
"ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test",
"ghcr.io/converged-computing/ensemble-operator-api:rockylinux9",
"ghcr.io/converged-computing/ensemble-operator-api:rockylinux8",
"ghcr.io/converged-computing/ensemble-operator-api:ubuntu-focal",
Expand All @@ -143,12 +169,7 @@ func (e *Ensemble) Validate() error {
fmt.Printf("🤓 Ensemble.members %d\n", len(e.Spec.Members))

// Do we have a default algorithm set?
defaultAlgorithm := e.Spec.Algorithm

// No we don't, it's empty
if reflect.DeepEqual(defaultAlgorithm, Algorithm{}) {
defaultAlgorithm = Algorithm{Name: "workload-demand"}
}
defaultAlgorithm := e.getDefaultAlgorithm()

// If MaxSize is set, it must be greater than size
if len(e.Spec.Members) < 1 {
Expand All @@ -170,14 +191,24 @@ func (e *Ensemble) Validate() error {
if len(member.Jobs) == 0 {
return fmt.Errorf("ensemble member in index %d must have at least one job definition", i)
}

// Validate jobs matrix
for _, job := range member.Jobs {
if job.Count <= 0 {
job.Count = 1
}
}

// If we have a minicluster, all three sizes must be defined
if !reflect.DeepEqual(member.MiniCluster, minicluster.MiniCluster{}) {
fmt.Println(" Ensemble.member Type: minicluster")

if member.SidecarBase == "" {
member.SidecarBase = defaultSidecarbase
fmt.Println(" Ensemble.member Type: minicluster")
if member.Sidecar.Image == "" {
member.Sidecar.Image = defaultSidecarbase
}
fmt.Printf(" Ensemble.member.SidecarBase: %s\n", member.SidecarBase)
fmt.Printf(" Ensemble.member.Sidecar.Image: %s\n", member.Sidecar.Image)
fmt.Printf(" Ensemble.member.Sidecar.Port: %s\n", member.Sidecar.Port)
fmt.Printf(" Ensemble.member.Sidecar.PullAlways: %v\n", member.Sidecar.PullAlways)

if member.MiniCluster.Spec.MaxSize <= 0 || member.MiniCluster.Spec.Size <= 0 {
return fmt.Errorf("ensemble minicluster must have a size and maxsize of at least 1")
Expand All @@ -191,7 +222,7 @@ func (e *Ensemble) Validate() error {
}

// Base container must be in valid set
if !bases.Has(member.SidecarBase) {
if !bases.Has(member.Sidecar.Image) {
return fmt.Errorf("base image must be rocky linux or ubuntu: %s", bases)
}
count += 1
Expand Down
16 changes: 16 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 28 additions & 17 deletions chart/templates/ensemble-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ spec:
description: Command given to flux
type: string
count:
default: 1
description: Number of jobs to run This can be set to 0
depending on the algorithm E.g., some algorithms decide
on the number to submit
Expand All @@ -82,6 +83,10 @@ spec:
name:
description: Name to identify the job group
type: string
nodes:
default: 1
format: int32
type: integer
required:
- command
- name
Expand Down Expand Up @@ -956,25 +961,31 @@ spec:
- size
type: object
type: object
sidecarBase:
default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9
description: Baseimage for the sidecar that will monitor the queue.
Ensure that the operating systems match!
type: string
sidecarPort:
default: "50051"
type: string
sidecarPullAlways:
description: Always pull the sidecar container (useful for development)
type: boolean
sidecarWorkers:
default: 10
format: int32
type: integer
sidecar:
description: Definition and customization of the sidecar
properties:
image:
default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9
description: Baseimage for the sidecar that will monitor the
queue. Ensure that the operating systems match!
type: string
port:
default: "50051"
type: string
pullAlways:
description: Always pull the sidecar container (useful for
development)
type: boolean
workers:
default: 10
format: int32
type: integer
required:
- port
- workers
type: object
required:
- jobs
- sidecarPort
- sidecarWorkers
type: object
type: array
required:
Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"k8s.io/client-go/rest"

api "github.com/converged-computing/ensemble-operator/api/v1alpha1"
"github.com/converged-computing/ensemble-operator/internal/controller"
controller "github.com/converged-computing/ensemble-operator/controllers/ensemble"
minicluster "github.com/flux-framework/flux-operator/api/v1alpha2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down
Loading

0 comments on commit d732cd5

Please sign in to comment.