diff --git a/Dockerfile b/Dockerfile index 03f79c0..aed5abc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,8 +14,9 @@ RUN go mod download # Copy the go source COPY cmd/main.go cmd/main.go COPY api/ api/ +COPY pkg ./pkg COPY algorithm ./algorithm -COPY internal/ ./internal +COPY controllers/ ./controllers COPY ./protos protos # Build diff --git a/Makefile b/Makefile index 8a75638..63eb1b9 100644 --- a/Makefile +++ b/Makefile @@ -72,7 +72,6 @@ endif python: python ## Generate python proto files in python # pip install grpcio-tools # pip freeze | grep grpcio-tools - # We will put rainbow plus the memory protos here mkdir -p python/ensemble_operator/protos cd python/ensemble_operator/protos python -m grpc_tools.protoc -I./protos --python_out=./python/ensemble_operator/protos --pyi_out=./python/ensemble_operator/protos --grpc_python_out=./python/ensemble_operator/protos ./protos/ensemble-service.proto diff --git a/algorithm/workload/demand/demand.go b/algorithm/workload/demand/demand.go index 3877713..7c4971c 100644 --- a/algorithm/workload/demand/demand.go +++ b/algorithm/workload/demand/demand.go @@ -1,10 +1,11 @@ package demand import ( + "encoding/json" "fmt" api "github.com/converged-computing/ensemble-operator/api/v1alpha1" - "github.com/converged-computing/ensemble-operator/internal/algorithm" + "github.com/converged-computing/ensemble-operator/pkg/algorithm" "k8s.io/utils/set" ) @@ -50,12 +51,34 @@ func (e WorkloadDemand) MakeDecision( ) (algorithm.AlgorithmDecision, error) { fmt.Println(member.Jobs) + + // This will determine if we need to patch the crd decision := algorithm.AlgorithmDecision{} - //file, err := os.ReadFile(yamlFile) - //if err != nil { - // return &js, err - //} + updated := false + + // For the algorithm here, we always submit all jobs that still have counts. + // We don't consider the queue status (payload) here because it does not matter + // This is a lookup + req := SubmitRequest{Jobs: []Job{}} + for _, job := range member.Jobs { + if job.Count > 0 { + req.AddJob(job.Name, job.Command, job.Count, job.Nodes) + job.Count = 0 + updated = true + } + } + // Serialize the json into a payload + response, err := json.Marshal(&req) + if err != nil { + return decision, err + } + decision = algorithm.AlgorithmDecision{Updated: updated, Payload: string(response)} + + // If we have updates, ask the queue to submit them + if updated { + decision.Action = algorithm.SubmitAction + } return decision, nil } @@ -64,7 +87,6 @@ func (e WorkloadDemand) Validate(options algorithm.AlgorithmOptions) bool { return true } -// Add the selection algorithm to be known to rainbow func init() { a := WorkloadDemand{} algorithm.Register(a) diff --git a/algorithm/workload/demand/types.go b/algorithm/workload/demand/types.go new file mode 100644 index 0000000..71e1f2e --- /dev/null +++ b/algorithm/workload/demand/types.go @@ -0,0 +1,18 @@ +package demand + +// A SubmitRequest includes jobs and counts to submit. +// It will be serialized into the json payload to the gRPC sidecar +type SubmitRequest struct { + Jobs []Job `json:"jobs,omitempty"` +} + +type Job struct { + Name string `json:"name,omitempty"` + Command string `json:"command,omitempty"` + Count int32 `json:"count,omitempty"` + Nodes int32 `json:"nodes,omitempty"` +} + +func (r *SubmitRequest) AddJob(name, command string, count, nodes int32) { + r.Jobs = append(r.Jobs, Job{Name: name, Command: command, Count: count, Nodes: nodes}) +} diff --git a/api/v1alpha1/ensemble_types.go b/api/v1alpha1/ensemble_types.go index 3ab94e7..54a6ae3 100644 --- a/api/v1alpha1/ensemble_types.go +++ b/api/v1alpha1/ensemble_types.go @@ -26,7 +26,8 @@ import ( ) var ( - defaultSidecarbase = "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9" + defaultSidecarbase = "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9" + defaultAlgorithmName = "workload-demand" ) // EnsembleSpec defines the desired state of Ensemble @@ -57,33 +58,40 @@ type Member struct { // +optional MiniCluster minicluster.MiniCluster `json:"minicluster,omitempty"` + // Definition and customization of the sidecar + //+optional + Sidecar Sidecar `json:"sidecar,omitempty"` + + // A member is required to define one or more jobs + // Jobs + Jobs []Job `json:"jobs"` + + // Member specific algorithm to use + // If not defined, defaults to workload-demand + //+optional + Algorithm Algorithm `json:"algorithm"` +} + +type Sidecar struct { + // Baseimage for the sidecar that will monitor the queue. // Ensure that the operating systems match! // +kubebuilder:default="ghcr.io/converged-computing/ensemble-operator-api:rockylinux9" // +default="ghcr.io/converged-computing/ensemble-operator-api:rockylinux9" // +optional - SidecarBase string `json:"sidecarBase"` + Image string `json:"image"` // Always pull the sidecar container (useful for development) // +optional - SidecarPullAlways bool `json:"sidecarPullAlways"` + PullAlways bool `json:"pullAlways"` // +kubebuilder:default="50051" // +default="50051" - SidecarPort string `json:"sidecarPort"` + Port string `json:"port"` // +kubebuilder:default=10 // +default=10 - SidecarWorkers int32 `json:"sidecarWorkers"` - - // A member is required to define one or more jobs - // Jobs - Jobs []Job `json:"jobs"` - - // Member specific algorithm to use - // If not defined, defaults to workload-demand - //+optional - Algorithm Algorithm `json:"algorithm"` + Workers int32 `json:"workers"` } type Algorithm struct { @@ -106,9 +114,16 @@ type Job struct { // Number of jobs to run // This can be set to 0 depending on the algorithm // E.g., some algorithms decide on the number to submit + // +kubebuilder:default=1 + // +default=1 //+optional Count int32 `json:"count"` + // +kubebuilder:default=1 + // +default=1 + //+optional + Nodes int32 `json:"nodes"` + // TODO add label here for ML model category } @@ -127,12 +142,23 @@ func (m *Member) Type() string { return "unknown" } +func (e *Ensemble) getDefaultAlgorithm() Algorithm { + defaultAlgorithm := e.Spec.Algorithm + + // No we don't, it's empty + if reflect.DeepEqual(defaultAlgorithm, Algorithm{}) { + defaultAlgorithm = Algorithm{Name: defaultAlgorithmName} + } + return defaultAlgorithm +} + // Validate ensures we have data that is needed, and sets defaults if needed func (e *Ensemble) Validate() error { fmt.Println() // These are the allowed sidecars bases := set.New( + "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test", "ghcr.io/converged-computing/ensemble-operator-api:rockylinux9", "ghcr.io/converged-computing/ensemble-operator-api:rockylinux8", "ghcr.io/converged-computing/ensemble-operator-api:ubuntu-focal", @@ -143,12 +169,7 @@ func (e *Ensemble) Validate() error { fmt.Printf("🤓 Ensemble.members %d\n", len(e.Spec.Members)) // Do we have a default algorithm set? - defaultAlgorithm := e.Spec.Algorithm - - // No we don't, it's empty - if reflect.DeepEqual(defaultAlgorithm, Algorithm{}) { - defaultAlgorithm = Algorithm{Name: "workload-demand"} - } + defaultAlgorithm := e.getDefaultAlgorithm() // If MaxSize is set, it must be greater than size if len(e.Spec.Members) < 1 { @@ -170,14 +191,24 @@ func (e *Ensemble) Validate() error { if len(member.Jobs) == 0 { return fmt.Errorf("ensemble member in index %d must have at least one job definition", i) } + + // Validate jobs matrix + for _, job := range member.Jobs { + if job.Count <= 0 { + job.Count = 1 + } + } + // If we have a minicluster, all three sizes must be defined if !reflect.DeepEqual(member.MiniCluster, minicluster.MiniCluster{}) { - fmt.Println(" Ensemble.member Type: minicluster") - if member.SidecarBase == "" { - member.SidecarBase = defaultSidecarbase + fmt.Println(" Ensemble.member Type: minicluster") + if member.Sidecar.Image == "" { + member.Sidecar.Image = defaultSidecarbase } - fmt.Printf(" Ensemble.member.SidecarBase: %s\n", member.SidecarBase) + fmt.Printf(" Ensemble.member.Sidecar.Image: %s\n", member.Sidecar.Image) + fmt.Printf(" Ensemble.member.Sidecar.Port: %s\n", member.Sidecar.Port) + fmt.Printf(" Ensemble.member.Sidecar.PullAlways: %v\n", member.Sidecar.PullAlways) if member.MiniCluster.Spec.MaxSize <= 0 || member.MiniCluster.Spec.Size <= 0 { return fmt.Errorf("ensemble minicluster must have a size and maxsize of at least 1") @@ -191,7 +222,7 @@ func (e *Ensemble) Validate() error { } // Base container must be in valid set - if !bases.Has(member.SidecarBase) { + if !bases.Has(member.Sidecar.Image) { return fmt.Errorf("base image must be rocky linux or ubuntu: %s", bases) } count += 1 diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 371d7a1..8f1ec3c 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -155,6 +155,7 @@ func (in *Job) DeepCopy() *Job { func (in *Member) DeepCopyInto(out *Member) { *out = *in in.MiniCluster.DeepCopyInto(&out.MiniCluster) + out.Sidecar = in.Sidecar if in.Jobs != nil { in, out := &in.Jobs, &out.Jobs *out = make([]Job, len(*in)) @@ -172,3 +173,18 @@ func (in *Member) DeepCopy() *Member { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Sidecar) DeepCopyInto(out *Sidecar) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Sidecar. +func (in *Sidecar) DeepCopy() *Sidecar { + if in == nil { + return nil + } + out := new(Sidecar) + in.DeepCopyInto(out) + return out +} diff --git a/chart/templates/ensemble-crd.yaml b/chart/templates/ensemble-crd.yaml index 6c36227..9d4af2b 100644 --- a/chart/templates/ensemble-crd.yaml +++ b/chart/templates/ensemble-crd.yaml @@ -74,6 +74,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -82,6 +83,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -956,25 +961,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the queue. - Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor the + queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/cmd/main.go b/cmd/main.go index e5c5f21..2d5e599 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -27,7 +27,7 @@ import ( "k8s.io/client-go/rest" api "github.com/converged-computing/ensemble-operator/api/v1alpha1" - "github.com/converged-computing/ensemble-operator/internal/controller" + controller "github.com/converged-computing/ensemble-operator/controllers/ensemble" minicluster "github.com/flux-framework/flux-operator/api/v1alpha2" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" diff --git a/config/crd/bases/ensemble.flux-framework.org_ensembles.yaml b/config/crd/bases/ensemble.flux-framework.org_ensembles.yaml index 6c5ce3d..b9ece3f 100644 --- a/config/crd/bases/ensemble.flux-framework.org_ensembles.yaml +++ b/config/crd/bases/ensemble.flux-framework.org_ensembles.yaml @@ -75,6 +75,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -83,6 +84,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -967,25 +972,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the - queue. Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor + the queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/internal/controller/ensemble_controller.go b/controllers/ensemble/ensemble_controller.go similarity index 99% rename from internal/controller/ensemble_controller.go rename to controllers/ensemble/ensemble_controller.go index fe21f8d..4d768e8 100644 --- a/internal/controller/ensemble_controller.go +++ b/controllers/ensemble/ensemble_controller.go @@ -116,7 +116,7 @@ func (r *EnsembleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if !reflect.DeepEqual(member.MiniCluster, minicluster.MiniClusterSpec{}) { name := fmt.Sprintf("%s-%d", ensemble.Name, i) fmt.Printf("Checking member %s\n", name) - result, err := r.updateMiniClusterEnsemble(ctx, name, &ensemble, &member) + result, err := r.updateMiniClusterEnsemble(ctx, name, &ensemble, &member, i) // An error could indicates a requeue (without the break) since something was off // We likely need to tweak this a bit to account for potential updates to specific // ensemble members, but this is fine for a first shot. diff --git a/internal/controller/ensemble_controller_test.go b/controllers/ensemble/ensemble_controller_test.go similarity index 100% rename from internal/controller/ensemble_controller_test.go rename to controllers/ensemble/ensemble_controller_test.go diff --git a/internal/controller/minicluster.go b/controllers/ensemble/minicluster.go similarity index 95% rename from internal/controller/minicluster.go rename to controllers/ensemble/minicluster.go index eaa51b0..3f71149 100644 --- a/internal/controller/minicluster.go +++ b/controllers/ensemble/minicluster.go @@ -101,13 +101,13 @@ func (r *EnsembleReconciler) newMiniCluster( spec.Spec.Interactive = true // Start command for ensemble grpc service - command := fmt.Sprintf(postCommand, member.SidecarPort, member.SidecarWorkers) + command := fmt.Sprintf(postCommand, member.Sidecar.Port, member.Sidecar.Workers) // Create a new container for the flux metrics API to run, this will communicate with our grpc sidecar := minicluster.MiniClusterContainer{ Name: "api", - Image: member.SidecarBase, - PullAlways: member.SidecarPullAlways, + Image: member.Sidecar.Image, + PullAlways: member.Sidecar.PullAlways, Commands: minicluster.Commands{ Post: command, }, diff --git a/internal/controller/suite_test.go b/controllers/ensemble/suite_test.go similarity index 100% rename from internal/controller/suite_test.go rename to controllers/ensemble/suite_test.go diff --git a/internal/controller/templates.go b/controllers/ensemble/templates.go similarity index 100% rename from internal/controller/templates.go rename to controllers/ensemble/templates.go diff --git a/internal/controller/update.go b/controllers/ensemble/update.go similarity index 63% rename from internal/controller/update.go rename to controllers/ensemble/update.go index 3dee361..512c715 100644 --- a/internal/controller/update.go +++ b/controllers/ensemble/update.go @@ -5,13 +5,14 @@ import ( "fmt" api "github.com/converged-computing/ensemble-operator/api/v1alpha1" - "github.com/converged-computing/ensemble-operator/internal/algorithm" - "github.com/converged-computing/ensemble-operator/internal/client" + "github.com/converged-computing/ensemble-operator/pkg/algorithm" + "github.com/converged-computing/ensemble-operator/pkg/client" pb "github.com/converged-computing/ensemble-operator/protos" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" + kclient "sigs.k8s.io/controller-runtime/pkg/client" ) // ensureEnsemble ensures that the ensemle is created! @@ -20,6 +21,7 @@ func (r *EnsembleReconciler) updateMiniClusterEnsemble( name string, ensemble *api.Ensemble, member *api.Member, + idx int, ) (ctrl.Result, error) { // The MiniCluster service is being provided by the index 0 pod, so we can find it here. @@ -32,7 +34,7 @@ func (r *EnsembleReconciler) updateMiniClusterEnsemble( // A selector just for the lead broker pod of the ensemble MiniCluster labelSelector := metav1.LabelSelector{MatchLabels: map[string]string{ // job-name corresponds to the ensemble name plus index in the list - "job-name": fmt.Sprintf("%s-0", ensemble.Name), + "job-name": name, // job index is the lead broker (0) within "job-index": "0", }} @@ -66,46 +68,75 @@ func (r *EnsembleReconciler) updateMiniClusterEnsemble( } // Create a client to the pod (host) - host := fmt.Sprintf("%s:%s", ipAddress, member.SidecarPort) + host := fmt.Sprintf("%s:%s", ipAddress, member.Sidecar.Port) r.Log.Info("🦀 MiniCluster Ensemble Update", "Host", host) c, err := client.NewClient(host) if err != nil { return ctrl.Result{Requeue: true}, err } - // Get the queue status! - in := pb.StatusRequest{} - response, err := c.RequestStatus(ctx, &in) - if err != nil { - r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Error with status request", err) - return ctrl.Result{Requeue: true}, err - } // Get the algorithm - if this fails we stop - a, err := algorithm.Get(member.Algorithm.Name) + algo, err := algorithm.Get(member.Algorithm.Name) if err != nil { r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Failed to retrieve algorithm", err) return ctrl.Result{Requeue: true}, err } // check that the algorithm is valid for the member type - // TODO add actual options here - err = a.Check(algorithm.AlgorithmOptions{}, member) + err = algo.Check(algorithm.AlgorithmOptions{}, member) if err != nil { - r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Algorithm %s does not support %s", a.Name(), member.Type()) + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Algorithm %s does not support %s", algo.Name(), member.Type()) return ctrl.Result{}, err } + // The status request comes first to peek at the queue + // TODO add secret here, maybe don't need Name + in := pb.StatusRequest{Member: member.Type()} + response, err := c.RequestStatus(ctx, &in) + if err != nil { + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Error with status request", err) + return ctrl.Result{Requeue: true}, err + } fmt.Println(response.Status) - fmt.Println(response.Payload) - // Make a decision - decision, err := a.MakeDecision(response.Payload, member) + // Make a decision based on the queue + decision, err := algo.MakeDecision(response.Payload, member) if err != nil { r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Decision Error", err) return ctrl.Result{}, err } - fmt.Println(decision) + // If we are requesting an action to the queue (sidecar gRPC) do it + // This second request should be OK because I think it will be infrequent. + // Most algorithms should do submission in bulk (infrequently) and then monitor + if decision.Action != "" { + in := pb.ActionRequest{ + Member: member.Type(), + Algorithm: algo.Name(), + Payload: decision.Payload, + Action: decision.Action, + } + response, err := c.RequestAction(ctx, &in) + if err != nil { + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Error with status request", err) + return ctrl.Result{Requeue: true}, err + } + fmt.Println(response.Status) + } + + // Do we need to update / patch the CRD? For example, if we adjust Jobs, we need to + // Since we requeue anyway, don't need special logic to do it here + if decision.Updated { + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client Update!") + patch := kclient.MergeFrom(ensemble.DeepCopy()) + ensemble.Spec.Members[idx] = *member + err := r.Patch(ctx, ensemble, patch) + if err != nil { + r.Log.Info("🦀 MiniCluster Ensemble GRPC Client", "Error with patch", err) + return ctrl.Result{}, nil + } + r.Update(ctx, ensemble) + } // This is the last return, this says to check every N seconds return ctrl.Result{Requeue: true}, nil } diff --git a/examples/algorithms/workload/demand/README.md b/examples/algorithms/workload/demand/README.md index 56d0b56..79012d7 100644 --- a/examples/algorithms/workload/demand/README.md +++ b/examples/algorithms/workload/demand/README.md @@ -1,7 +1,13 @@ # Workload Demand You can read about the workload demand algorithm [here](https://github.com/converged-computing/ensemble-operator/blob/main/docs/algorithms.md#workoad-demand-of-consistent-sizes). -For this example, we assume you have a cluster running (e.g., with kind) and have installed the Flux Operator and Ensemble Operator. +For this example, we assume you have a cluster running (e.g., with kind) and have installed the Flux Operator and Ensemble Operator. Here is what is going to happen: + +1. We define our jobs matrix in the [ensemble.yaml](ensemble.yaml). The jobs matrix is consistent across algorithms. +2. For this algorithm, we will first get a cluster status `StatusRequest`. We don't use it here aside from establishing communication. +3. We then detect that the job matrix has outstanding jobs, and make an `ActionRequest` to "submit" +4. The jobs are submit on the MiniCluster +5. We proceed to monitor, scaling when conditions are met, downsizing when jobs are finishing, and terminating after that. ## Usage @@ -14,12 +20,22 @@ kubectl apply -f ensemble.yaml ``` We can check both the gRPC sidecar and the operator to see if information is being received. Here is the -sidecar: +sidecar (after setup steps): ```bash kubectl logs workload-demand-0-0-vfxxd -c api -f ``` ```console +🥞️ Starting ensemble endpoint at :50051 + +Member type: minicluster +Algorithm workload-demand +Action submit +Payload {"jobs":[{"name":"lammps-2","command":"lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite","count":10,"nodes":1},{"name":"lammps-4","command":"lmp -v x 4 -v y 4 -v z 4 -in in.reaxc.hns -nocite","count":5,"nodes":1}]} + +Member type: minicluster +Algorithm workload-demand +Action submit ``` And here is the operator: @@ -27,12 +43,23 @@ And here is the operator: ```bash kubectl logs -n ensemble-operator-system ensemble-operator-controller-manager-5f874bb7d8-m68jb ``` +```console + => Ensemble.member 0 + Ensemble.member.Algorithm: workload-demand + Ensemble.member Type: minicluster + Ensemble.member.Sidecar.Image: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test + Ensemble.member.Sidecar.Port: 50051 + Ensemble.member.Sidecar.PullAlways: true +Checking member workload-demand-0 +2024-03-24T07:16:09Z INFO Pod {"controller": "ensemble", "controllerGroup": "ensemble.flux-framework.org", "controllerKind": "Ensemble", "Ensemble": {"name":"workload-demand","namespace":"default"}, "namespace": "default", "name": "workload-demand", "reconcileID": "7be3ac2c-c224-4089-b362-27a36fd5298e", "IP Address": "10.244.1.43"} +2024-03-24T07:16:09Z INFO 🦀 MiniCluster Ensemble Update {"controller": "ensemble", "controllerGroup": "ensemble.flux-framework.org", "controllerKind": "Ensemble", "Ensemble": {"name":"workload-demand","namespace":"default"}, "namespace": "default", "name": "workload-demand", "reconcileID": "7be3ac2c-c224-4089-b362-27a36fd5298e", "Host": "10.244.1.43:50051"} +2024/03/24 07:16:09 🥞️ starting client (10.244.1.43:50051)... +``` +WIP - the above is not done yet! ## TODO -- the base containers should have the grpc already built so we don't have to wait -- get a list of commands from the workload-demand decision endpoint, and send them back to the sidecar - - this means we need to have a string payload that can be received - - this also means the algorithms receiving endpoints need to be defined within the python service -- add to the decision logic to indicate a flag for when a spec (CRD) is updated so we do a patch \ No newline at end of file +- Now that we are submitting jobs on start, we need to react to the other decision, scaling the cluster up. +- We also need to then set the terminate action conditions, define the action, and successfully terminate the ensemble member (minicluster) +- after this is done, this first algorithm should be mostly done. \ No newline at end of file diff --git a/examples/algorithms/workload/demand/ensemble.yaml b/examples/algorithms/workload/demand/ensemble.yaml index 15415a6..6a10a08 100644 --- a/examples/algorithms/workload/demand/ensemble.yaml +++ b/examples/algorithms/workload/demand/ensemble.yaml @@ -4,7 +4,9 @@ metadata: name: workload-demand spec: members: - - sidecarPullAlways: true + - sidecar: + pullAlways: true + image: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test algorithm: name: workload-demand jobs: diff --git a/examples/dist/ensemble-operator-arm.yaml b/examples/dist/ensemble-operator-arm.yaml index 064cd69..a51bcd2 100644 --- a/examples/dist/ensemble-operator-arm.yaml +++ b/examples/dist/ensemble-operator-arm.yaml @@ -87,6 +87,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -95,6 +96,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -979,25 +984,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the - queue. Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor + the queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/examples/dist/ensemble-operator-dev.yaml b/examples/dist/ensemble-operator-dev.yaml index f20e07b..c28e44a 100644 --- a/examples/dist/ensemble-operator-dev.yaml +++ b/examples/dist/ensemble-operator-dev.yaml @@ -87,6 +87,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -95,6 +96,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -979,25 +984,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the - queue. Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor + the queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/examples/dist/ensemble-operator.yaml b/examples/dist/ensemble-operator.yaml index 7b7fc9c..060b52f 100644 --- a/examples/dist/ensemble-operator.yaml +++ b/examples/dist/ensemble-operator.yaml @@ -87,6 +87,7 @@ spec: description: Command given to flux type: string count: + default: 1 description: Number of jobs to run This can be set to 0 depending on the algorithm E.g., some algorithms decide on the number to submit @@ -95,6 +96,10 @@ spec: name: description: Name to identify the job group type: string + nodes: + default: 1 + format: int32 + type: integer required: - command - name @@ -979,25 +984,31 @@ spec: - size type: object type: object - sidecarBase: - default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 - description: Baseimage for the sidecar that will monitor the - queue. Ensure that the operating systems match! - type: string - sidecarPort: - default: "50051" - type: string - sidecarPullAlways: - description: Always pull the sidecar container (useful for development) - type: boolean - sidecarWorkers: - default: 10 - format: int32 - type: integer + sidecar: + description: Definition and customization of the sidecar + properties: + image: + default: ghcr.io/converged-computing/ensemble-operator-api:rockylinux9 + description: Baseimage for the sidecar that will monitor + the queue. Ensure that the operating systems match! + type: string + port: + default: "50051" + type: string + pullAlways: + description: Always pull the sidecar container (useful for + development) + type: boolean + workers: + default: 10 + format: int32 + type: integer + required: + - port + - workers + type: object required: - jobs - - sidecarPort - - sidecarWorkers type: object type: array required: diff --git a/examples/tests/lammps/ensemble.yaml b/examples/tests/lammps/ensemble.yaml index 3ac6bd7..70659de 100644 --- a/examples/tests/lammps/ensemble.yaml +++ b/examples/tests/lammps/ensemble.yaml @@ -4,10 +4,15 @@ metadata: name: ensemble-sample spec: members: - - sidecarPullAlways: true + - sidecar: + pullAlways: true + + jobs: + - name: lammps + command: lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite + minicluster: spec: - size: 2 maxSize: 4 minSize: 2 @@ -18,11 +23,4 @@ spec: - image: ghcr.io/converged-computing/metric-lammps:latest # You can set the working directory if your container WORKDIR is not correct. - workingDir: /opt/lammps/examples/reaxff/HNS - batchRaw: true - batch: true - commands: - post: sleep infinity - command: | - lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite - sleep infinity \ No newline at end of file + workingDir: /opt/lammps/examples/reaxff/HNS \ No newline at end of file diff --git a/internal/algorithm/algorithm.go b/pkg/algorithm/algorithm.go similarity index 89% rename from internal/algorithm/algorithm.go rename to pkg/algorithm/algorithm.go index b00ffd3..8fc171b 100644 --- a/internal/algorithm/algorithm.go +++ b/pkg/algorithm/algorithm.go @@ -10,7 +10,8 @@ import ( // A lookup of registered algorithms by name var ( - Algorithms = map[string]AlgorithmInterface{} + Algorithms = map[string]AlgorithmInterface{} + SubmitAction = "submit" ) // An algorithm interface determines behavior for scaling and termination. @@ -37,10 +38,16 @@ type AlgorithmDecision struct { Scale int32 `json:"scale"` // Terminate the member - Terminate bool `terminate:"scale"` + Terminate bool `json:"terminate"` // Send payload back to gRPC sidecar service Payload string `json:"payload"` + + // Action to ask the queue to take + Action string `json:"action"` + + // Update determines if the spec was updated (warranting a patch) + Updated bool `json:"updated"` } // AlgorithmOptions allow packaging named values of different types diff --git a/internal/client/client.go b/pkg/client/client.go similarity index 68% rename from internal/client/client.go rename to pkg/client/client.go index 0b6a992..6724604 100644 --- a/internal/client/client.go +++ b/pkg/client/client.go @@ -26,7 +26,8 @@ var _ Client = (*EnsembleClient)(nil) type Client interface { // Ensemble interactions - RequestStatus(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (*pb.StatusResponse, error) + RequestStatus(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (*pb.Response, error) + RequestAction(ctx context.Context, in *pb.ActionRequest, opts ...grpc.CallOption) (*pb.Response, error) } // NewClient creates a new EnsembleClient @@ -69,23 +70,40 @@ func (c *EnsembleClient) GetHost() string { return c.host } -// SubmitJob submits a job to a named cluster. -// The token specific to the cluster is required -func (c *EnsembleClient) RequestStatus(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (*pb.StatusResponse, error) { +// RequestStatus gets the queue and jobs status. +// This is primarily for scaling/termination +func (c *EnsembleClient) RequestStatus( + ctx context.Context, + in *pb.StatusRequest, + opts ...grpc.CallOption, +) (*pb.Response, error) { - response := &pb.StatusResponse{} + response := &pb.Response{} if !c.Connected() { return response, errors.New("client is not connected") } + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + + response, err := c.service.RequestStatus(ctx, in) + fmt.Println(response) + return response, err +} + +func (c *EnsembleClient) RequestAction( + ctx context.Context, + in *pb.ActionRequest, + opts ...grpc.CallOption, +) (*pb.Response, error) { - // Now contact the rainbow server with clusters... + response := &pb.Response{} + if !c.Connected() { + return response, errors.New("client is not connected") + } ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - // Validate that the cluster exists, and we have the right token. - // The response is the same either way - not found does not reveal - // additional information to the client trying to find it - response, err := c.service.RequestStatus(ctx, &pb.StatusRequest{}) + response, err := c.service.RequestAction(ctx, in) fmt.Println(response) return response, err } diff --git a/internal/types/types.go b/pkg/types/types.go similarity index 100% rename from internal/types/types.go rename to pkg/types/types.go diff --git a/protos/ensemble-service.pb.go b/protos/ensemble-service.pb.go index acc704c..838bca6 100644 --- a/protos/ensemble-service.pb.go +++ b/protos/ensemble-service.pb.go @@ -21,26 +21,26 @@ const ( ) // Registration statuses -type StatusResponse_ResultType int32 +type Response_ResultType int32 const ( - StatusResponse_UNSPECIFIED StatusResponse_ResultType = 0 - StatusResponse_SUCCESS StatusResponse_ResultType = 1 - StatusResponse_ERROR StatusResponse_ResultType = 2 - StatusResponse_DENIED StatusResponse_ResultType = 3 - StatusResponse_EXISTS StatusResponse_ResultType = 4 + Response_UNSPECIFIED Response_ResultType = 0 + Response_SUCCESS Response_ResultType = 1 + Response_ERROR Response_ResultType = 2 + Response_DENIED Response_ResultType = 3 + Response_EXISTS Response_ResultType = 4 ) -// Enum value maps for StatusResponse_ResultType. +// Enum value maps for Response_ResultType. var ( - StatusResponse_ResultType_name = map[int32]string{ + Response_ResultType_name = map[int32]string{ 0: "UNSPECIFIED", 1: "SUCCESS", 2: "ERROR", 3: "DENIED", 4: "EXISTS", } - StatusResponse_ResultType_value = map[string]int32{ + Response_ResultType_value = map[string]int32{ "UNSPECIFIED": 0, "SUCCESS": 1, "ERROR": 2, @@ -49,31 +49,31 @@ var ( } ) -func (x StatusResponse_ResultType) Enum() *StatusResponse_ResultType { - p := new(StatusResponse_ResultType) +func (x Response_ResultType) Enum() *Response_ResultType { + p := new(Response_ResultType) *p = x return p } -func (x StatusResponse_ResultType) String() string { +func (x Response_ResultType) String() string { return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) } -func (StatusResponse_ResultType) Descriptor() protoreflect.EnumDescriptor { +func (Response_ResultType) Descriptor() protoreflect.EnumDescriptor { return file_protos_ensemble_service_proto_enumTypes[0].Descriptor() } -func (StatusResponse_ResultType) Type() protoreflect.EnumType { +func (Response_ResultType) Type() protoreflect.EnumType { return &file_protos_ensemble_service_proto_enumTypes[0] } -func (x StatusResponse_ResultType) Number() protoreflect.EnumNumber { +func (x Response_ResultType) Number() protoreflect.EnumNumber { return protoreflect.EnumNumber(x) } -// Deprecated: Use StatusResponse_ResultType.Descriptor instead. -func (StatusResponse_ResultType) EnumDescriptor() ([]byte, []int) { - return file_protos_ensemble_service_proto_rawDescGZIP(), []int{1, 0} +// Deprecated: Use Response_ResultType.Descriptor instead. +func (Response_ResultType) EnumDescriptor() ([]byte, []int) { + return file_protos_ensemble_service_proto_rawDescGZIP(), []int{2, 0} } // StatusRequest asks to see the status of the queue and jobs @@ -83,8 +83,8 @@ type StatusRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` - Secret string `protobuf:"bytes,2,opt,name=secret,proto3" json:"secret,omitempty"` + // This is the ensemble member type (e.g., minicluster) + Member string `protobuf:"bytes,1,opt,name=member,proto3" json:"member,omitempty"` } func (x *StatusRequest) Reset() { @@ -119,46 +119,111 @@ func (*StatusRequest) Descriptor() ([]byte, []int) { return file_protos_ensemble_service_proto_rawDescGZIP(), []int{0} } -func (x *StatusRequest) GetName() string { +func (x *StatusRequest) GetMember() string { if x != nil { - return x.Name + return x.Member } return "" } -func (x *StatusRequest) GetSecret() string { +// ActionRequest requests an action +type ActionRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Member string `protobuf:"bytes,1,opt,name=member,proto3" json:"member,omitempty"` + Algorithm string `protobuf:"bytes,2,opt,name=algorithm,proto3" json:"algorithm,omitempty"` + Action string `protobuf:"bytes,3,opt,name=action,proto3" json:"action,omitempty"` + Payload string `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *ActionRequest) Reset() { + *x = ActionRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_ensemble_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ActionRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ActionRequest) ProtoMessage() {} + +func (x *ActionRequest) ProtoReflect() protoreflect.Message { + mi := &file_protos_ensemble_service_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ActionRequest.ProtoReflect.Descriptor instead. +func (*ActionRequest) Descriptor() ([]byte, []int) { + return file_protos_ensemble_service_proto_rawDescGZIP(), []int{1} +} + +func (x *ActionRequest) GetMember() string { + if x != nil { + return x.Member + } + return "" +} + +func (x *ActionRequest) GetAlgorithm() string { + if x != nil { + return x.Algorithm + } + return "" +} + +func (x *ActionRequest) GetAction() string { + if x != nil { + return x.Action + } + return "" +} + +func (x *ActionRequest) GetPayload() string { if x != nil { - return x.Secret + return x.Payload } return "" } -type StatusResponse struct { +type Response struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Payload string `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` - Status StatusResponse_ResultType `protobuf:"varint,4,opt,name=status,proto3,enum=convergedcomputing.org.grpc.v1.StatusResponse_ResultType" json:"status,omitempty"` + Payload string `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + Status Response_ResultType `protobuf:"varint,4,opt,name=status,proto3,enum=convergedcomputing.org.grpc.v1.Response_ResultType" json:"status,omitempty"` } -func (x *StatusResponse) Reset() { - *x = StatusResponse{} +func (x *Response) Reset() { + *x = Response{} if protoimpl.UnsafeEnabled { - mi := &file_protos_ensemble_service_proto_msgTypes[1] + mi := &file_protos_ensemble_service_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } } -func (x *StatusResponse) String() string { +func (x *Response) String() string { return protoimpl.X.MessageStringOf(x) } -func (*StatusResponse) ProtoMessage() {} +func (*Response) ProtoMessage() {} -func (x *StatusResponse) ProtoReflect() protoreflect.Message { - mi := &file_protos_ensemble_service_proto_msgTypes[1] +func (x *Response) ProtoReflect() protoreflect.Message { + mi := &file_protos_ensemble_service_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -169,23 +234,23 @@ func (x *StatusResponse) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use StatusResponse.ProtoReflect.Descriptor instead. -func (*StatusResponse) Descriptor() ([]byte, []int) { - return file_protos_ensemble_service_proto_rawDescGZIP(), []int{1} +// Deprecated: Use Response.ProtoReflect.Descriptor instead. +func (*Response) Descriptor() ([]byte, []int) { + return file_protos_ensemble_service_proto_rawDescGZIP(), []int{2} } -func (x *StatusResponse) GetPayload() string { +func (x *Response) GetPayload() string { if x != nil { return x.Payload } return "" } -func (x *StatusResponse) GetStatus() StatusResponse_ResultType { +func (x *Response) GetStatus() Response_ResultType { if x != nil { return x.Status } - return StatusResponse_UNSPECIFIED + return Response_UNSPECIFIED } var File_protos_ensemble_service_proto protoreflect.FileDescriptor @@ -195,36 +260,47 @@ var file_protos_ensemble_service_proto_rawDesc = []byte{ 0x65, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x1e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x22, - 0x3b, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x65, 0x63, 0x72, 0x65, 0x74, 0x22, 0xcc, 0x01, 0x0a, - 0x0e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x51, 0x0a, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x39, 0x2e, 0x63, 0x6f, 0x6e, 0x76, + 0x27, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x6d, 0x65, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x77, 0x0a, 0x0d, 0x41, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6d, 0x65, 0x6d, + 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x6d, 0x62, 0x65, + 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x6c, 0x67, 0x6f, 0x72, 0x69, 0x74, 0x68, 0x6d, 0x12, + 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, + 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x4b, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x33, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, + 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, + 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x06, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x4d, 0x0a, 0x0a, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, + 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, + 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, + 0x01, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, + 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x58, 0x49, 0x53, + 0x54, 0x53, 0x10, 0x04, 0x32, 0xe6, 0x01, 0x0a, 0x10, 0x45, 0x6e, 0x73, 0x65, 0x6d, 0x62, 0x6c, + 0x65, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x68, 0x0a, 0x0d, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x2d, 0x2e, 0x63, 0x6f, 0x6e, + 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, + 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, - 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x4d, 0x0a, 0x0a, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, - 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, - 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, - 0x52, 0x10, 0x02, 0x12, 0x0a, 0x0a, 0x06, 0x44, 0x45, 0x4e, 0x49, 0x45, 0x44, 0x10, 0x03, 0x12, - 0x0a, 0x0a, 0x06, 0x45, 0x58, 0x49, 0x53, 0x54, 0x53, 0x10, 0x04, 0x32, 0x82, 0x01, 0x0a, 0x10, - 0x45, 0x6e, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, - 0x12, 0x6e, 0x0a, 0x0d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x12, 0x2d, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, - 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, - 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x2e, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, 0x6f, 0x6d, 0x70, - 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, - 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x42, 0x39, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, - 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, - 0x6e, 0x67, 0x2f, 0x65, 0x6e, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x2d, 0x6f, 0x70, 0x65, 0x72, - 0x61, 0x74, 0x6f, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x68, 0x0a, 0x0d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x41, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2d, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, + 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x63, 0x6f, 0x6e, 0x76, 0x65, 0x72, 0x67, 0x65, 0x64, 0x63, + 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x67, 0x72, 0x70, + 0x63, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x39, 0x5a, + 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x6f, 0x6e, 0x76, + 0x65, 0x72, 0x67, 0x65, 0x64, 0x2d, 0x63, 0x6f, 0x6d, 0x70, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x2f, + 0x65, 0x6e, 0x73, 0x65, 0x6d, 0x62, 0x6c, 0x65, 0x2d, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, + 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -240,18 +316,21 @@ func file_protos_ensemble_service_proto_rawDescGZIP() []byte { } var file_protos_ensemble_service_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_protos_ensemble_service_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_protos_ensemble_service_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_protos_ensemble_service_proto_goTypes = []interface{}{ - (StatusResponse_ResultType)(0), // 0: convergedcomputing.org.grpc.v1.StatusResponse.ResultType - (*StatusRequest)(nil), // 1: convergedcomputing.org.grpc.v1.StatusRequest - (*StatusResponse)(nil), // 2: convergedcomputing.org.grpc.v1.StatusResponse + (Response_ResultType)(0), // 0: convergedcomputing.org.grpc.v1.Response.ResultType + (*StatusRequest)(nil), // 1: convergedcomputing.org.grpc.v1.StatusRequest + (*ActionRequest)(nil), // 2: convergedcomputing.org.grpc.v1.ActionRequest + (*Response)(nil), // 3: convergedcomputing.org.grpc.v1.Response } var file_protos_ensemble_service_proto_depIdxs = []int32{ - 0, // 0: convergedcomputing.org.grpc.v1.StatusResponse.status:type_name -> convergedcomputing.org.grpc.v1.StatusResponse.ResultType + 0, // 0: convergedcomputing.org.grpc.v1.Response.status:type_name -> convergedcomputing.org.grpc.v1.Response.ResultType 1, // 1: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestStatus:input_type -> convergedcomputing.org.grpc.v1.StatusRequest - 2, // 2: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestStatus:output_type -> convergedcomputing.org.grpc.v1.StatusResponse - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type + 2, // 2: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestAction:input_type -> convergedcomputing.org.grpc.v1.ActionRequest + 3, // 3: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestStatus:output_type -> convergedcomputing.org.grpc.v1.Response + 3, // 4: convergedcomputing.org.grpc.v1.EnsembleOperator.RequestAction:output_type -> convergedcomputing.org.grpc.v1.Response + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type 1, // [1:1] is the sub-list for extension type_name 1, // [1:1] is the sub-list for extension extendee 0, // [0:1] is the sub-list for field type_name @@ -276,7 +355,19 @@ func file_protos_ensemble_service_proto_init() { } } file_protos_ensemble_service_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*StatusResponse); i { + switch v := v.(*ActionRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_ensemble_service_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Response); i { case 0: return &v.state case 1: @@ -294,7 +385,7 @@ func file_protos_ensemble_service_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_protos_ensemble_service_proto_rawDesc, NumEnums: 1, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, diff --git a/protos/ensemble-service.proto b/protos/ensemble-service.proto index 3fdbea3..9f0a191 100644 --- a/protos/ensemble-service.proto +++ b/protos/ensemble-service.proto @@ -4,17 +4,28 @@ package convergedcomputing.org.grpc.v1; option go_package = "github.com/converged-computing/ensemble-operator/protos"; service EnsembleOperator { - rpc RequestStatus(StatusRequest) returns (StatusResponse); + rpc RequestStatus(StatusRequest) returns (Response); + rpc RequestAction(ActionRequest) returns (Response); } // StatusRequest asks to see the status of the queue and jobs // TODO add auth here... message StatusRequest { - string name = 1; - string secret = 2; + + // This is the ensemble member type (e.g., minicluster) + string member = 1; +} + +// ActionRequest requests an action +message ActionRequest { + string member = 1; + string algorithm = 2; + string action = 3; + string payload = 4; } - message StatusResponse { + + message Response { // Registration statuses enum ResultType { diff --git a/protos/ensemble-service_grpc.pb.go b/protos/ensemble-service_grpc.pb.go index 71d1e4d..7d819ac 100644 --- a/protos/ensemble-service_grpc.pb.go +++ b/protos/ensemble-service_grpc.pb.go @@ -22,7 +22,8 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type EnsembleOperatorClient interface { - RequestStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + RequestStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*Response, error) + RequestAction(ctx context.Context, in *ActionRequest, opts ...grpc.CallOption) (*Response, error) } type ensembleOperatorClient struct { @@ -33,8 +34,8 @@ func NewEnsembleOperatorClient(cc grpc.ClientConnInterface) EnsembleOperatorClie return &ensembleOperatorClient{cc} } -func (c *ensembleOperatorClient) RequestStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) { - out := new(StatusResponse) +func (c *ensembleOperatorClient) RequestStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) err := c.cc.Invoke(ctx, "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestStatus", in, out, opts...) if err != nil { return nil, err @@ -42,11 +43,21 @@ func (c *ensembleOperatorClient) RequestStatus(ctx context.Context, in *StatusRe return out, nil } +func (c *ensembleOperatorClient) RequestAction(ctx context.Context, in *ActionRequest, opts ...grpc.CallOption) (*Response, error) { + out := new(Response) + err := c.cc.Invoke(ctx, "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestAction", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // EnsembleOperatorServer is the server API for EnsembleOperator service. // All implementations must embed UnimplementedEnsembleOperatorServer // for forward compatibility type EnsembleOperatorServer interface { - RequestStatus(context.Context, *StatusRequest) (*StatusResponse, error) + RequestStatus(context.Context, *StatusRequest) (*Response, error) + RequestAction(context.Context, *ActionRequest) (*Response, error) mustEmbedUnimplementedEnsembleOperatorServer() } @@ -54,9 +65,12 @@ type EnsembleOperatorServer interface { type UnimplementedEnsembleOperatorServer struct { } -func (UnimplementedEnsembleOperatorServer) RequestStatus(context.Context, *StatusRequest) (*StatusResponse, error) { +func (UnimplementedEnsembleOperatorServer) RequestStatus(context.Context, *StatusRequest) (*Response, error) { return nil, status.Errorf(codes.Unimplemented, "method RequestStatus not implemented") } +func (UnimplementedEnsembleOperatorServer) RequestAction(context.Context, *ActionRequest) (*Response, error) { + return nil, status.Errorf(codes.Unimplemented, "method RequestAction not implemented") +} func (UnimplementedEnsembleOperatorServer) mustEmbedUnimplementedEnsembleOperatorServer() {} // UnsafeEnsembleOperatorServer may be embedded to opt out of forward compatibility for this service. @@ -88,6 +102,24 @@ func _EnsembleOperator_RequestStatus_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } +func _EnsembleOperator_RequestAction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ActionRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EnsembleOperatorServer).RequestAction(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestAction", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EnsembleOperatorServer).RequestAction(ctx, req.(*ActionRequest)) + } + return interceptor(ctx, in, info, handler) +} + // EnsembleOperator_ServiceDesc is the grpc.ServiceDesc for EnsembleOperator service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -99,6 +131,10 @@ var EnsembleOperator_ServiceDesc = grpc.ServiceDesc{ MethodName: "RequestStatus", Handler: _EnsembleOperator_RequestStatus_Handler, }, + { + MethodName: "RequestAction", + Handler: _EnsembleOperator_RequestAction_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "protos/ensemble-service.proto", diff --git a/python/ensemble_operator/algorithm/__init__.py b/python/ensemble_operator/algorithm/__init__.py new file mode 100644 index 0000000..c3cb988 --- /dev/null +++ b/python/ensemble_operator/algorithm/__init__.py @@ -0,0 +1,12 @@ +from .workload_demand import WorkloadDemand + + +def get_algorithm(name, options=None): + """ + Get a named backend. + """ + options = options or {} + name = name.lower() + if name == "workload-demand": + return WorkloadDemand(**options) + raise ValueError(f"Algorithm {name} is not known") diff --git a/python/ensemble_operator/algorithm/base.py b/python/ensemble_operator/algorithm/base.py new file mode 100644 index 0000000..77a37e3 --- /dev/null +++ b/python/ensemble_operator/algorithm/base.py @@ -0,0 +1,21 @@ +# An algorithm is a named class that can receive the payload from +# a StatusRequest, and optionally act on the queue (with the action) +# depending on the algorithm in question. + + +class AlgorithmBase: + """ + The AlgorithmBase is an abstract base to show functions defined. + """ + + def submit(self, *args, **kwargs): + """ + The submit action will submit one or more jobs. + """ + raise NotImplementedError + + def run(self, *args, **kwargs): + """ + Receive the payload from the Ensemble Operator and take action. + """ + raise NotImplementedError diff --git a/python/ensemble_operator/algorithm/workload_demand.py b/python/ensemble_operator/algorithm/workload_demand.py new file mode 100644 index 0000000..7beaddb --- /dev/null +++ b/python/ensemble_operator/algorithm/workload_demand.py @@ -0,0 +1,16 @@ +import json + +from .base import AlgorithmBase + +# The memory database backend provides an interface to interact with an in memory cluster database + + +class WorkloadDemand(AlgorithmBase): + """ + The WorkloadDemand algorithm + + This can include other ML, etc. calculations for determining + how to assist an action, haven't thought through this yet + """ + + pass diff --git a/python/ensemble_operator/members/__init__.py b/python/ensemble_operator/members/__init__.py new file mode 100644 index 0000000..679dc29 --- /dev/null +++ b/python/ensemble_operator/members/__init__.py @@ -0,0 +1,12 @@ +from .minicluster import MiniClusterMember + + +def get_member(name, options=None): + """ + Get a named member type + """ + options = options or {} + name = name.lower() + if name == "minicluster": + return MiniClusterMember(**options) + raise ValueError(f"Member type {name} is not known") diff --git a/python/ensemble_operator/members/base.py b/python/ensemble_operator/members/base.py new file mode 100644 index 0000000..06ee043 --- /dev/null +++ b/python/ensemble_operator/members/base.py @@ -0,0 +1,18 @@ +class MemberBase: + """ + The MemberBase is an abstract base to show functions defined. + """ + + def __init__(self, **options): + """ + Create a new member type (e.g., minicluster) + """ + # Set options as attributes + for key, value in options.items(): + setattr(self, key, value) + + def status(self, *args, **kwargs): + """ + Ask the member for a status + """ + raise NotImplementedError diff --git a/python/ensemble_operator/members/minicluster/__init__.py b/python/ensemble_operator/members/minicluster/__init__.py new file mode 100644 index 0000000..1bc9135 --- /dev/null +++ b/python/ensemble_operator/members/minicluster/__init__.py @@ -0,0 +1,52 @@ +import json +import shlex + +from ensemble_operator.members.base import MemberBase + + +class MiniClusterMember(MemberBase): + """ + The MiniCluster member type + + Asking a MiniCluster member for a status means asking the flux queue. + """ + + def __init__(self): + """ + Create a new minicluster handle + """ + import flux + + self.handle = flux.Flux() + + def status(self, payload): + """ + Ask the flux queue (metrics) for a status + """ + import ensemble_operator.members.minicluster.metrics as metrics + + # Prepare a payload to send back + payload = {} + + # The payload is the metrics listing + for name, func in metrics.metrics.items(): + payload[name] = func() + + return payload + + def submit(self, payload): + """ + Receive the flux handle and StatusRequest payload to act on. + """ + print(payload) + payload = json.loads(payload) + + # Allow this to fail - it will raise a value error that will propogate back to the operator + import flux.job + + for job in payload.get("jobs") or []: + print(job) + command = shlex.split(job["command"]) + jobspec = flux.job.JobspecV1.from_command(command=command, num_nodes=job["nodes"]) + jobid = flux.job.submit(self.handle, jobspec) + print(f"Submit job {command}: {jobid}") diff --git a/python/ensemble_operator/metrics.py b/python/ensemble_operator/members/minicluster/metrics.py similarity index 100% rename from python/ensemble_operator/metrics.py rename to python/ensemble_operator/members/minicluster/metrics.py diff --git a/python/ensemble_operator/protos/ensemble_service_pb2.py b/python/ensemble_operator/protos/ensemble_service_pb2.py index 15c27b3..ade7db9 100644 --- a/python/ensemble_operator/protos/ensemble_service_pb2.py +++ b/python/ensemble_operator/protos/ensemble_service_pb2.py @@ -14,7 +14,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x16\x65nsemble-service.proto\x12\x1e\x63onvergedcomputing.org.grpc.v1"-\n\rStatusRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0e\n\x06secret\x18\x02 \x01(\t"\xbb\x01\n\x0eStatusResponse\x12\x0f\n\x07payload\x18\x01 \x01(\t\x12I\n\x06status\x18\x04 \x01(\x0e\x32\x39.convergedcomputing.org.grpc.v1.StatusResponse.ResultType"M\n\nResultType\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\t\n\x05\x45RROR\x10\x02\x12\n\n\x06\x44\x45NIED\x10\x03\x12\n\n\x06\x45XISTS\x10\x04\x32\x82\x01\n\x10\x45nsembleOperator\x12n\n\rRequestStatus\x12-.convergedcomputing.org.grpc.v1.StatusRequest\x1a..convergedcomputing.org.grpc.v1.StatusResponseB9Z7github.com/converged-computing/ensemble-operator/protosb\x06proto3' + b'\n\x16\x65nsemble-service.proto\x12\x1e\x63onvergedcomputing.org.grpc.v1"\x1f\n\rStatusRequest\x12\x0e\n\x06member\x18\x01 \x01(\t"S\n\rActionRequest\x12\x0e\n\x06member\x18\x01 \x01(\t\x12\x11\n\talgorithm\x18\x02 \x01(\t\x12\x0e\n\x06\x61\x63tion\x18\x03 \x01(\t\x12\x0f\n\x07payload\x18\x04 \x01(\t"\xaf\x01\n\x08Response\x12\x0f\n\x07payload\x18\x01 \x01(\t\x12\x43\n\x06status\x18\x04 \x01(\x0e\x32\x33.convergedcomputing.org.grpc.v1.Response.ResultType"M\n\nResultType\x12\x0f\n\x0bUNSPECIFIED\x10\x00\x12\x0b\n\x07SUCCESS\x10\x01\x12\t\n\x05\x45RROR\x10\x02\x12\n\n\x06\x44\x45NIED\x10\x03\x12\n\n\x06\x45XISTS\x10\x04\x32\xe6\x01\n\x10\x45nsembleOperator\x12h\n\rRequestStatus\x12-.convergedcomputing.org.grpc.v1.StatusRequest\x1a(.convergedcomputing.org.grpc.v1.Response\x12h\n\rRequestAction\x12-.convergedcomputing.org.grpc.v1.ActionRequest\x1a(.convergedcomputing.org.grpc.v1.ResponseB9Z7github.com/converged-computing/ensemble-operator/protosb\x06proto3' ) _globals = globals() @@ -26,11 +26,13 @@ "DESCRIPTOR" ]._serialized_options = b"Z7github.com/converged-computing/ensemble-operator/protos" _globals["_STATUSREQUEST"]._serialized_start = 58 - _globals["_STATUSREQUEST"]._serialized_end = 103 - _globals["_STATUSRESPONSE"]._serialized_start = 106 - _globals["_STATUSRESPONSE"]._serialized_end = 293 - _globals["_STATUSRESPONSE_RESULTTYPE"]._serialized_start = 216 - _globals["_STATUSRESPONSE_RESULTTYPE"]._serialized_end = 293 - _globals["_ENSEMBLEOPERATOR"]._serialized_start = 296 - _globals["_ENSEMBLEOPERATOR"]._serialized_end = 426 + _globals["_STATUSREQUEST"]._serialized_end = 89 + _globals["_ACTIONREQUEST"]._serialized_start = 91 + _globals["_ACTIONREQUEST"]._serialized_end = 174 + _globals["_RESPONSE"]._serialized_start = 177 + _globals["_RESPONSE"]._serialized_end = 352 + _globals["_RESPONSE_RESULTTYPE"]._serialized_start = 275 + _globals["_RESPONSE_RESULTTYPE"]._serialized_end = 352 + _globals["_ENSEMBLEOPERATOR"]._serialized_start = 355 + _globals["_ENSEMBLEOPERATOR"]._serialized_end = 585 # @@protoc_insertion_point(module_scope) diff --git a/python/ensemble_operator/protos/ensemble_service_pb2.pyi b/python/ensemble_operator/protos/ensemble_service_pb2.pyi index ada39bb..74cef42 100644 --- a/python/ensemble_operator/protos/ensemble_service_pb2.pyi +++ b/python/ensemble_operator/protos/ensemble_service_pb2.pyi @@ -6,29 +6,39 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor class StatusRequest(_message.Message): - __slots__ = ("name", "secret") - NAME_FIELD_NUMBER: _ClassVar[int] - SECRET_FIELD_NUMBER: _ClassVar[int] - name: str - secret: str - def __init__(self, name: _Optional[str] = ..., secret: _Optional[str] = ...) -> None: ... + __slots__ = ("member",) + MEMBER_FIELD_NUMBER: _ClassVar[int] + member: str + def __init__(self, member: _Optional[str] = ...) -> None: ... -class StatusResponse(_message.Message): +class ActionRequest(_message.Message): + __slots__ = ("member", "algorithm", "action", "payload") + MEMBER_FIELD_NUMBER: _ClassVar[int] + ALGORITHM_FIELD_NUMBER: _ClassVar[int] + ACTION_FIELD_NUMBER: _ClassVar[int] + PAYLOAD_FIELD_NUMBER: _ClassVar[int] + member: str + algorithm: str + action: str + payload: str + def __init__(self, member: _Optional[str] = ..., algorithm: _Optional[str] = ..., action: _Optional[str] = ..., payload: _Optional[str] = ...) -> None: ... + +class Response(_message.Message): __slots__ = ("payload", "status") class ResultType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): __slots__ = () - UNSPECIFIED: _ClassVar[StatusResponse.ResultType] - SUCCESS: _ClassVar[StatusResponse.ResultType] - ERROR: _ClassVar[StatusResponse.ResultType] - DENIED: _ClassVar[StatusResponse.ResultType] - EXISTS: _ClassVar[StatusResponse.ResultType] - UNSPECIFIED: StatusResponse.ResultType - SUCCESS: StatusResponse.ResultType - ERROR: StatusResponse.ResultType - DENIED: StatusResponse.ResultType - EXISTS: StatusResponse.ResultType + UNSPECIFIED: _ClassVar[Response.ResultType] + SUCCESS: _ClassVar[Response.ResultType] + ERROR: _ClassVar[Response.ResultType] + DENIED: _ClassVar[Response.ResultType] + EXISTS: _ClassVar[Response.ResultType] + UNSPECIFIED: Response.ResultType + SUCCESS: Response.ResultType + ERROR: Response.ResultType + DENIED: Response.ResultType + EXISTS: Response.ResultType PAYLOAD_FIELD_NUMBER: _ClassVar[int] STATUS_FIELD_NUMBER: _ClassVar[int] payload: str - status: StatusResponse.ResultType - def __init__(self, payload: _Optional[str] = ..., status: _Optional[_Union[StatusResponse.ResultType, str]] = ...) -> None: ... + status: Response.ResultType + def __init__(self, payload: _Optional[str] = ..., status: _Optional[_Union[Response.ResultType, str]] = ...) -> None: ... diff --git a/python/ensemble_operator/protos/ensemble_service_pb2_grpc.py b/python/ensemble_operator/protos/ensemble_service_pb2_grpc.py index 781338f..b618604 100644 --- a/python/ensemble_operator/protos/ensemble_service_pb2_grpc.py +++ b/python/ensemble_operator/protos/ensemble_service_pb2_grpc.py @@ -17,7 +17,12 @@ def __init__(self, channel): self.RequestStatus = channel.unary_unary( "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestStatus", request_serializer=ensemble__service__pb2.StatusRequest.SerializeToString, - response_deserializer=ensemble__service__pb2.StatusResponse.FromString, + response_deserializer=ensemble__service__pb2.Response.FromString, + ) + self.RequestAction = channel.unary_unary( + "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestAction", + request_serializer=ensemble__service__pb2.ActionRequest.SerializeToString, + response_deserializer=ensemble__service__pb2.Response.FromString, ) @@ -30,13 +35,24 @@ def RequestStatus(self, request, context): context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") + def RequestAction(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + def add_EnsembleOperatorServicer_to_server(servicer, server): rpc_method_handlers = { "RequestStatus": grpc.unary_unary_rpc_method_handler( servicer.RequestStatus, request_deserializer=ensemble__service__pb2.StatusRequest.FromString, - response_serializer=ensemble__service__pb2.StatusResponse.SerializeToString, + response_serializer=ensemble__service__pb2.Response.SerializeToString, + ), + "RequestAction": grpc.unary_unary_rpc_method_handler( + servicer.RequestAction, + request_deserializer=ensemble__service__pb2.ActionRequest.FromString, + response_serializer=ensemble__service__pb2.Response.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -67,7 +83,36 @@ def RequestStatus( target, "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestStatus", ensemble__service__pb2.StatusRequest.SerializeToString, - ensemble__service__pb2.StatusResponse.FromString, + ensemble__service__pb2.Response.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def RequestAction( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/convergedcomputing.org.grpc.v1.EnsembleOperator/RequestAction", + ensemble__service__pb2.ActionRequest.SerializeToString, + ensemble__service__pb2.Response.FromString, options, channel_credentials, insecure, diff --git a/python/ensemble_operator/server.py b/python/ensemble_operator/server.py index 2945655..4c3a2a6 100644 --- a/python/ensemble_operator/server.py +++ b/python/ensemble_operator/server.py @@ -6,10 +6,17 @@ import grpc +import ensemble_operator.algorithm as algorithms import ensemble_operator.defaults as defaults +import ensemble_operator.members as members from ensemble_operator.protos import ensemble_service_pb2 from ensemble_operator.protos import ensemble_service_pb2_grpc as api +# IMPORTANT: this only works with global variables if we have one ensemble gRPC per pod. +# We are anticipating storing some state here with the ensemble mamber since the +# operator should not be doing that. +cache = {} + def get_parser(): parser = argparse.ArgumentParser( @@ -50,33 +57,51 @@ class EnsembleEndpoint(api.EnsembleOperatorServicer): """ def RequestStatus(self, request, context): - """Request information about queues and jobs.""" - print(request) + """ + Request information about queues and jobs. + """ print(context) + print(f"Member type: {request.member}") + + # This will raise an error if the member type (e.g., minicluster) is not known + member = members.get_member(request.member) # If the flux handle didn't work, this might error try: - import ensemble_operator.metrics as metrics + payload = member.status() except: - return ensemble_service_pb2.StatusResponse( - status=ensemble_service_pb2.StatusResponse.ResultType.ERROR + return ensemble_service_pb2.Response( + status=ensemble_service_pb2.Response.ResultType.ERROR ) - # Prepare a payload to send back - payload = {} - - # The payload is the metrics listing - for name, func in metrics.metrics.items(): - payload[name] = func() - - print(json.dumps(payload, indent=4)) - # context.set_code(grpc.StatusCode.UNIMPLEMENTED) - # context.set_details('Method not implemented!') - return ensemble_service_pb2.StatusResponse( + print(json.dumps(payload)) + return ensemble_service_pb2.Response( payload=json.dumps(payload), - status=ensemble_service_pb2.StatusResponse.ResultType.SUCCESS, + status=ensemble_service_pb2.Response.ResultType.SUCCESS, ) + def RequestAction(self, request, context): + """ + Request an action is performed according to an algorithm. + """ + print(f"Algorithm {request.algorithm}") + print(f"Action {request.action}") + print(f"Payload {request.payload}") + + # Assume successful response + status = ensemble_service_pb2.Response.ResultType.SUCCESS + + # TODO retrieve the algorithm to process the request + # We aren't using this or doing that yet, we are just submitting jobs + alg = algorithms.get_algorithm(request.algorithm) + if request.action == "submit": + try: + alg.submit(request.payload) + except Exception as e: + status = ensemble_service_pb2.Response.ResultType.ERROR + + return ensemble_service_pb2.Response(status=status) + def serve(port, workers): """ diff --git a/python/script/build-docker.sh b/python/script/build-docker.sh new file mode 100755 index 0000000..ff2e440 --- /dev/null +++ b/python/script/build-docker.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +# Helper script to build docker images + +docker build -t ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test -f docker/Dockerfile.rocky . +docker push ghcr.io/converged-computing/ensemble-operator-api:rockylinux9-test diff --git a/python/setup.cfg b/python/setup.cfg index cd9f494..d59994d 100644 --- a/python/setup.cfg +++ b/python/setup.cfg @@ -3,4 +3,4 @@ exclude = docs max-line-length = 100 ignore = E1 E2 E5 W5 per-file-ignores = - rainbow/__init__.py:F401 + ensemble_operator/__init__.py:F401