From d732cd5c187fdb24002791cf5f319336fabeec0b Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 24 Mar 2024 01:19:33 -0600 Subject: [PATCH] feat: add workload-demand algorithm This is the skeleton of the workload-demand algorithm. Currently, we are able to define jobs in a matrix, apply the ensemble.yaml, and then our ensemble member (a minicluster) is created. The sidecar boots up and starts the gRPC server, and the operator connects to it first to get a status (the queue/jobs). At first hit there will be no jobs or anything running. We then check the CRD status and see there are jobs in the matrix. Since this algorithm is retroactive (meaning it just submits a ton of work and then responds to it with scaling and then termination) we submit all the jobs from the matrix, and in the order they are supplied (or extended out). Arguably this could be modified with an option for randomization, by label, etc. The jobs are then submit on the MiniCluster, and we check for subsequent queue status. This is up to where I have implemented for this commit. Next I need to act on the MakeDecision that the algorithm returns after receiving the queue, which will typically be a request to scale up, down, or terminate. More on this tomorrow. Signed-off-by: vsoch --- Dockerfile | 3 +- Makefile | 1 - algorithm/workload/demand/demand.go | 34 ++- algorithm/workload/demand/types.go | 18 ++ api/v1alpha1/ensemble_types.go | 81 ++++-- api/v1alpha1/zz_generated.deepcopy.go | 16 ++ chart/templates/ensemble-crd.yaml | 45 ++-- cmd/main.go | 2 +- ...ensemble.flux-framework.org_ensembles.yaml | 45 ++-- .../ensemble}/ensemble_controller.go | 2 +- .../ensemble}/ensemble_controller_test.go | 0 .../ensemble}/minicluster.go | 6 +- .../ensemble}/suite_test.go | 0 .../ensemble}/templates.go | 0 .../ensemble}/update.go | 69 +++-- examples/algorithms/workload/demand/README.md | 41 ++- .../algorithms/workload/demand/ensemble.yaml | 4 +- examples/dist/ensemble-operator-arm.yaml | 45 ++-- examples/dist/ensemble-operator-dev.yaml | 45 ++-- examples/dist/ensemble-operator.yaml | 45 ++-- examples/tests/lammps/ensemble.yaml | 18 +- {internal => pkg}/algorithm/algorithm.go | 11 +- {internal => pkg}/client/client.go | 38 ++- {internal => pkg}/types/types.go | 0 protos/ensemble-service.pb.go | 249 ++++++++++++------ protos/ensemble-service.proto | 19 +- protos/ensemble-service_grpc.pb.go | 46 +++- .../ensemble_operator/algorithm/__init__.py | 12 + python/ensemble_operator/algorithm/base.py | 21 ++ .../algorithm/workload_demand.py | 16 ++ python/ensemble_operator/members/__init__.py | 12 + python/ensemble_operator/members/base.py | 18 ++ .../members/minicluster/__init__.py | 52 ++++ .../{ => members/minicluster}/metrics.py | 0 .../protos/ensemble_service_pb2.py | 18 +- .../protos/ensemble_service_pb2.pyi | 48 ++-- .../protos/ensemble_service_pb2_grpc.py | 51 +++- python/ensemble_operator/server.py | 59 +++-- python/script/build-docker.sh | 6 + python/setup.cfg | 2 +- 40 files changed, 890 insertions(+), 308 deletions(-) create mode 100644 algorithm/workload/demand/types.go rename {internal/controller => controllers/ensemble}/ensemble_controller.go (99%) rename {internal/controller => controllers/ensemble}/ensemble_controller_test.go (100%) rename {internal/controller => controllers/ensemble}/minicluster.go (95%) rename {internal/controller => controllers/ensemble}/suite_test.go (100%) rename {internal/controller => controllers/ensemble}/templates.go (100%) rename {internal/controller => controllers/ensemble}/update.go (63%) rename {internal => pkg}/algorithm/algorithm.go (89%) rename {internal => pkg}/client/client.go (68%) rename {internal => pkg}/types/types.go (100%) create mode 100644 python/ensemble_operator/algorithm/__init__.py create mode 100644 python/ensemble_operator/algorithm/base.py create mode 100644 python/ensemble_operator/algorithm/workload_demand.py create mode 100644 python/ensemble_operator/members/__init__.py create mode 100644 python/ensemble_operator/members/base.py create mode 100644 python/ensemble_operator/members/minicluster/__init__.py rename python/ensemble_operator/{ => members/minicluster}/metrics.py (100%) create mode 100755 python/script/build-docker.sh diff --git a/Dockerfile b/Dockerfile index 03f79c0..aed5abc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,8 +14,9 @@ RUN go mod download # Copy the go source COPY cmd/main.go cmd/main.go COPY api/ api/ +COPY pkg ./pkg COPY algorithm ./algorithm -COPY internal/ ./internal +COPY controllers/ ./controllers COPY ./protos protos # Build diff --git a/Makefile b/Makefile index 8a75638..63eb1b9 100644 --- a/Makefile +++ b/Makefile @@ -72,7 +72,6 @@ endif python: python ## Generate python proto files in python # pip install grpcio-tools # pip freeze | grep grpcio-tools - # We will put rainbow plus the memory protos here mkdir -p python/ensemble_operator/protos cd python/ensemble_operator/protos python -m grpc_tools.protoc -I./protos --python_out=./python/ensemble_operator/protos --pyi_out=./python/ensemble_operator/protos --grpc_python_out=./python/ensemble_operator/protos ./protos/ensemble-service.proto diff --git a/algorithm/workload/demand/demand.go b/algorithm/workload/demand/demand.go index 3877713..7c4971c 100644 --- a/algorithm/workload/demand/demand.go +++ b/algorithm/workload/demand/demand.go @@ -1,10 +1,11 @@ package demand import ( + "encoding/json" "fmt" api "github.com/converged-computing/ensemble-operator/api/v1alpha1" - "github.com/converged-computing/ensemble-operator/internal/algorithm" + "github.com/converged-computing/ensemble-operator/pkg/algorithm" "k8s.io/utils/set" ) @@ -50,12 +51,34 @@ func (e WorkloadDemand) MakeDecision( ) (algorithm.AlgorithmDecision, error) { fmt.Println(member.Jobs) + + // This will determine if we need to patch the crd decision := algorithm.AlgorithmDecision{} - //file, err := os.ReadFile(yamlFile) - //if err != nil { - // return &js, err - //} + updated := false + + // For the algorithm here, we always submit all jobs that still have counts. + // We don't consider the queue status (payload) here because it does not matter + // This is a lookup + req := SubmitRequest{Jobs: []Job{}} + for _, job := range member.Jobs { + if job.Count > 0 { + req.AddJob(job.Name, job.Command, job.Count, job.Nodes) + job.Count = 0 + updated = true + } + } + // Serialize the json into a payload + response, err := json.Marshal(&req) + if err != nil { + return decision, err + } + decision = algorithm.AlgorithmDecision{Updated: updated, Payload: string(response)} + + // If we have updates, ask the queue to submit them + if updated { + decision.Action = algorithm.SubmitAction + } return decision, nil } @@ -64,7 +87,6 @@ func (e WorkloadDemand) Validate(options algorithm.AlgorithmOptions) bool { return true } -// Add the selection algorithm to be known to rainbow func init() { a := WorkloadDemand{} algorithm.Register(a) diff --git a/algorithm/workload/demand/types.go b/algorithm/workload/demand/types.go new file mode 100644 index 0000000..71e1f2e --- /dev/null +++ b/algorithm/workload/demand/types.go @@ -0,0 +1,18 @@ +package demand + +// A SubmitRequest includes jobs and counts to submit. +// It will be serialized into the json payload to the gRPC sidecar +type SubmitRequest struct { + Jobs []Job `json:"jobs,omitempty"` +} + +type Job struct { + Name string `json:"name,omitempty"` + Command string `json:"command,omitempty"` + Count int32 `json:"count,omitempty"` + Nodes int32 `json:"nodes,omitempty"` +} + +func (r *SubmitRequest) AddJob(name, command string, count, nodes int32) { + r.Jobs = append(r.Jobs, Job{Name: name, Command: command, Count: count, Nodes: nodes}) +} diff --git a/api/v1alpha1/ensemble_types.go b/api/v1alpha1/ensemble_types.go index 3ab94e7..54a6ae3 100644 --- a/api/v1alpha1/ensemble_types.go +++ b/api/v1alpha1/ensemble_types.go @@ -26,7 +26,8 @@ import ( ) var ( - defaultSidecarbase = "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9" + defaultSidecarbase = "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9" + defaultAlgorithmName = "workload-demand" ) // EnsembleSpec defines the desired state of Ensemble @@ -57,33 +58,40 @@ type Member struct { // +optional MiniCluster minicluster.MiniCluster `json:"minicluster,omitempty"` + // Definition and customization of the sidecar + //+optional + Sidecar Sidecar `json:"sidecar,omitempty"` + + // A member is required to define one or more jobs + // Jobs + Jobs []Job `json:"jobs"` + + // Member specific algorithm to use + // If not defined, defaults to workload-demand + //+optional + Algorithm Algorithm `json:"algorithm"` +} + +type Sidecar struct { + // Baseimage for the sidecar that will monitor the queue. // Ensure that the operating systems match! // +kubebuilder:default="ghcr.io/converged-computing/ensemble-operator-api:rockylinux9" // +default="ghcr.io/converged-computing/ensemble-operator-api:rockylinux9" // +optional - SidecarBase string `json:"sidecarBase"` + Image string `json:"image"` // Always pull the sidecar container (useful for development) // +optional - SidecarPullAlways bool `json:"sidecarPullAlways"` + PullAlways bool `json:"pullAlways"` // +kubebuilder:default="50051" // +default="50051" - SidecarPort string `json:"sidecarPort"` + Port string `json:"port"` // +kubebuilder:default=10 // +default=10 - SidecarWorkers int32 `json:"sidecarWorkers"` - - // A member is required to define one or more jobs - // Jobs - Jobs []Job `json:"jobs"` - - // Member specific algorithm to use - // If not defined, defaults to workload-demand - //+optional - Algorithm Algorithm `json:"algorithm"` + Workers int32 `json:"workers"` } type Algorithm struct { @@ -106,9 +114,16 @@ type Job struct { // Number of jobs to run // This can be set to 0 depending on the algorithm // E.g., some algorithms decide on the number to submit + // +kubebuilder:default=1 + // +default=1 //+optional Count int32 `json:"count"` + // +kubebuilder:default=1 + // +default=1 + //+optional + Nodes int32 `json:"nodes"` + // TODO add label here for ML model category } @@ -127,12 +142,23 @@ func (m *Member) Type() string { return "unknown" } +func (e *Ensemble) getDefaultAlgorithm() Algorithm { + defaultAlgorithm := e.Spec.Algorithm + + // No we don't, it's empty + if reflect.DeepEqual(defaultAlgorithm, Algorithm{}) { + defaultAlgorithm = Algorithm{Name: defaultAlgorithmName} + } + return defaultAlgorithm +} + // Validate ensures we have data that is needed, and sets defaults if needed func (e *Ensemble) Validate() error { fmt.Println() // These are the allowed sidecars bases := set.New( + "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test", "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9", "ghcr.io/converged-computing/ensemble-operator-api:rockylinux8", "ghcr.io/converged-computing/ensemble-operator-api:ubuntu-focal", @@ -143,12 +169,7 @@ func (e *Ensemble) Validate() error { fmt.Printf("🤓 Ensemble.members %d\n", len(e.Spec.Members)) // Do we have a default algorithm set? - defaultAlgorithm := e.Spec.Algorithm - - // No we don't, it's empty - if reflect.DeepEqual(defaultAlgorithm, Algorithm{}) { - defaultAlgorithm = Algorithm{Name: "workload-demand"} - } + defaultAlgorithm := e.getDefaultAlgorithm() // If MaxSize is set, it must be greater than size if len(e.Spec.Members) < 1 { @@ -170,14 +191,24 @@ func (e *Ensemble) Validate() error { if len(member.Jobs) == 0 { return fmt.Errorf("ensemble member in index %d must have at least one job definition", i) } + + // Validate jobs matrix + for _, job := range member.Jobs { + if job.Count <= 0 { + job.Count = 1 + } + } + // If we have a minicluster, all three sizes must be defined if !reflect.DeepEqual(member.MiniCluster, minicluster.MiniCluster{}) { - fmt.Println(" Ensemble.member Type: minicluster") - if member.SidecarBase == "" { - member.SidecarBase = defaultSidecarbase + fmt.Println(" Ensemble.member Type: minicluster") + if member.Sidecar.Image == "" { + member.Sidecar.Image = defaultSidecarbase } - fmt.Printf(" Ensemble.member.SidecarBase: %s\n", member.SidecarBase) + fmt.Printf(" Ensemble.member.Sidecar.Image: %s\n", member.Sidecar.Image) + fmt.Printf(" Ensemble.member.Sidecar.Port: %s\n", member.Sidecar.Port) + fmt.Printf(" Ensemble.member.Sidecar.PullAlways: %v\n", member.Sidecar.PullAlways) if member.MiniCluster.Spec.MaxSize <= 0 || member.MiniCluster.Spec.Size <= 0 { return fmt.Errorf("ensemble minicluster must have a size and maxsize of at least 1") @@ -191,7 +222,7 @@ func (e *Ensemble) Validate() error { } // Base container must be in valid set - if !bases.Has(member.SidecarBase) { + if !bases.Has(member.Sidecar.Image) { return fmt.Errorf("base image must be rocky linux or ubuntu: %s", bases) } count += 1 diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 371d7a1..8f1ec3c 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -155,6 +155,7 @@ func (in *Job) DeepCopy() *Job { func (in *Member) DeepCopyInto(out *Member) { *out = *in in.MiniCluster.DeepCopyInto(&out.MiniCluster) + out.Sidecar = in.Sidecar if in.Jobs != nil { in, out := &in.Jobs, &out.Jobs *out = make([]Job, len(*in)) @@ -172,3 +173,18 @@ func (in *Member) DeepCopy() *Member { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Sidecar) DeepCopyInto(out *Sidecar) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Sidecar. +func (in *Sidecar) DeepCopy() *Sidecar { + if in == nil { + return nil + } + out := new(Sidecar) + in.DeepCopyInto(out) + return out +} diff --git a/chart/templates/ensemble-crd.yaml b/chart/templates/ensemble-crd.yaml index 6c36227..9d4af2b 100644 --- a/chart/templates/ensemble-crd.yaml +++ b/chart/templates/ensemble-crd.yaml @@ -74,6 +74,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -82,6 +83,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -956,25 +961,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the queue. - Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor the + queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/cmd/main.go b/cmd/main.go index e5c5f21..2d5e599 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -27,7 +27,7 @@ import ( "k8s.io/client-go/rest" api "github.com/converged-computing/ensemble-operator/api/v1alpha1" - "github.com/converged-computing/ensemble-operator/internal/controller" + controller "github.com/converged-computing/ensemble-operator/controllers/ensemble" minicluster "github.com/flux-framework/flux-operator/api/v1alpha2" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" diff --git a/config/crd/bases/ensemble.flux-framework.org_ensembles.yaml b/config/crd/bases/ensemble.flux-framework.org_ensembles.yaml index 6c5ce3d..b9ece3f 100644 --- a/config/crd/bases/ensemble.flux-framework.org_ensembles.yaml +++ b/config/crd/bases/ensemble.flux-framework.org_ensembles.yaml @@ -75,6 +75,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -83,6 +84,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -967,25 +972,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the - queue. Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor + the queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/internal/controller/ensemble_controller.go b/controllers/ensemble/ensemble_controller.go similarity index 99% rename from internal/controller/ensemble_controller.go rename to controllers/ensemble/ensemble_controller.go index fe21f8d..4d768e8 100644 --- a/internal/controller/ensemble_controller.go +++ b/controllers/ensemble/ensemble_controller.go @@ -116,7 +116,7 @@ func (r *EnsembleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if !reflect.DeepEqual(member.MiniCluster, minicluster.MiniClusterSpec{}) { name := fmt.Sprintf("%s-%d", ensemble.Name, i) fmt.Printf("Checking member %s\n", name) - result, err := r.updateMiniClusterEnsemble(ctx, name, &ensemble, &member) + result, err := r.updateMiniClusterEnsemble(ctx, name, &ensemble, &member, i) // An error could indicates a requeue (without the break) since something was off // We likely need to tweak this a bit to account for potential updates to specific // ensemble members, but this is fine for a first shot. diff --git a/internal/controller/ensemble_controller_test.go b/controllers/ensemble/ensemble_controller_test.go similarity index 100% rename from internal/controller/ensemble_controller_test.go rename to controllers/ensemble/ensemble_controller_test.go diff --git a/internal/controller/minicluster.go b/controllers/ensemble/minicluster.go similarity index 95% rename from internal/controller/minicluster.go rename to controllers/ensemble/minicluster.go index eaa51b0..3f71149 100644 --- a/internal/controller/minicluster.go +++ b/controllers/ensemble/minicluster.go @@ -101,13 +101,13 @@ func (r *EnsembleReconciler) newMiniCluster( spec.Spec.Interactive = true // Start command for ensemble grpc service - command := fmt.Sprintf(postCommand, member.SidecarPort, member.SidecarWorkers) + command := fmt.Sprintf(postCommand, member.Sidecar.Port, member.Sidecar.Workers) // Create a new container for the flux metrics API to run, this will communicate with our grpc sidecar := minicluster.MiniClusterContainer{ Name: "api", - Image: member.SidecarBase, - PullAlways: member.SidecarPullAlways, + Image: member.Sidecar.Image, + PullAlways: member.Sidecar.PullAlways, Commands: minicluster.Commands{ Post: command, }, diff --git a/internal/controller/suite_test.go b/controllers/ensemble/suite_test.go similarity index 100% rename from internal/controller/suite_test.go rename to controllers/ensemble/suite_test.go diff --git a/internal/controller/templates.go b/controllers/ensemble/templates.go similarity index 100% rename from internal/controller/templates.go rename to controllers/ensemble/templates.go diff --git a/internal/controller/update.go b/controllers/ensemble/update.go similarity index 63% rename from internal/controller/update.go rename to controllers/ensemble/update.go index 3dee361..512c715 100644 --- a/internal/controller/update.go +++ b/controllers/ensemble/update.go @@ -5,13 +5,14 @@ import ( "fmt" api "github.com/converged-computing/ensemble-operator/api/v1alpha1" - "github.com/converged-computing/ensemble-operator/internal/algorithm" - "github.com/converged-computing/ensemble-operator/internal/client" + "github.com/converged-computing/ensemble-operator/pkg/algorithm" + "github.com/converged-computing/ensemble-operator/pkg/client" pb "github.com/converged-computing/ensemble-operator/protos" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" + kclient "sigs.k8s.io/controller-runtime/pkg/client" ) // ensureEnsemble ensures that the ensemle is created! @@ -20,6 +21,7 @@ func (r *EnsembleReconciler) updateMiniClusterEnsemble( name string, ensemble *api.Ensemble, member *api.Member, + idx int, ) (ctrl.Result, error) { // The MiniCluster service is being provided by the index 0 pod, so we can find it here. @@ -32,7 +34,7 @@ func (r *EnsembleReconciler) updateMiniClusterEnsemble( // A selector just for the lead broker pod of the ensemble MiniCluster labelSelector := metav1.LabelSelector{MatchLabels: map[string]string{ // job-name corresponds to the ensemble name plus index in the list - "job-name": fmt.Sprintf("%s-0", ensemble.Name), + "job-name": name, // job index is the lead broker (0) within "job-index": "0", }} @@ -66,46 +68,75 @@ func (r *EnsembleReconciler) updateMiniClusterEnsemble( } // Create a client to the pod (host) - host := fmt.Sprintf("%s:%s", ipAddress, member.SidecarPort) + host := fmt.Sprintf("%s:%s", ipAddress, member.Sidecar.Port) r.Log.Info("🦀 MiniCluster Ensemble Update", "Host", host) c, err := client.NewClient(host) if err != nil { return ctrl.Result{Requeue: true}, err } - // Get the queue status! - in := pb.StatusRequest{} - response, err := c.RequestStatus(ctx, &in) - if err != nil { - r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Error with status request", err) - return ctrl.Result{Requeue: true}, err - } // Get the algorithm - if this fails we stop - a, err := algorithm.Get(member.Algorithm.Name) + algo, err := algorithm.Get(member.Algorithm.Name) if err != nil { r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Failed to retrieve algorithm", err) return ctrl.Result{Requeue: true}, err } // check that the algorithm is valid for the member type - // TODO add actual options here - err = a.Check(algorithm.AlgorithmOptions{}, member) + err = algo.Check(algorithm.AlgorithmOptions{}, member) if err != nil { - r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Algorithm %s does not support %s", a.Name(), member.Type()) + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Algorithm %s does not support %s", algo.Name(), member.Type()) return ctrl.Result{}, err } + // The status request comes first to peek at the queue + // TODO add secret here, maybe don't need Name + in := pb.StatusRequest{Member: member.Type()} + response, err := c.RequestStatus(ctx, &in) + if err != nil { + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Error with status request", err) + return ctrl.Result{Requeue: true}, err + } fmt.Println(response.Status) - fmt.Println(response.Payload) - // Make a decision - decision, err := a.MakeDecision(response.Payload, member) + // Make a decision based on the queue + decision, err := algo.MakeDecision(response.Payload, member) if err != nil { r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Decision Error", err) return ctrl.Result{}, err } - fmt.Println(decision) + // If we are requesting an action to the queue (sidecar gRPC) do it + // This second request should be OK because I think it will be infrequent. + // Most algorithms should do submission in bulk (infrequently) and then monitor + if decision.Action != "" { + in := pb.ActionRequest{ + Member: member.Type(), + Algorithm: algo.Name(), + Payload: decision.Payload, + Action: decision.Action, + } + response, err := c.RequestAction(ctx, &in) + if err != nil { + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Error with status request", err) + return ctrl.Result{Requeue: true}, err + } + fmt.Println(response.Status) + } + + // Do we need to update / patch the CRD? For example, if we adjust Jobs, we need to + // Since we requeue anyway, don't need special logic to do it here + if decision.Updated { + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client Update!") + patch := kclient.MergeFrom(ensemble.DeepCopy()) + ensemble.Spec.Members[idx] = *member + err := r.Patch(ctx, ensemble, patch) + if err != nil { + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Error with patch", err) + return ctrl.Result{}, nil + } + r.Update(ctx, ensemble) + } // This is the last return, this says to check every N seconds return ctrl.Result{Requeue: true}, nil } diff --git a/examples/algorithms/workload/demand/README.md b/examples/algorithms/workload/demand/README.md index 56d0b56..79012d7 100644 --- a/examples/algorithms/workload/demand/README.md +++ b/examples/algorithms/workload/demand/README.md @@ -1,7 +1,13 @@ # Workload Demand You can read about the workload demand algorithm [here](https://github.com/converged-computing/ensemble-operator/blob/main/docs/algorithms.md#workoad-demand-of-consistent-sizes). -For this example, we assume you have a cluster running (e.g., with kind) and have installed the Flux Operator and Ensemble Operator. +For this example, we assume you have a cluster running (e.g., with kind) and have installed the Flux Operator and Ensemble Operator. Here is what is going to happen: + +1. We define our jobs matrix in the [ensemble.yaml](ensemble.yaml). The jobs matrix is consistent across algorithms. +2. For this algorithm, we will first get a cluster status `StatusRequest`. We don't use it here aside from establishing communication. +3. We then detect that the job matrix has outstanding jobs, and make an `ActionRequest` to "submit" +4. The jobs are submit on the MiniCluster +5. We proceed to monitor, scaling when conditions are met, downsizing when jobs are finishing, and terminating after that. ## Usage @@ -14,12 +20,22 @@ kubectl apply -f ensemble.yaml ``` We can check both the gRPC sidecar and the operator to see if information is being received. Here is the -sidecar: +sidecar (after setup steps): ```bash kubectl logs workload-demand-0-0-vfxxd -c api -f ``` ```console +🥞️ Starting ensemble endpoint at :50051 + +Member type: minicluster +Algorithm workload-demand +Action submit +Payload {"jobs":[{"name":"lammps-2","command":"lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite","count":10,"nodes":1},{"name":"lammps-4","command":"lmp -v x 4 -v y 4 -v z 4 -in in.reaxc.hns -nocite","count":5,"nodes":1}]} + +Member type: minicluster +Algorithm workload-demand +Action submit ``` And here is the operator: @@ -27,12 +43,23 @@ And here is the operator: ```bash kubectl logs -n ensemble-operator-system ensemble-operator-controller-manager-5f874bb7d8-m68jb ``` +```console + => Ensemble.member 0 + Ensemble.member.Algorithm: workload-demand + Ensemble.member Type: minicluster + Ensemble.member.Sidecar.Image: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test + Ensemble.member.Sidecar.Port: 50051 + Ensemble.member.Sidecar.PullAlways: true +Checking member workload-demand-0 +2024-03-24T07:16:09Z INFO Pod {"controller": "ensemble", "controllerGroup": "ensemble.flux-framework.org", "controllerKind": "Ensemble", "Ensemble": {"name":"workload-demand","namespace":"default"}, "namespace": "default", "name": "workload-demand", "reconcileID": "7be3ac2c-c224-4089-b362-27a36fd5298e", "IP Address": "10.244.1.43"} +2024-03-24T07:16:09Z INFO 🦀 MiniCluster Ensemble Update {"controller": "ensemble", "controllerGroup": "ensemble.flux-framework.org", "controllerKind": "Ensemble", "Ensemble": {"name":"workload-demand","namespace":"default"}, "namespace": "default", "name": "workload-demand", "reconcileID": "7be3ac2c-c224-4089-b362-27a36fd5298e", "Host": "10.244.1.43:50051"} +2024/03/24 07:16:09 🥞️ starting client (10.244.1.43:50051)... +``` +WIP - the above is not done yet! ## TODO -- the base containers should have the grpc already built so we don't have to wait -- get a list of commands from the workload-demand decision endpoint, and send them back to the sidecar - - this means we need to have a string payload that can be received - - this also means the algorithms receiving endpoints need to be defined within the python service -- add to the decision logic to indicate a flag for when a spec (CRD) is updated so we do a patch \ No newline at end of file +- Now that we are submitting jobs on start, we need to react to the other decision, scaling the cluster up. +- We also need to then set the terminate action conditions, define the action, and successfully terminate the ensemble member (minicluster) +- after this is done, this first algorithm should be mostly done. \ No newline at end of file diff --git a/examples/algorithms/workload/demand/ensemble.yaml b/examples/algorithms/workload/demand/ensemble.yaml index 15415a6..6a10a08 100644 --- a/examples/algorithms/workload/demand/ensemble.yaml +++ b/examples/algorithms/workload/demand/ensemble.yaml @@ -4,7 +4,9 @@ metadata: name: workload-demand spec: members: - - sidecarPullAlways: true + - sidecar: + pullAlways: true + image: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test algorithm: name: workload-demand jobs: diff --git a/examples/dist/ensemble-operator-arm.yaml b/examples/dist/ensemble-operator-arm.yaml index 064cd69..a51bcd2 100644 --- a/examples/dist/ensemble-operator-arm.yaml +++ b/examples/dist/ensemble-operator-arm.yaml @@ -87,6 +87,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -95,6 +96,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -979,25 +984,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the - queue. Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor + the queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/examples/dist/ensemble-operator-dev.yaml b/examples/dist/ensemble-operator-dev.yaml index f20e07b..c28e44a 100644 --- a/examples/dist/ensemble-operator-dev.yaml +++ b/examples/dist/ensemble-operator-dev.yaml @@ -87,6 +87,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -95,6 +96,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -979,25 +984,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the - queue. Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor + the queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/examples/dist/ensemble-operator.yaml b/examples/dist/ensemble-operator.yaml index 7b7fc9c..060b52f 100644 --- a/examples/dist/ensemble-operator.yaml +++ b/examples/dist/ensemble-operator.yaml @@ -87,6 +87,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -95,6 +96,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -979,25 +984,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the - queue. Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor + the queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/examples/tests/lammps/ensemble.yaml b/examples/tests/lammps/ensemble.yaml index 3ac6bd7..70659de 100644 --- a/examples/tests/lammps/ensemble.yaml +++ b/examples/tests/lammps/ensemble.yaml @@ -4,10 +4,15 @@ metadata: name: ensemble-sample spec: members: - - sidecarPullAlways: true + - sidecar: + pullAlways: true + + jobs: + - name: lammps + command: lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite + minicluster: spec: - size: 2 maxSize: 4 minSize: 2 @@ -18,11 +23,4 @@ spec: - image: ghcr.io/converged-computing/metric-lammps:latest # You can set the working directory if your container WORKDIR is not correct. - workingDir: /opt/lammps/examples/reaxff/HNS - batchRaw: true - batch: true - commands: - post: sleep infinity - command: | - lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite - sleep infinity \ No newline at end of file + workingDir: /opt/lammps/examples/reaxff/HNS \ No newline at end of file diff --git a/internal/algorithm/algorithm.go b/pkg/algorithm/algorithm.go similarity index 89% rename from internal/algorithm/algorithm.go rename to pkg/algorithm/algorithm.go index b00ffd3..8fc171b 100644 --- a/internal/algorithm/algorithm.go +++ b/pkg/algorithm/algorithm.go @@ -10,7 +10,8 @@ import ( // A lookup of registered algorithms by name var ( - Algorithms = map[string]AlgorithmInterface{} + Algorithms = map[string]AlgorithmInterface{} + SubmitAction = "submit" ) // An algorithm interface determines behavior for scaling and termination. @@ -37,10 +38,16 @@ type AlgorithmDecision struct { Scale int32 `json:"scale"` // Terminate the member - Terminate bool `terminate:"scale"` + Terminate bool `json:"terminate"` // Send payload back to gRPC sidecar service Payload string `json:"payload"` + + // Action to ask the queue to take + Action string `json:"action"` + + // Update determines if the spec was updated (warranting a patch) + Updated bool `json:"updated"` } // AlgorithmOptions allow packaging named values of different types diff --git a/internal/client/client.go b/pkg/client/client.go similarity index 68% rename from internal/client/client.go rename to pkg/client/client.go index 0b6a992..6724604 100644 --- a/internal/client/client.go +++ b/pkg/client/client.go @@ -26,7 +26,8 @@ var _ Client = (*EnsembleClient)(nil) type Client interface { // Ensemble interactions - RequestStatus(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (*pb.StatusResponse, error) + RequestStatus(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (*pb.Response, error) + RequestAction(ctx context.Context, in *pb.ActionRequest, opts ...grpc.CallOption) (*pb.Response, error) } // NewClient creates a new EnsembleClient @@ -69,23 +70,40 @@ func (c *EnsembleClient) GetHost() string { return c.host } -// SubmitJob submits a job to a named cluster. -// The token specific to the cluster is required -func (c *EnsembleClient) RequestStatus(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (*pb.StatusResponse, error) { +// RequestStatus gets the queue and jobs status. +// This is primarily for scaling/termination +func (c *EnsembleClient) RequestStatus( + ctx context.Context, + in *pb.StatusRequest, + opts ...grpc.CallOption, +) (*pb.Response, error) { - response := &pb.StatusResponse{} + response := &pb.Response{} if !c.Connected() { return response, errors.New("client is not connected") } + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + response, err := c.service.RequestStatus(ctx, in) + fmt.Println(response) + return response, err +} + +func (c *EnsembleClient) RequestAction( + ctx context.Context, + in *pb.ActionRequest, + opts ...grpc.CallOption, +) (*pb.Response, error) { - // Now contact the rainbow server with clusters... + response := &pb.Response{} + if !c.Connected() { + return response, errors.New("client is not connected") + } ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - // Validate that the cluster exists, and we have the right token. - // The response is the same either way - not found does not reveal - // additional information to the client trying to find it - response, err := c.service.RequestStatus(ctx, &pb.StatusRequest{}) + response, err := c.service.RequestAction(ctx, in) fmt.Println(response) return response, err } diff --git a/internal/types/types.go b/pkg/types/types.go similarity index 100% rename from internal/types/types.go rename to pkg/types/types.go diff --git a/protos/ensemble-service.pb.go b/protos/ensemble-service.pb.go index acc704c..838bca6 100644 --- a/protos/ensemble-service.pb.go +++ b/protos/ensemble-service.pb.go @@ -21,26 +21,26 @@ const ( ) // Registration statuses -type StatusResponse_ResultType int32 +type Response_ResultType int32 const ( - StatusResponse_UNSPECIFIED StatusResponse_ResultType = 0 - StatusResponse_SUCCESS StatusResponse_ResultType = 1 - StatusResponse_ERROR StatusResponse_ResultType = 2 - StatusResponse_DENIED StatusResponse_ResultType = 3 - StatusResponse_EXISTS StatusResponse_ResultType = 4 + Response_UNSPECIFIED Response_ResultType = 0 + Response_SUCCESS Response_ResultType = 1 + Response_ERROR Response_ResultType = 2 + Response_DENIED Response_ResultType = 3 + Response_EXISTS Response_ResultType = 4 ) -// Enum value maps for StatusResponse_ResultType. +// Enum value maps for Response_ResultType. var ( - StatusResponse_ResultType_name = map[int32]string{ + Response_ResultType_name = map[int32]string{ 0: "UNSPECIFIED", 1: "SUCCESS", 2: "ERROR", 3: "DENIED", 4: "EXISTS", } - StatusResponse_ResultType_value = map[string]int32{ + Response_ResultType_value = map[string]int32{ "UNSPECIFIED": 0, "SUCCESS": 1, "ERROR": 2, @@ -49,31 +49,31 @@ var ( } ) -func (x StatusResponse_ResultType) Enum() *StatusResponse_ResultType { - p := new(StatusResponse_ResultType) +func (x Response_ResultType) Enum() *Response_ResultType { + p := new(Response_ResultType) *p = x return p } -func (x StatusResponse_ResultType) String() string { +func (x Response_ResultType) String() string { return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) } -func (StatusResponse_ResultType) Descriptor() protoreflect.EnumDescriptor { +func (Response_ResultType) Descriptor() protoreflect.EnumDescriptor { return file_protos_ensemble_service_proto_enumTypes[0].Descriptor() } -func (StatusResponse_ResultType) Type() protoreflect.EnumType { +func (Response_ResultType) Type() protoreflect.EnumType { return &file_protos_ensemble_service_proto_enumTypes[0] } -func (x StatusResponse_ResultType) Number() protoreflect.EnumNumber { +func (x Response_ResultType) Number() protoreflect.EnumNumber { return protoreflect.EnumNumber(x) } -// Deprecated: Use StatusResponse_ResultType.Descriptor instead. -func (StatusResponse_ResultType) EnumDescriptor() ([]byte, []int) { - return file_protos_ensemble_service_proto_rawDescGZIP(), []int{1, 0} +// Deprecated: Use Response_ResultType.Descriptor instead. +func (Response_ResultType) EnumDescriptor() ([]byte, []int) { + return file_protos_ensemble_service_proto_rawDescGZIP(), []int{2, 0} } // StatusRequest asks to see the status of the queue and jobs @@ -83,8 +83,8 @@ type StatusRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Secret string `protobuf:"bytes,2,opt,name=secret,proto3" json:"secret,omitempty"` + // This is the ensemble member type (e.g., minicluster) + Member string `protobuf:"bytes,1,opt,name=member,proto3" json:"member,omitempty"` } func (x *StatusRequest) Reset() { @@ -119,46 +119,111 @@ func (*StatusRequest) Descriptor() ([]byte, []int) { return file_protos_ensemble_service_proto_rawDescGZIP(), []int{0} } -func (x *StatusRequest) GetName() string { +func (x *StatusRequest) GetMember() string { if x != nil { - return x.Name + return x.Member } return "" } -func (x *StatusRequest) GetSecret() string { +// ActionRequest requests an action +type ActionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Member string `protobuf:"bytes,1,opt,name=member,proto3" json:"member,omitempty"` + Algorithm string `protobuf:"bytes,2,opt,name=algorithm,proto3" json:"algorithm,omitempty"` + Action string `protobuf:"bytes,3,opt,name=action,proto3" json:"action,omitempty"` + Payload string `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *ActionRequest) Reset() { + *x = ActionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_ensemble_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ActionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ActionRequest) ProtoMessage() {} + +func (x *ActionRequest) ProtoReflect() protoreflect.Message { + mi := &file_protos_ensemble_service_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ActionRequest.ProtoReflect.Descriptor instead. +func (*ActionRequest) Descriptor() ([]byte, []int) { + return file_protos_ensemble_service_proto_rawDescGZIP(), []int{1} +} + +func (x *ActionRequest) GetMember() string { + if x != nil { + return x.Member + } + return "" +} + +func (x *ActionRequest) GetAlgorithm() string { + if x != nil { + return x.Algorithm + } + return "" +} + +func (x *ActionRequest) GetAction() string { + if x != nil { + return x.Action + } + return "" +} + +func (x *ActionRequest) GetPayload() string { if x != nil { - return x.Secret + return x.Payload } return "" } -type StatusResponse struct { +type Response struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Payload string `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` - Status StatusResponse_ResultType `protobuf:"varint,4,opt,name=status,proto3,enum=convergedcomputing.org.grpc.v1.StatusResponse_ResultType" json:"status,omitempty"` + Payload string `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + Status Response_ResultType `protobuf:"varint,4,opt,name=status,proto3,enum=convergedcomputing.org.grpc.v1.Response_ResultType" json:"status,omitempty"` } -func (x *StatusResponse) Reset() { - *x = StatusResponse{} +func (x *Response) Reset() { + *x = Response{} if protoimpl.UnsafeEnabled { - mi := &file_protos_ensemble_service_proto_msgTypes[1] + mi := &file_protos_ensemble_service_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } -func (x *StatusResponse) String() string { +func (x *Response) String() string { return protoimpl.X.MessageStringOf(x) } -func (*StatusResponse) ProtoMessage() {} +func (*Response) ProtoMessage() {} -func (x *StatusResponse) ProtoReflect() protoreflect.Message { - mi := &file_protos_ensemble_service_proto_msgTypes[1] +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_protos_ensemble_service_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -169,23 +234,23 @@ func (x *StatusResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use StatusResponse.ProtoReflect.Descriptor instead. -func (*StatusResponse) Descriptor() ([]byte, []int) { - return file_protos_ensemble_service_proto_rawDescGZIP(), []int{1} +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_protos_ensemble_service_proto_rawDescGZIP(), []int{2} } -func (x *StatusResponse) GetPayload() string { +func (x *Response) GetPayload() string { if x != nil { return x.Payload } return "" } -func (x *StatusResponse) GetStatus() StatusResponse_ResultType { +func (x *Response) GetStatus() Response_ResultType { if x != nil { return x.Status } - return StatusResponse_UNSPECIFIED + return Response_UNSPECIFIED } var File_protos_ensemble_service_proto protoreflect.FileDescriptor @@ -195,36 +260,47 @@ var file_protos_ensemble_service_proto_rawDesc = []byte{ 0x65, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x22, - 0x3b, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x22, 0xcc, 0x01, 0x0a, - 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x51, 0x0a, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x39, 0x2e, 0x63, 0x6f, 0x6e, 0x76, + 0x27, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x77, 0x0a, 0x0d, 0x41, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x6d, 0x62, 0x65, + 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x12, + 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, + 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x4b, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x33, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, + 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, + 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x06, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x4d, 0x0a, 0x0a, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, + 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, + 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, + 0x01, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, + 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x58, 0x49, 0x53, + 0x54, 0x53, 0x10, 0x04, 0x32, 0xe6, 0x01, 0x0a, 0x10, 0x45, 0x6e, 0x73, 0x65, 0x6d, 0x62, 0x6c, + 0x65, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x68, 0x0a, 0x0d, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2d, 0x2e, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, + 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, - 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x4d, 0x0a, 0x0a, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, - 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, - 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, - 0x52, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x03, 0x12, - 0x0a, 0x0a, 0x06, 0x45, 0x58, 0x49, 0x53, 0x54, 0x53, 0x10, 0x04, 0x32, 0x82, 0x01, 0x0a, 0x10, - 0x45, 0x6e, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, - 0x12, 0x6e, 0x0a, 0x0d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x12, 0x2d, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, - 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, - 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x2e, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, - 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, - 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x42, 0x39, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, - 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, - 0x6e, 0x67, 0x2f, 0x65, 0x6e, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x2d, 0x6f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x68, 0x0a, 0x0d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x41, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2d, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, + 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, + 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, + 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x39, 0x5a, + 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x67, 0x65, 0x64, 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2f, + 0x65, 0x6e, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x2d, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, + 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -240,18 +316,21 @@ func file_protos_ensemble_service_proto_rawDescGZIP() []byte { } var file_protos_ensemble_service_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_protos_ensemble_service_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_protos_ensemble_service_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_protos_ensemble_service_proto_goTypes = []interface{}{ - (StatusResponse_ResultType)(0), // 0: convergedcomputing.org.grpc.v1.StatusResponse.ResultType - (*StatusRequest)(nil), // 1: convergedcomputing.org.grpc.v1.StatusRequest - (*StatusResponse)(nil), // 2: convergedcomputing.org.grpc.v1.StatusResponse + (Response_ResultType)(0), // 0: convergedcomputing.org.grpc.v1.Response.ResultType + (*StatusRequest)(nil), // 1: convergedcomputing.org.grpc.v1.StatusRequest + (*ActionRequest)(nil), // 2: convergedcomputing.org.grpc.v1.ActionRequest + (*Response)(nil), // 3: convergedcomputing.org.grpc.v1.Response } var file_protos_ensemble_service_proto_depIdxs = []int32{ - 0, // 0: convergedcomputing.org.grpc.v1.StatusResponse.status:type_name -> convergedcomputing.org.grpc.v1.StatusResponse.ResultType + 0, // 0: convergedcomputing.org.grpc.v1.Response.status:type_name -> convergedcomputing.org.grpc.v1.Response.ResultType 1, // 1: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestStatus:input_type -> convergedcomputing.org.grpc.v1.StatusRequest - 2, // 2: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestStatus:output_type -> convergedcomputing.org.grpc.v1.StatusResponse - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type + 2, // 2: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestAction:input_type -> convergedcomputing.org.grpc.v1.ActionRequest + 3, // 3: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestStatus:output_type -> convergedcomputing.org.grpc.v1.Response + 3, // 4: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestAction:output_type -> convergedcomputing.org.grpc.v1.Response + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name @@ -276,7 +355,19 @@ func file_protos_ensemble_service_proto_init() { } } file_protos_ensemble_service_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StatusResponse); i { + switch v := v.(*ActionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_ensemble_service_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { case 0: return &v.state case 1: @@ -294,7 +385,7 @@ func file_protos_ensemble_service_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_protos_ensemble_service_proto_rawDesc, NumEnums: 1, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/protos/ensemble-service.proto b/protos/ensemble-service.proto index 3fdbea3..9f0a191 100644 --- a/protos/ensemble-service.proto +++ b/protos/ensemble-service.proto @@ -4,17 +4,28 @@ package convergedcomputing.org.grpc.v1; option go_package = "github.com/converged-computing/ensemble-operator/protos"; service EnsembleOperator { - rpc RequestStatus(StatusRequest) returns (StatusResponse); + rpc RequestStatus(StatusRequest) returns (Response); + rpc RequestAction(ActionRequest) returns (Response); } // StatusRequest asks to see the status of the queue and jobs // TODO add auth here... message StatusRequest { - string name = 1; - string secret = 2; + + // This is the ensemble member type (e.g., minicluster) + string member = 1; +} + +// ActionRequest requests an action +message ActionRequest { + string member = 1; + string algorithm = 2; + string action = 3; + string payload = 4; } - message StatusResponse { + + message Response { // Registration statuses enum ResultType { diff --git a/protos/ensemble-service_grpc.pb.go b/protos/ensemble-service_grpc.pb.go index 71d1e4d..7d819ac 100644 --- a/protos/ensemble-service_grpc.pb.go +++ b/protos/ensemble-service_grpc.pb.go @@ -22,7 +22,8 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type EnsembleOperatorClient interface { - RequestStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + RequestStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*Response, error) + RequestAction(ctx context.Context, in *ActionRequest, opts ...grpc.CallOption) (*Response, error) } type ensembleOperatorClient struct { @@ -33,8 +34,8 @@ func NewEnsembleOperatorClient(cc grpc.ClientConnInterface) EnsembleOperatorClie return &ensembleOperatorClient{cc} } -func (c *ensembleOperatorClient) RequestStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { - out := new(StatusResponse) +func (c *ensembleOperatorClient) RequestStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) err := c.cc.Invoke(ctx, "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestStatus", in, out, opts...) if err != nil { return nil, err @@ -42,11 +43,21 @@ func (c *ensembleOperatorClient) RequestStatus(ctx context.Context, in *StatusRe return out, nil } +func (c *ensembleOperatorClient) RequestAction(ctx context.Context, in *ActionRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestAction", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // EnsembleOperatorServer is the server API for EnsembleOperator service. // All implementations must embed UnimplementedEnsembleOperatorServer // for forward compatibility type EnsembleOperatorServer interface { - RequestStatus(context.Context, *StatusRequest) (*StatusResponse, error) + RequestStatus(context.Context, *StatusRequest) (*Response, error) + RequestAction(context.Context, *ActionRequest) (*Response, error) mustEmbedUnimplementedEnsembleOperatorServer() } @@ -54,9 +65,12 @@ type EnsembleOperatorServer interface { type UnimplementedEnsembleOperatorServer struct { } -func (UnimplementedEnsembleOperatorServer) RequestStatus(context.Context, *StatusRequest) (*StatusResponse, error) { +func (UnimplementedEnsembleOperatorServer) RequestStatus(context.Context, *StatusRequest) (*Response, error) { return nil, status.Errorf(codes.Unimplemented, "method RequestStatus not implemented") } +func (UnimplementedEnsembleOperatorServer) RequestAction(context.Context, *ActionRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method RequestAction not implemented") +} func (UnimplementedEnsembleOperatorServer) mustEmbedUnimplementedEnsembleOperatorServer() {} // UnsafeEnsembleOperatorServer may be embedded to opt out of forward compatibility for this service. @@ -88,6 +102,24 @@ func _EnsembleOperator_RequestStatus_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _EnsembleOperator_RequestAction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ActionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EnsembleOperatorServer).RequestAction(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestAction", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EnsembleOperatorServer).RequestAction(ctx, req.(*ActionRequest)) + } + return interceptor(ctx, in, info, handler) +} + // EnsembleOperator_ServiceDesc is the grpc.ServiceDesc for EnsembleOperator service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -99,6 +131,10 @@ var EnsembleOperator_ServiceDesc = grpc.ServiceDesc{ MethodName: "RequestStatus", Handler: _EnsembleOperator_RequestStatus_Handler, }, + { + MethodName: "RequestAction", + Handler: _EnsembleOperator_RequestAction_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "protos/ensemble-service.proto", diff --git a/python/ensemble_operator/algorithm/__init__.py b/python/ensemble_operator/algorithm/__init__.py new file mode 100644 index 0000000..c3cb988 --- /dev/null +++ b/python/ensemble_operator/algorithm/__init__.py @@ -0,0 +1,12 @@ +from .workload_demand import WorkloadDemand + + +def get_algorithm(name, options=None): + """ + Get a named backend. + """ + options = options or {} + name = name.lower() + if name == "workload-demand": + return WorkloadDemand(**options) + raise ValueError(f"Algorithm {name} is not known") diff --git a/python/ensemble_operator/algorithm/base.py b/python/ensemble_operator/algorithm/base.py new file mode 100644 index 0000000..77a37e3 --- /dev/null +++ b/python/ensemble_operator/algorithm/base.py @@ -0,0 +1,21 @@ +# An algorithm is a named class that can receive the payload from +# a StatusRequest, and optionally act on the queue (with the action) +# depending on the algorithm in question. + + +class AlgorithmBase: + """ + The AlgorithmBase is an abstract base to show functions defined. + """ + + def submit(self, *args, **kwargs): + """ + The submit action will submit one or more jobs. + """ + raise NotImplementedError + + def run(self, *args, **kwargs): + """ + Receive the payload from the Ensemble Operator and take action. + """ + raise NotImplementedError diff --git a/python/ensemble_operator/algorithm/workload_demand.py b/python/ensemble_operator/algorithm/workload_demand.py new file mode 100644 index 0000000..7beaddb --- /dev/null +++ b/python/ensemble_operator/algorithm/workload_demand.py @@ -0,0 +1,16 @@ +import json + +from .base import AlgorithmBase + +# The memory database backend provides an interface to interact with an in memory cluster database + + +class WorkloadDemand(AlgorithmBase): + """ + The WorkloadDemand algorithm + + This can include other ML, etc. calculations for determining + how to assist an action, haven't thought through this yet + """ + + pass diff --git a/python/ensemble_operator/members/__init__.py b/python/ensemble_operator/members/__init__.py new file mode 100644 index 0000000..679dc29 --- /dev/null +++ b/python/ensemble_operator/members/__init__.py @@ -0,0 +1,12 @@ +from .minicluster import MiniClusterMember + + +def get_member(name, options=None): + """ + Get a named member type + """ + options = options or {} + name = name.lower() + if name == "minicluster": + return MiniClusterMember(**options) + raise ValueError(f"Member type {name} is not known") diff --git a/python/ensemble_operator/members/base.py b/python/ensemble_operator/members/base.py new file mode 100644 index 0000000..06ee043 --- /dev/null +++ b/python/ensemble_operator/members/base.py @@ -0,0 +1,18 @@ +class MemberBase: + """ + The MemberBase is an abstract base to show functions defined. + """ + + def __init__(self, **options): + """ + Create a new member type (e.g., minicluster) + """ + # Set options as attributes + for key, value in options.items(): + setattr(self, key, value) + + def status(self, *args, **kwargs): + """ + Ask the member for a status + """ + raise NotImplementedError diff --git a/python/ensemble_operator/members/minicluster/__init__.py b/python/ensemble_operator/members/minicluster/__init__.py new file mode 100644 index 0000000..1bc9135 --- /dev/null +++ b/python/ensemble_operator/members/minicluster/__init__.py @@ -0,0 +1,52 @@ +import json +import shlex + +from ensemble_operator.members.base import MemberBase + + +class MiniClusterMember(MemberBase): + """ + The MiniCluster member type + + Asking a MiniCluster member for a status means asking the flux queue. + """ + + def __init__(self): + """ + Create a new minicluster handle + """ + import flux + + self.handle = flux.Flux() + + def status(self, payload): + """ + Ask the flux queue (metrics) for a status + """ + import ensemble_operator.members.minicluster.metrics as metrics + + # Prepare a payload to send back + payload = {} + + # The payload is the metrics listing + for name, func in metrics.metrics.items(): + payload[name] = func() + + return payload + + def submit(self, payload): + """ + Receive the flux handle and StatusRequest payload to act on. + """ + print(payload) + payload = json.loads(payload) + + # Allow this to fail - it will raise a value error that will propogate back to the operator + import flux.job + + for job in payload.get("jobs") or []: + print(job) + command = shlex.split(job["command"]) + jobspec = flux.job.JobspecV1.from_command(command=command, num_nodes=job["nodes"]) + jobid = flux.job.submit(self.handle, jobspec) + print(f"Submit job {command}: {jobid}") diff --git a/python/ensemble_operator/metrics.py b/python/ensemble_operator/members/minicluster/metrics.py similarity index 100% rename from python/ensemble_operator/metrics.py rename to python/ensemble_operator/members/minicluster/metrics.py diff --git a/python/ensemble_operator/protos/ensemble_service_pb2.py b/python/ensemble_operator/protos/ensemble_service_pb2.py index 15c27b3..ade7db9 100644 --- a/python/ensemble_operator/protos/ensemble_service_pb2.py +++ b/python/ensemble_operator/protos/ensemble_service_pb2.py @@ -14,7 +14,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x16\x65nsemble-service.proto\x12\x1e\x63onvergedcomputing.org.grpc.v1"-\n\rStatusRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06secret\x18\x02 \x01(\t"\xbb\x01\n\x0eStatusResponse\x12\x0f\n\x07payload\x18\x01 \x01(\t\x12I\n\x06status\x18\x04 \x01(\x0e\x32\x39.convergedcomputing.org.grpc.v1.StatusResponse.ResultType"M\n\nResultType\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\t\n\x05\x45RROR\x10\x02\x12\n\n\x06\x44\x45NIED\x10\x03\x12\n\n\x06\x45XISTS\x10\x04\x32\x82\x01\n\x10\x45nsembleOperator\x12n\n\rRequestStatus\x12-.convergedcomputing.org.grpc.v1.StatusRequest\x1a..convergedcomputing.org.grpc.v1.StatusResponseB9Z7github.com/converged-computing/ensemble-operator/protosb\x06proto3' + b'\n\x16\x65nsemble-service.proto\x12\x1e\x63onvergedcomputing.org.grpc.v1"\x1f\n\rStatusRequest\x12\x0e\n\x06member\x18\x01 \x01(\t"S\n\rActionRequest\x12\x0e\n\x06member\x18\x01 \x01(\t\x12\x11\n\talgorithm\x18\x02 \x01(\t\x12\x0e\n\x06\x61\x63tion\x18\x03 \x01(\t\x12\x0f\n\x07payload\x18\x04 \x01(\t"\xaf\x01\n\x08Response\x12\x0f\n\x07payload\x18\x01 \x01(\t\x12\x43\n\x06status\x18\x04 \x01(\x0e\x32\x33.convergedcomputing.org.grpc.v1.Response.ResultType"M\n\nResultType\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\t\n\x05\x45RROR\x10\x02\x12\n\n\x06\x44\x45NIED\x10\x03\x12\n\n\x06\x45XISTS\x10\x04\x32\xe6\x01\n\x10\x45nsembleOperator\x12h\n\rRequestStatus\x12-.convergedcomputing.org.grpc.v1.StatusRequest\x1a(.convergedcomputing.org.grpc.v1.Response\x12h\n\rRequestAction\x12-.convergedcomputing.org.grpc.v1.ActionRequest\x1a(.convergedcomputing.org.grpc.v1.ResponseB9Z7github.com/converged-computing/ensemble-operator/protosb\x06proto3' ) _globals = globals() @@ -26,11 +26,13 @@ "DESCRIPTOR" ]._serialized_options = b"Z7github.com/converged-computing/ensemble-operator/protos" _globals["_STATUSREQUEST"]._serialized_start = 58 - _globals["_STATUSREQUEST"]._serialized_end = 103 - _globals["_STATUSRESPONSE"]._serialized_start = 106 - _globals["_STATUSRESPONSE"]._serialized_end = 293 - _globals["_STATUSRESPONSE_RESULTTYPE"]._serialized_start = 216 - _globals["_STATUSRESPONSE_RESULTTYPE"]._serialized_end = 293 - _globals["_ENSEMBLEOPERATOR"]._serialized_start = 296 - _globals["_ENSEMBLEOPERATOR"]._serialized_end = 426 + _globals["_STATUSREQUEST"]._serialized_end = 89 + _globals["_ACTIONREQUEST"]._serialized_start = 91 + _globals["_ACTIONREQUEST"]._serialized_end = 174 + _globals["_RESPONSE"]._serialized_start = 177 + _globals["_RESPONSE"]._serialized_end = 352 + _globals["_RESPONSE_RESULTTYPE"]._serialized_start = 275 + _globals["_RESPONSE_RESULTTYPE"]._serialized_end = 352 + _globals["_ENSEMBLEOPERATOR"]._serialized_start = 355 + _globals["_ENSEMBLEOPERATOR"]._serialized_end = 585 # @@protoc_insertion_point(module_scope) diff --git a/python/ensemble_operator/protos/ensemble_service_pb2.pyi b/python/ensemble_operator/protos/ensemble_service_pb2.pyi index ada39bb..74cef42 100644 --- a/python/ensemble_operator/protos/ensemble_service_pb2.pyi +++ b/python/ensemble_operator/protos/ensemble_service_pb2.pyi @@ -6,29 +6,39 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor class StatusRequest(_message.Message): - __slots__ = ("name", "secret") - NAME_FIELD_NUMBER: _ClassVar[int] - SECRET_FIELD_NUMBER: _ClassVar[int] - name: str - secret: str - def __init__(self, name: _Optional[str] = ..., secret: _Optional[str] = ...) -> None: ... + __slots__ = ("member",) + MEMBER_FIELD_NUMBER: _ClassVar[int] + member: str + def __init__(self, member: _Optional[str] = ...) -> None: ... -class StatusResponse(_message.Message): +class ActionRequest(_message.Message): + __slots__ = ("member", "algorithm", "action", "payload") + MEMBER_FIELD_NUMBER: _ClassVar[int] + ALGORITHM_FIELD_NUMBER: _ClassVar[int] + ACTION_FIELD_NUMBER: _ClassVar[int] + PAYLOAD_FIELD_NUMBER: _ClassVar[int] + member: str + algorithm: str + action: str + payload: str + def __init__(self, member: _Optional[str] = ..., algorithm: _Optional[str] = ..., action: _Optional[str] = ..., payload: _Optional[str] = ...) -> None: ... + +class Response(_message.Message): __slots__ = ("payload", "status") class ResultType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () - UNSPECIFIED: _ClassVar[StatusResponse.ResultType] - SUCCESS: _ClassVar[StatusResponse.ResultType] - ERROR: _ClassVar[StatusResponse.ResultType] - DENIED: _ClassVar[StatusResponse.ResultType] - EXISTS: _ClassVar[StatusResponse.ResultType] - UNSPECIFIED: StatusResponse.ResultType - SUCCESS: StatusResponse.ResultType - ERROR: StatusResponse.ResultType - DENIED: StatusResponse.ResultType - EXISTS: StatusResponse.ResultType + UNSPECIFIED: _ClassVar[Response.ResultType] + SUCCESS: _ClassVar[Response.ResultType] + ERROR: _ClassVar[Response.ResultType] + DENIED: _ClassVar[Response.ResultType] + EXISTS: _ClassVar[Response.ResultType] + UNSPECIFIED: Response.ResultType + SUCCESS: Response.ResultType + ERROR: Response.ResultType + DENIED: Response.ResultType + EXISTS: Response.ResultType PAYLOAD_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] payload: str - status: StatusResponse.ResultType - def __init__(self, payload: _Optional[str] = ..., status: _Optional[_Union[StatusResponse.ResultType, str]] = ...) -> None: ... + status: Response.ResultType + def __init__(self, payload: _Optional[str] = ..., status: _Optional[_Union[Response.ResultType, str]] = ...) -> None: ... diff --git a/python/ensemble_operator/protos/ensemble_service_pb2_grpc.py b/python/ensemble_operator/protos/ensemble_service_pb2_grpc.py index 781338f..b618604 100644 --- a/python/ensemble_operator/protos/ensemble_service_pb2_grpc.py +++ b/python/ensemble_operator/protos/ensemble_service_pb2_grpc.py @@ -17,7 +17,12 @@ def __init__(self, channel): self.RequestStatus = channel.unary_unary( "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestStatus", request_serializer=ensemble__service__pb2.StatusRequest.SerializeToString, - response_deserializer=ensemble__service__pb2.StatusResponse.FromString, + response_deserializer=ensemble__service__pb2.Response.FromString, + ) + self.RequestAction = channel.unary_unary( + "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestAction", + request_serializer=ensemble__service__pb2.ActionRequest.SerializeToString, + response_deserializer=ensemble__service__pb2.Response.FromString, ) @@ -30,13 +35,24 @@ def RequestStatus(self, request, context): context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") + def RequestAction(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + def add_EnsembleOperatorServicer_to_server(servicer, server): rpc_method_handlers = { "RequestStatus": grpc.unary_unary_rpc_method_handler( servicer.RequestStatus, request_deserializer=ensemble__service__pb2.StatusRequest.FromString, - response_serializer=ensemble__service__pb2.StatusResponse.SerializeToString, + response_serializer=ensemble__service__pb2.Response.SerializeToString, + ), + "RequestAction": grpc.unary_unary_rpc_method_handler( + servicer.RequestAction, + request_deserializer=ensemble__service__pb2.ActionRequest.FromString, + response_serializer=ensemble__service__pb2.Response.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -67,7 +83,36 @@ def RequestStatus( target, "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestStatus", ensemble__service__pb2.StatusRequest.SerializeToString, - ensemble__service__pb2.StatusResponse.FromString, + ensemble__service__pb2.Response.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def RequestAction( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestAction", + ensemble__service__pb2.ActionRequest.SerializeToString, + ensemble__service__pb2.Response.FromString, options, channel_credentials, insecure, diff --git a/python/ensemble_operator/server.py b/python/ensemble_operator/server.py index 2945655..4c3a2a6 100644 --- a/python/ensemble_operator/server.py +++ b/python/ensemble_operator/server.py @@ -6,10 +6,17 @@ import grpc +import ensemble_operator.algorithm as algorithms import ensemble_operator.defaults as defaults +import ensemble_operator.members as members from ensemble_operator.protos import ensemble_service_pb2 from ensemble_operator.protos import ensemble_service_pb2_grpc as api +# IMPORTANT: this only works with global variables if we have one ensemble gRPC per pod. +# We are anticipating storing some state here with the ensemble mamber since the +# operator should not be doing that. +cache = {} + def get_parser(): parser = argparse.ArgumentParser( @@ -50,33 +57,51 @@ class EnsembleEndpoint(api.EnsembleOperatorServicer): """ def RequestStatus(self, request, context): - """Request information about queues and jobs.""" - print(request) + """ + Request information about queues and jobs. + """ print(context) + print(f"Member type: {request.member}") + + # This will raise an error if the member type (e.g., minicluster) is not known + member = members.get_member(request.member) # If the flux handle didn't work, this might error try: - import ensemble_operator.metrics as metrics + payload = member.status() except: - return ensemble_service_pb2.StatusResponse( - status=ensemble_service_pb2.StatusResponse.ResultType.ERROR + return ensemble_service_pb2.Response( + status=ensemble_service_pb2.Response.ResultType.ERROR ) - # Prepare a payload to send back - payload = {} - - # The payload is the metrics listing - for name, func in metrics.metrics.items(): - payload[name] = func() - - print(json.dumps(payload, indent=4)) - # context.set_code(grpc.StatusCode.UNIMPLEMENTED) - # context.set_details('Method not implemented!') - return ensemble_service_pb2.StatusResponse( + print(json.dumps(payload)) + return ensemble_service_pb2.Response( payload=json.dumps(payload), - status=ensemble_service_pb2.StatusResponse.ResultType.SUCCESS, + status=ensemble_service_pb2.Response.ResultType.SUCCESS, ) + def RequestAction(self, request, context): + """ + Request an action is performed according to an algorithm. + """ + print(f"Algorithm {request.algorithm}") + print(f"Action {request.action}") + print(f"Payload {request.payload}") + + # Assume successful response + status = ensemble_service_pb2.Response.ResultType.SUCCESS + + # TODO retrieve the algorithm to process the request + # We aren't using this or doing that yet, we are just submitting jobs + alg = algorithms.get_algorithm(request.algorithm) + if request.action == "submit": + try: + alg.submit(request.payload) + except Exception as e: + status = ensemble_service_pb2.Response.ResultType.ERROR + + return ensemble_service_pb2.Response(status=status) + def serve(port, workers): """ diff --git a/python/script/build-docker.sh b/python/script/build-docker.sh new file mode 100755 index 0000000..ff2e440 --- /dev/null +++ b/python/script/build-docker.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +# Helper script to build docker images + +docker build -t ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test -f docker/Dockerfile.rocky . +docker push ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test diff --git a/python/setup.cfg b/python/setup.cfg index cd9f494..d59994d 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -3,4 +3,4 @@ exclude = docs max-line-length = 100 ignore = E1 E2 E5 W5 per-file-ignores = - rainbow/__init__.py:F401 + ensemble_operator/__init__.py:F401