diff --git a/python/.pre-commit-config.yaml b/.pre-commit-config.yaml
similarity index 100%
rename from python/.pre-commit-config.yaml
rename to .pre-commit-config.yaml
diff --git a/README.md b/README.md
index b3a396b..69a1469 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ This operator will deploy ensembles of HPC applications, first with just Flux Fr
 
 **under development**
 
-See [docs](docs) for early documentation. We currently have the GRPC service endpoint and client (in the operator) working, and a regular check for the flux queue status, and just need to implement algorithms now that make sense. A design is shown below.
+See [docs](docs) for early documentation and [algorithms](https://github.com/converged-computing/ensemble-operator/blob/main/docs/algorithms.md#algorithms) for our planned work in that space. We currently have the GRPC service endpoint and client (in the operator) working, along with a regular check of the Flux queue status, and now just need to implement algorithms that make sense. A design is shown below.
 
 ## Design
 
diff --git a/docs/README.md b/docs/README.md
index eb2e8d7..b99cf6d 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -1,6 +1,6 @@
 # Ensemble Operator
 
-The ensemble operator is intended to run ensembles of workloads, and change them according to a user-specified algorithm.
+The ensemble operator is intended to run ensembles of workloads, and change them according to a user-specified [algorithm](algorithms.md).
 
 Since an entity in an ensemble is typically more complex than a container, we allow creation of a few set of notable Kubernetes abstractions:
 
@@ -15,141 +15,10 @@ to use them. Thus, the default abstraction that will be created is Job, and for
 
 - change in size (e.g., a single Flux Operator Minicluster increasing or decreasing in size)
 - scale (e.g., deploying more than one instance of a Job)
 
-Details TBA, still in my head!
+Many details are TBA (still in my head), but you can read the following:
 
-## Getting Started
+## Documentation
-
-### 1. Create Cluster
-
-Let's create a kind cluster first.
-
-```bash
-kind create cluster --config ./examples/kind-config.yaml
-```
-
-Ensure that the Flux Operator is installed.
-
-```bash
-kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
-```
-
-And the ensemble operator
-
-```bash
-kubectl apply -f examples/dist/ensemble-operator.yaml
-```
-
-### 2. Run LAMMPS
-
-And then try the simple example to run lammps.
-
-```bash
-kubectl apply -f examples/tests/lammps/ensemble.yaml
-```
-
-This will create the MiniCluster, per the sizes you specified for it!
-
-```bash
-$ kubectl get pods
-```
-```console
-NAME                        READY   STATUS     RESTARTS   AGE
-ensemble-sample-0-0-kc6qn   0/1     Init:0/1   0          3s
-ensemble-sample-0-1-jjm4p   0/1     Init:0/1   0          3s
-```
-
-You'll first see init containers (above) that are preparing the flux install. When the containers are running,
-you'll then see two containers:
-
-```console
-NAME                        READY   STATUS    RESTARTS   AGE
-ensemble-sample-0-0-zhg47   2/2     Running   0          44s
-ensemble-sample-0-1-6dpgm   2/2     Running   0          44s
-```
-
-### 3. 
Check GRPC Service Endpoint - -We have two things that are working together: - -- The *GRPC service endpoint* is being served by a sidecar container alongside the MiniCluster -- The *GRPC client* is created by the Ensemble operator by way of looking up the pod ip address - -TLDR: the operator can look at the status of the ensemble queue because a grpc service pod is running alongside the MiniCluster, and providing an endpoint that has direct access to the queue there! We can then implement and choose some algorithm to decide how to scale or terminate the ensemble. -Let's now check that this started correctly - "api" is the name of the container running the sidecar GRPC service: - -```bash -kubectl logs ensemble-sample-0-0-zhg47 -c api -f -``` -```console -[notice] A new release of pip is available: 23.2.1 -> 24.0 -[notice] To update, run: pip3 install --upgrade pip -đŸĨžī¸ Starting ensemble endpoint at :50051 -``` - -We can also check the GRPC endpoint from the operator - depending on when you check, you'll see the payload delivered! - -```bash -kubectl logs -n ensemble-operator-system ensemble-operator-controller-manager-5f874bb7d8-2sbcp -f -``` -```console -2024/03/23 01:43:55 đŸĨžī¸ starting client (10.244.3.23:50051)... -&{10.244.3.23:50051 0xc000077800 0xc0006ae2f0} -payload:"{\"nodes\": {\"node_cores_free\": 18, \"node_cores_up\": 20, \"node_up_count\": 2, \"node_free_count\": 2}, \"queue\": {\"RUN\": 1, \"new\": 0, \"depend\": 0, \"priority\": 0, \"sched\": 0, \"run\": 0, \"cleanup\": 0, \"inactive\": 0}}" status:SUCCESS -SUCCESS -{"nodes": {"node_cores_free": 18, "node_cores_up": 20, "node_up_count": 2, "node_free_count": 2}, "queue": {"RUN": 1, "new": 0, "depend": 0, "priority": 0, "sched": 0, "run": 0, "cleanup": 0, "inactive": 0}} -2024-03-23T01:43:55Z INFO đŸĨžī¸ Ensemble is Ready! {"controller": "ensemble", "controllerGroup": "ensemble.flux-framework.org", "controllerKind": "Ensemble", "Ensemble": {"name":"ensemble-sample","namespace":"default"}, "namespace": "default", "name": "ensemble-sample", "reconcileID": "8ca7973f-17f3-478c-a15b-7d125ca646cd"} -``` - -That output is not parsed (so not pretty yet) but it will be! An Algorithm interface (TBA) will accept that state, and then decide on an action to take. Keep reading the Developer sections below for the high level actions we might do. -And you can see the pings in the client to. They will be at the frequency you specified for your Ensemble CheckSeconds (defaults to 10) - -```bash -kubectl logs ensemble-sample-0-0-dwr2h -c api -f -``` -```console -[notice] A new release of pip is available: 23.2.1 -> 24.0 -[notice] To update, run: pip3 install --upgrade pip -đŸĨžī¸ Starting ensemble endpoint at :50051 - - -{ - "nodes": { - "node_cores_free": 10, - "node_cores_up": 10, - "node_up_count": 1, - "node_free_count": 1 - }, - "queue": { - "new": 0, - "depend": 0, - "priority": 0, - "sched": 0, - "run": 0, - "cleanup": 0, - "inactive": 0 - } -} -``` - -In practice this means we are putting more burden on our operator to keep reconciling when it might finish and stop. But also for this use case of running HPC jobs, I think it's more likely to have a smaller number of ensembles running vs. hundreds of thousands of them. Anyway, scaling an operator is another problem we don't need to worry about now. It's just something to keep in mind. - -## Developer - -Next I will: - -- develop the algorithms for the user to choose from -- make a cute logo :) - -### Algorithms and Actions needed... 
-
-Each reconcile will make a request to the queue and ask for updated information.
-It will be on the endpoint (where flux is running) to store any state. Then the algorithn
-selected by the user (run by the operator) must define conditions for:
-
-- when to stop a MiniCluster (e.g., when is it done?)
-- when to scale up
-- when to scale down
-- should there be an ability to ask for more jobs?
-- Note that the _cluster_ autoscaler has a concept of [expanders](https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler/expander) that can be tied to request nodes for specific pools. The more advanced setup of this operator will also have a cluster autoscaler.
-
-Then test it out! We will want different kinds of scaling, both inside and outside. I think I know what I'm going to do and just need to implement it.
+ - [User Guide](user-guide.md)
+ - [Algorithms](algorithms.md)
+ - [Developer](developer.md)
diff --git a/docs/algorithms.md b/docs/algorithms.md
new file mode 100644
index 0000000..19ad3ab
--- /dev/null
+++ b/docs/algorithms.md
@@ -0,0 +1,151 @@
+# Algorithm Design
+
+The Ensemble Operator relies on a user-selected algorithm to decide how to scale the queue, terminate a member, or even issue actions to a member (more advanced use cases). We will implement this design in three phases:
+
+1. Reactive: the algorithm requests state, possibly updates that state to send back, and takes an action.
+2. Controller: the algorithm requests state, updates state to send back, but also might provide an action for the queue to take.
+3. Advanced: there is a machine learning model inside the server that is trying to optimize some condition, and when a request for state is made, it calculates the desired action to send to the operator. The operator then assesses the current state it sees and decides whether to take the action (or not), sending this decision back to the sidecar.
+
+We are going to start with the simplest design, and the details are discussed here.
+
+## State
+
+While operators are not intended to hold state (it is supposed to be represented in the cluster), I realized that there isn't any reason we cannot store some cache of state in the server running in the member sidecar directly. For example:
+
+- An algorithm might have a rule that if the maximum number of jobs is running and the size of the queue has not dropped after N checks, that is an indication to scale the cluster by the size of the next smallest job. We might not have this state directly in the operator, but it can be sent over (on demand) for the check we are on, and then, given a scaling decision, a response is sent back to indicate any action that the queue (and flux) should take.
+- If we have a streaming ML server running in the sidecar, it might want to store the last state of a model to assess some change.
+- I often refer to starting up a queue of jobs that aren't running (set priority to 0) and I think we should call this a queue in a frozen state. đŸĨļī¸
+
+At a high level, we introduce the idea of state as a communication mechanism between the member (which holds state) and the operator (which does not, but must still make decisions that often require state). This means we need to define the valid actions for each algorithm, and ensure the logic is right so that the ensemble operator can make decisions while relying on the member to hold its state.
+
+## Objectives
+
+I think we can simply say that any algorithm should be designed to meet some objective. For example:
+
+- In many cases achieving the shortest time is likely what we want.
+- We might also want to minimize cost.
+- A metric of goodness or statistic derived from a model running in the sidecar gRPC service could be the target (e.g., stop when we have this accuracy, or this many successful simulations).
+
+We have immense freedom in terms of what we can optimize, which is why designing these is pretty fun.
+
+## Algorithms
+
+Note that these are off the top of my head, and can be extended (or shrunk). Each can (and likely will be) extended with options, but I need to implement the core structure first.
+
+### Structure
+
+An algorithm choice is the selection (by the user) of a set of actions or rules for each of the following:
+
+- scale up/down
+- terminate
+- change check frequency
+- request an action by the cluster (advanced)
+
+In the operator code, these actions (or rules) will map to different interfaces that can be selected programmatically (meaning in the CRD as a set of rules for a member); a rough sketch of what such an interface could look like is shown after the overview below. We will likely choose a reasonable default that looks something like "scale and terminate according to the workload needs." For the terminate rule, note that we will have a setting for an ensemble that says "Do not terminate, but exist at the smallest size so I can request more work from you quickly." This will actually be something we need to test - the con is that it is still taking up resources. The pro is that the cluster might have some state worth saving. For simplicity, we will start with the first two cases, and design simple algorithms that decide when to scale up and down.
+
+### Algorithms
+
+Each algorithm can define rules for scaling up, down, and termination, and (most) have a paired coordinated function on the server (e.g., to maintain a model or return state). Note that some scheduling algorithms are embedded, for example:
+
+- If you want priority scheduling, Flux [should support](https://flux-framework.readthedocs.io/projects/flux-rfc/en/latest/spec_30.html#implementation) this. You can assume that if the queue is first in, first out, the jobs you put in first have the highest priority.
+- I don't see cases where we'd want to cut (or pre-empt?) jobs. If we use cloud resources to run something, it should be the case that we see jobs through to completion. Thus, round robin scheduling and shortest time remaining don't make sense to me in this context.
+- Advanced use cases might include multiple queues running within the operator, but I'm not ready to think about that yet.
+- High level observation - many of these algorithms require fine-grained control in how the jobs are submitted (or not, just queued) so that needs to be easy, or minimally, examples provided for each.
+
+Algorithms are organized by the following:
+
+ - đŸŸĻī¸ **basic** is typically a more static approach that responds retroactively to a queue
+ - 🚗ī¸ **model** is a more advanced algorithm that still responds to the queue, but relies on advanced logic running inside the ensemble member
+ - 🕹ī¸ **control** allows the operator to give higher level cluster feedback to the model or queue to better inform a choice, or simply takes this state into account (e.g., think fair-share across ensemble members or a cluster)
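+
+To make the structure above concrete, here is a rough sketch (in Go, the operator's language) of what a per-member algorithm interface could look like. This is illustrative only: the `MemberState`, `Decision`, and `Algorithm` names are hypothetical and not the operator's actual code.
+
+```go
+package algorithm
+
+// MemberState is a hypothetical snapshot of what the sidecar reports back
+// to the operator for one ensemble member (node and queue counts).
+type MemberState struct {
+    NodesUp     int32
+    NodesFree   int32
+    CoresUp     int32
+    CoresFree   int32
+    QueueCounts map[string]int32 // e.g. "sched", "run", "inactive"
+    Checks      int32            // how many status checks we have seen so far
+}
+
+// Decision is what a rule hands back to the reconciler.
+type Decision struct {
+    ScaleTo   int32 // desired member size (unchanged if equal to current)
+    Terminate bool  // tear the member down
+    Requeue   int32 // seconds until the next check (change check frequency)
+}
+
+// Algorithm maps to the rules a user selects in the CRD for a member:
+// scale up/down, terminate, and check frequency.
+type Algorithm interface {
+    Name() string // e.g. "workload-demand"
+    Decide(state MemberState, currentSize, minSize, maxSize int32) Decision
+}
+```
+
+Each algorithm described below (workload-demand, random-selection, and so on) would then be one implementation of such an interface, registered under its identifier.
+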
+#### Workload Demand (of consistent sizes)
+
+> đŸŸĻī¸ This algorithm assumes first come, first served submission (the queue is populated by the batch job) and the cluster resources are adapted to support the needs of the queue (not implemented yet).
+
+This rule should be the default because it's kind of what you'd want for an autoscaling approach - you want the cluster resources to match the needs of the queue, where the needs of the queue are reflected in the jobs in it, and can expand up to some maximum size and reduce down to some minimum size (1). This is a first come, first served approach, meaning that we assume the user has submitted a ton of jobs to the queue in batch, and whichever are submitted first are going to be run first.
+
+- **scale up rule**: check the number of jobs waiting in the queue vs. the max size of the cluster. If the number waiting exceeds some threshold over N checks, increase the cluster size to allow for one more job, up to the max size.
+- **scale down rule**: check the number of jobs running, waiting in the queue, and the max size of the cluster. If the number of jobs waiting hits zero and remains at zero over N checks, decrease the size of the cluster down to the exact number needed for the jobs that are running.
+- **terminate rule**: check the number of jobs running and waiting. If this value remains 0 over N checks, the work is done and we clean up. If there is a parameter set to keep the minicluster running at minimum operation, scale down to 1 node.
+
+This algorithm will use the identifier:
+
+- workload-demand
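+
+As a concrete illustration of the workload-demand rules above, here is a small sketch of how the scale up, scale down, and terminate checks could be expressed, reusing the hypothetical `MemberState` and `Decision` types from the earlier sketch. The real implementation may differ.
+
+```go
+package algorithm
+
+// WorkloadDemand is a hypothetical implementation of the workload-demand rules
+// above: grow while jobs are waiting, shrink when the queue drains, and
+// terminate (or idle at the minimum size) when nothing is left to do.
+type WorkloadDemand struct {
+    Threshold     int32 // how many waiting jobs count as "pressure"
+    RequiredSeen  int32 // N: how many consecutive checks before acting
+    KeepAtMinimum bool  // keep a minimal cluster instead of terminating
+
+    pressureSeen int32 // consecutive checks with waiting jobs over threshold
+    drainedSeen  int32 // consecutive checks with no waiting jobs
+    idleSeen     int32 // consecutive checks with nothing waiting or running
+}
+
+func (w *WorkloadDemand) Name() string { return "workload-demand" }
+
+func (w *WorkloadDemand) Decide(state MemberState, currentSize, minSize, maxSize int32) Decision {
+    waiting := state.QueueCounts["sched"] + state.QueueCounts["depend"]
+    running := state.QueueCounts["run"]
+
+    // Track how long the queue has had pressure, been drained, or been idle.
+    w.pressureSeen = bump(w.pressureSeen, waiting > w.Threshold)
+    w.drainedSeen = bump(w.drainedSeen, waiting == 0)
+    w.idleSeen = bump(w.idleSeen, waiting == 0 && running == 0)
+
+    decision := Decision{ScaleTo: currentSize}
+    switch {
+    // Terminate rule: nothing running or waiting over N checks.
+    case w.idleSeen >= w.RequiredSeen:
+        if w.KeepAtMinimum {
+            decision.ScaleTo = minSize
+        } else {
+            decision.Terminate = true
+        }
+    // Scale up rule: sustained pressure, make room for one more job.
+    case w.pressureSeen >= w.RequiredSeen && currentSize < maxSize:
+        decision.ScaleTo = currentSize + 1
+    // Scale down rule: the queue stayed drained, shrink to what is running.
+    case w.drainedSeen >= w.RequiredSeen && running < currentSize:
+        if running > minSize {
+            decision.ScaleTo = running
+        } else {
+            decision.ScaleTo = minSize
+        }
+    }
+    return decision
+}
+
+// bump increments a consecutive-check counter or resets it.
+func bump(count int32, condition bool) int32 {
+    if condition {
+        return count + 1
+    }
+    return 0
+}
+```
+
+The random-selection algorithm below would reuse the same cluster-side rules; only the job selection logic running inside the ensemble member changes.
+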
+#### Random selection
+
+> đŸŸĻī¸ This algorithm chooses jobs to run at random, and the queue retroactively responds.
+
+This rule is intended to represent the outcome when jobs are selected to run at random, and the cluster adapts in size to meet that need.
+This means that the user submits jobs that are not intended to run (they are set with priority 0; I think that should work) and an algorithm running inside the ensemble member selects a random N to run each time around.
+
+- **scale up rule**: check the number of jobs waiting in the queue vs. the max size of the cluster. If the number waiting exceeds some threshold over N checks, increase the cluster size to allow for one more job, up to the max size.
+- **scale down rule**: check the number of jobs running, waiting in the queue, and the max size of the cluster. If the number of jobs waiting hits zero and remains at zero over N checks, decrease the size of the cluster down to the exact number needed for the jobs that are running.
+- **terminate rule**: check the number of jobs running and waiting. If this value remains 0 over N checks, the work is done and we clean up. If there is a parameter set to keep the minicluster running at minimum operation, scale down to 1 node.
+
+This algorithm will use the identifier:
+
+- random-selection
+
+#### Workload Success
+
+> 🕹ī¸ Continue running and submitting jobs until a target number of successful or valid jobs is reached.
+
+This algorithm assumes that we are running jobs (e.g., simulations) where many will fail, and we want to continue until we have reached a threshold of success. This likely assumes jobs of the same type and size, but that doesn't necessarily have to be the case. Since we don't know in advance how many jobs we need, we start the queue in a frozen state, and allow jobs to be selected as we go.
+
+- **scale up rule**: check the number of jobs running, and add resources (and request more submissions) up to some maximum cluster size.
+- **scale down rule**: only scale down when we have reached the number needed to be successful or valid. This is essentially allowing the running jobs to complete.
+- **terminate rule**: terminate when we have the required number of valid or successful jobs and the currently running jobs are complete.
+
+This algorithm will use the identifier:
+
+- workload-success
+
+#### Select Fastest Runtime First
+
+> 🚗ī¸ A model based algorithm that selects work based on building a model of runtimes (not implemented yet).
+
+This algorithm could be used for either workloads of different types (and different sizes) or a single workload that varies across sizes. The user submitting the job creates categories at the level that makes sense to build models for (workload type or size, or maybe even both).
+The MiniCluster starts, but does not submit jobs. Instead, it adds them to the queue in some pending/waiting state until there is a signal to start.
+
+1. The jobs should be labeled in some manner, so it's clear they belong to a category (e.g., LAMMPS or AMG). A separate model will be built for each.
+2. At initial start, there is no information about times, so one of each category is randomly selected.
+3. The status keeps track of the number completed for each type (e.g., LAMMPS 10, AMG 12).
+4. At each update, the active queue is checked for which workloads are running.
+   - The internal models are first updated. Jobs that are newly completed (and not known to the model) are used for training.
+   - If we have reached a minimum number of valid workloads to predict a time for each, we use the model to select the next fastest-to-complete workload.
+   - If we have not reached that value, we continue submitting "fairly" - distributed equally between each.
+
+- **scale up rule**: when a selected workload (based on runtime) does not have enough resources, we scale up to the size that is needed to accommodate it.
+- **scale down rule**: when a selected workload (based on runtime) has too many resources, we scale down.
+- **terminate rule**: when the queue is empty after N checks.
+
+The above might be modified by adding another variable for the number of workloads allowed to run at once. E.g., we always take the max allowed to run minus the current number in the queue to select some new number of jobs, then choose the ones with the fastest run times, and scale up or down accordingly.
+
+This algorithm will use the identifier:
+
+- select-fastest-runtime
+
+This approach could arguably be improved by taking in a pre-existing model at load time, so we start ready to go and don't need to wait to build the model. I am planning on using River for a streaming ML approach.
+
+#### Select Longest Runtime First
+
+> 🚗ī¸ A model based algorithm that selects work based on the longest runtimes first.
+
+This algorithm is the same as the above, except we select for the longest runtimes first. In practice I'm not sure which use cases this might address, but the implication is that the longest runtimes are somehow more important.
+
+This algorithm will use the identifier:
+
+- select-longest-runtime
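+
+The selection step that these two runtime-based algorithms share can be sketched as follows. This is a deliberate simplification - a running mean per category instead of the streaming ML (River) model mentioned above - and all names are hypothetical.
+
+```go
+package algorithm
+
+// runtimeModel is a deliberately simplified stand-in for a per-category
+// runtime model: it keeps a running mean of completed job runtimes.
+type runtimeModel struct {
+    count int
+    mean  float64
+}
+
+// Observe folds a newly completed job's runtime (in seconds) into the model.
+func (m *runtimeModel) Observe(seconds float64) {
+    m.count++
+    m.mean += (seconds - m.mean) / float64(m.count)
+}
+
+// selectCategory picks the next job category to submit. Until every category
+// has at least minSamples completed jobs it submits "fairly" (fewest samples
+// first); after that it picks the predicted fastest category, or the slowest
+// one when longest is true (select-longest-runtime).
+func selectCategory(models map[string]*runtimeModel, minSamples int, longest bool) string {
+    var fewest, best string
+    ready := true
+    for name, m := range models {
+        if m.count < minSamples {
+            ready = false
+            if fewest == "" || m.count < models[fewest].count {
+                fewest = name
+            }
+        }
+        if best == "" ||
+            (!longest && m.mean < models[best].mean) ||
+            (longest && m.mean > models[best].mean) {
+            best = name
+        }
+    }
+    if !ready {
+        return fewest // still building models: submit fairly
+    }
+    return best
+}
+```
+
+In the real design this logic is expected to live inside the ensemble member (the sidecar), with the operator only receiving the resulting scaling decision.
+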
+#### Cost Based Selection
+
+> 🕹ī¸ Select an ensemble member to schedule to based on cost.
+
+We know that folks aren't great at [connecting resource usage to costs](https://www.infoq.com/news/2024/03/cncf-finops-kubernetes-overspend/).
+This setup would require multiple ensemble members, each associated with a node pool (that has a different instance type). For this kind of scaling, we have a goal to run a set of jobs across ensemble members, and the Ensemble Operator will want to trigger a job run (and scale resources up or down) based on cost estimates. For example, if a workload can run more quickly on a GPU resource (despite being more expensive) we would add a node to it, and likely scale down other ensemble members not being used. This approach is advanced and would require a model to be maintained across ensemble members. Likely we would have another CRD in the namespace for an `EnsembleModel` that keeps this metadata, and it would be updated at each reconcile when a decision is warranted. This would also require starting up with a set of instance types to select from and their costs - an added complexity that (while not impossible to provide) would make it harder to use the operator overall.
diff --git a/docs/developer.md b/docs/developer.md
new file mode 100644
index 0000000..1beecff
--- /dev/null
+++ b/docs/developer.md
@@ -0,0 +1,20 @@
+# Developer
+
+Next I will:
+
+- develop the algorithms for the user to choose from
+- make a cute logo :)
+
+## Algorithms and Actions needed...
+
+Each reconcile will make a request to the queue and ask for updated information.
+It will be on the endpoint (where flux is running) to store any state. Then the algorithm
+selected by the user (run by the operator) must define conditions for:
+
+- when to stop a MiniCluster (e.g., when is it done?)
+- when to scale up
+- when to scale down
+- should there be an ability to ask for more jobs?
+- Note that the _cluster_ autoscaler has a concept of [expanders](https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler/expander) that can be tied to request nodes for specific pools. The more advanced setup of this operator will also have a cluster autoscaler.
+
+Then test it out! We will want different kinds of scaling, both inside and outside. I think I know what I'm going to do and just need to implement it.
diff --git a/docs/user-guide.md b/docs/user-guide.md
new file mode 100644
index 0000000..0935756
--- /dev/null
+++ b/docs/user-guide.md
@@ -0,0 +1,117 @@
+# User Guide
+
+## Getting Started
+
+### 1. Create Cluster
+
+Let's create a kind cluster first.
+
+```bash
+kind create cluster --config ./examples/kind-config.yaml
+```
+
+Ensure that the Flux Operator is installed.
+
+```bash
+kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
+```
+
+And the ensemble operator:
+
+```bash
+kubectl apply -f examples/dist/ensemble-operator.yaml
+```
+
+### 2. Run LAMMPS
+
+Then try the simple example to run LAMMPS.
+
+```bash
+kubectl apply -f examples/tests/lammps/ensemble.yaml
+```
+
+This will create the MiniCluster, per the sizes you specified for it!
+
+```bash
+$ kubectl get pods
+```
+```console
+NAME                        READY   STATUS     RESTARTS   AGE
+ensemble-sample-0-0-kc6qn   0/1     Init:0/1   0          3s
+ensemble-sample-0-1-jjm4p   0/1     Init:0/1   0          3s
+```
+
+You'll first see init containers (above) that are preparing the flux install. When the containers are running,
+you'll then see two containers per pod:
+
+```console
+NAME                        READY   STATUS    RESTARTS   AGE
+ensemble-sample-0-0-zhg47   2/2     Running   0          44s
+ensemble-sample-0-1-6dpgm   2/2     Running   0          44s
+```
+
+### 3. Check GRPC Service Endpoint
+
+We have two things that are working together:
+
+- The *GRPC service endpoint* is served by a sidecar container alongside the MiniCluster
+- The *GRPC client* is created by the Ensemble operator by way of looking up the pod IP address
+
+TLDR: the operator can look at the status of the ensemble queue because a GRPC service sidecar is running alongside the MiniCluster, providing an endpoint that has direct access to the queue there! We can then implement and choose some algorithm to decide how to scale or terminate the ensemble.
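+
+Since the operator builds its GRPC client by looking up the pod IP, the connection side looks roughly like the sketch below (Go). This is illustrative only: the address is the example from the logs further down, and the generated protobuf client and its RPC names are omitted because they are not documented here.
+
+```go
+package main
+
+import (
+    "context"
+    "fmt"
+    "time"
+
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/credentials/insecure"
+)
+
+func main() {
+    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+    defer cancel()
+
+    // Dial the sidecar GRPC endpoint by pod IP and port (see the client logs below).
+    conn, err := grpc.DialContext(ctx, "10.244.3.23:50051",
+        grpc.WithTransportCredentials(insecure.NewCredentials()))
+    if err != nil {
+        panic(err)
+    }
+    defer conn.Close()
+    fmt.Println("connected to ensemble member endpoint")
+
+    // A generated protobuf client would now request the member status and hand
+    // the returned payload to the algorithm selected for this ensemble.
+}
+```
+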
+Let's now check that this started correctly - "api" is the name of the container running the sidecar GRPC service:
+
+```bash
+kubectl logs ensemble-sample-0-0-zhg47 -c api -f
+```
+```console
+[notice] A new release of pip is available: 23.2.1 -> 24.0
+[notice] To update, run: pip3 install --upgrade pip
+đŸĨžī¸ Starting ensemble endpoint at :50051
+```
+
+We can also check the GRPC endpoint from the operator - depending on when you check, you'll see the payload delivered!
+
+```bash
+kubectl logs -n ensemble-operator-system ensemble-operator-controller-manager-5f874bb7d8-2sbcp -f
+```
+```console
+2024/03/23 01:43:55 đŸĨžī¸ starting client (10.244.3.23:50051)...
+&{10.244.3.23:50051 0xc000077800 0xc0006ae2f0}
+payload:"{\"nodes\": {\"node_cores_free\": 18, \"node_cores_up\": 20, \"node_up_count\": 2, \"node_free_count\": 2}, \"queue\": {\"RUN\": 1, \"new\": 0, \"depend\": 0, \"priority\": 0, \"sched\": 0, \"run\": 0, \"cleanup\": 0, \"inactive\": 0}}" status:SUCCESS
+SUCCESS
+{"nodes": {"node_cores_free": 18, "node_cores_up": 20, "node_up_count": 2, "node_free_count": 2}, "queue": {"RUN": 1, "new": 0, "depend": 0, "priority": 0, "sched": 0, "run": 0, "cleanup": 0, "inactive": 0}}
+2024-03-23T01:43:55Z INFO đŸĨžī¸ Ensemble is Ready! {"controller": "ensemble", "controllerGroup": "ensemble.flux-framework.org", "controllerKind": "Ensemble", "Ensemble": {"name":"ensemble-sample","namespace":"default"}, "namespace": "default", "name": "ensemble-sample", "reconcileID": "8ca7973f-17f3-478c-a15b-7d125ca646cd"}
+```
+
+That output is not parsed (so not pretty yet) but it will be! An Algorithm interface (TBA) will accept that state, and then decide on an action to take. Keep reading the [Developer](developer.md) docs for the high level actions we might take.
+You can see the pings from the client in the sidecar logs too. They will arrive at the frequency you specified for your Ensemble CheckSeconds (which defaults to 10):
+
+```bash
+kubectl logs ensemble-sample-0-0-dwr2h -c api -f
+```
+```console
+[notice] A new release of pip is available: 23.2.1 -> 24.0
+[notice] To update, run: pip3 install --upgrade pip
+đŸĨžī¸ Starting ensemble endpoint at :50051
+
+
+{
+  "nodes": {
+    "node_cores_free": 10,
+    "node_cores_up": 10,
+    "node_up_count": 1,
+    "node_free_count": 1
+  },
+  "queue": {
+    "new": 0,
+    "depend": 0,
+    "priority": 0,
+    "sched": 0,
+    "run": 0,
+    "cleanup": 0,
+    "inactive": 0
+  }
+}
+```
+
+In practice this means we are putting more burden on our operator to keep reconciling when it might otherwise finish and stop. But for this use case of running HPC jobs, I think it's more likely to have a smaller number of ensembles running vs. hundreds of thousands of them. Anyway, scaling an operator is another problem we don't need to worry about now. It's just something to keep in mind.
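+
+For reference, the payload shown above is plain JSON, so the operator (or any other client) can parse it into a small structure. A minimal Go sketch, using field names that mirror the payload (the struct and example values are just for illustration):
+
+```go
+package main
+
+import (
+    "encoding/json"
+    "fmt"
+)
+
+// memberStatus mirrors the JSON payload returned by the sidecar endpoint.
+type memberStatus struct {
+    Nodes struct {
+        NodeCoresFree int `json:"node_cores_free"`
+        NodeCoresUp   int `json:"node_cores_up"`
+        NodeUpCount   int `json:"node_up_count"`
+        NodeFreeCount int `json:"node_free_count"`
+    } `json:"nodes"`
+    // Queue maps a Flux job state (new, depend, priority, sched, run,
+    // cleanup, inactive) to the number of jobs currently in that state.
+    Queue map[string]int `json:"queue"`
+}
+
+func main() {
+    // Simplified example payload in the same shape as the logs above.
+    payload := `{"nodes": {"node_cores_free": 18, "node_cores_up": 20, "node_up_count": 2, "node_free_count": 2},
+                 "queue": {"new": 0, "depend": 0, "priority": 0, "sched": 0, "run": 1, "cleanup": 0, "inactive": 0}}`
+
+    var status memberStatus
+    if err := json.Unmarshal([]byte(payload), &status); err != nil {
+        panic(err)
+    }
+    fmt.Printf("free cores: %d, running jobs: %d\n",
+        status.Nodes.NodeCoresFree, status.Queue["run"])
+}
+```
+
+An algorithm implementation would consume exactly this kind of parsed state to decide whether to scale or terminate.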