diff --git a/README.md b/README.md index 515d313..300eb1d 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ The way it works: 1. We have a mutating admission webhook that looks for jobs and pods, and ensures there are fluence labels (likely we will add more abstractions). 2. A PodGroup reconciler is watching for these same objects. When they are created: a. We find the labels and create the pod group object. - b. The pod group object has a timestamp for creation in milliseconds. + b. The pod group object has a timestamp for creation in microseconds. 3. When the pod is then given to fluence for scheduling, it already has the PodGroup created with name/size and can properly sort. Here is an example of a Job intended for Fluence: @@ -452,7 +452,7 @@ If you are looking to develop: - [src](src): includes source code for fluence. You'll find logs for this code in the `sidecar` container of the fluence pod. - [sig-scheduler-plugins](sig-scheduler-plugins): includes assets (manifests and Go files) that are intended to be added to the kubernetes-sigs/scheduler-plugins upstream repository before build. You'll find logs for this container in the `scheduler-plugins-scheduler` container of the pod. - - [apis](sig-scheduler-plugins/apis): customized PodGroup to define the status scheduled time in micro seconds + - [apis](sig-scheduler-plugins/apis): customized PodGroup to define the status scheduled time in microseconds - [manifests](sig-scheduler-plugins/manifests): manifests for helm and Kubernetes - [pkg](sig-scheduler-plugins/pkg): the main fluence module to add to upstream - [cmd](sig-scheduler-plugins/cmd): the main.go to replace in upstream diff --git a/sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go b/sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go index c2582f9..7266d85 100644 --- a/sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go +++ b/sig-scheduler-plugins/apis/scheduling/v1alpha1/podgroup_webhook.go @@ -1,5 +1,5 @@ /* -Copyright 2023 Lawrence Livermore National Security, LLC +Copyright 2024 Lawrence Livermore National Security, LLC (c.f. AUTHORS, NOTICE.LLNS, COPYING) SPDX-License-Identifier: MIT @@ -50,14 +50,14 @@ type fluenceWatcher struct { // Handle is the main handler for the webhook, which is looking for jobs and pods (in that order) // If a job comes in (with a pod template) first, we add the labels there first (and they will // not be added again). 
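For context on the milliseconds-to-microseconds wording fix in the README hunk above: the PodGroup status timestamp introduced by this change is a metav1.MicroTime, which serializes with microsecond precision, while the standard metav1.Time (used by the object CreationTimestamp) truncates to whole seconds and cannot break ties between groups created within the same second. A minimal illustrative sketch, not part of this change:

package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	now := time.Now()
	second := metav1.NewTime(now)     // metav1.Time: RFC3339, second granularity
	micro := metav1.NewMicroTime(now) // metav1.MicroTime: RFC3339 with microseconds
	a, _ := second.MarshalJSON()
	b, _ := micro.MarshalJSON()
	fmt.Println(string(a)) // e.g. "2024-05-01T12:00:05Z"
	fmt.Println(string(b)) // e.g. "2024-05-01T12:00:05.123456Z"
}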
-func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admission.Response { +func (hook *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admission.Response { logger.Info("Running webhook handle, determining pod wrapper abstraction...") job := &batchv1.Job{} - err := a.decoder.Decode(req, job) + err := hook.decoder.Decode(req, job) if err == nil { - err = a.EnsureGroupOnJob(job) + err = hook.EnsureGroupOnJob(job) if err != nil { logger.Error(err, "Issue adding PodGroup to Job") return admission.Errored(http.StatusBadRequest, err) @@ -72,9 +72,9 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi } pod := &corev1.Pod{} - err = a.decoder.Decode(req, pod) + err = hook.decoder.Decode(req, pod) if err == nil { - err = a.EnsureGroup(pod) + err = hook.EnsureGroup(pod) if err != nil { logger.Error(err, "Issue adding PodGroup to Pod") return admission.Errored(http.StatusBadRequest, err) @@ -89,9 +89,9 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi } set := &appsv1.StatefulSet{} - err = a.decoder.Decode(req, set) + err = hook.decoder.Decode(req, set) if err == nil { - err = a.EnsureGroupStatefulSet(set) + err = hook.EnsureGroupStatefulSet(set) if err != nil { logger.Error(err, "Issue adding PodGroup to StatefulSet") return admission.Errored(http.StatusBadRequest, err) @@ -105,15 +105,15 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi return admission.PatchResponseFromRaw(req.Object.Raw, marshalledSet) } - d := &appsv1.Deployment{} - err = a.decoder.Decode(req, d) + deployment := &appsv1.Deployment{} + err = hook.decoder.Decode(req, deployment) if err == nil { - err = a.EnsureGroupDeployment(d) + err = hook.EnsureGroupDeployment(deployment) if err != nil { logger.Error(err, "Issue adding PodGroup to Deployment") return admission.Errored(http.StatusBadRequest, err) } - marshalledD, err := json.Marshal(d) + marshalledD, err := json.Marshal(deployment) if err != nil { logger.Error(err, "Marshalling Deployment error") return admission.Errored(http.StatusInternalServerError, err) @@ -123,9 +123,9 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi } rset := &appsv1.ReplicaSet{} - err = a.decoder.Decode(req, rset) + err = hook.decoder.Decode(req, rset) if err == nil { - err = a.EnsureGroupReplicaSet(rset) + err = hook.EnsureGroupReplicaSet(rset) if err != nil { logger.Error(err, "Issue adding PodGroup to ReplicaSet") return admission.Errored(http.StatusBadRequest, err) @@ -145,29 +145,28 @@ func (a *fluenceWatcher) Handle(ctx context.Context, req admission.Request) admi } // Default is the expected entrypoint for a webhook... -// I don't remember if this is even called... 
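The Handle function above repeats one decode-then-patch pattern per supported type (Job, Pod, StatefulSet, Deployment, ReplicaSet). A condensed, hypothetical sketch of that pattern, assuming the controller-runtime admission package; the helper name handlePod and the addGroupLabels callback are illustrative and not part of this change:

package sketch

import (
	"encoding/json"
	"net/http"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

// decoder matches the Decode method used via hook.decoder above.
type decoder interface {
	Decode(req admission.Request, into runtime.Object) error
}

func handlePod(d decoder, req admission.Request, addGroupLabels func(*corev1.Pod) error) admission.Response {
	pod := &corev1.Pod{}
	if err := d.Decode(req, pod); err != nil {
		// In the real Handle, a decode error means "try the next type" rather than failing outright.
		return admission.Errored(http.StatusBadRequest, err)
	}
	if err := addGroupLabels(pod); err != nil {
		return admission.Errored(http.StatusBadRequest, err)
	}
	marshalled, err := json.Marshal(pod)
	if err != nil {
		return admission.Errored(http.StatusInternalServerError, err)
	}
	// The response is a JSON patch computed against the raw object from the request.
	return admission.PatchResponseFromRaw(req.Object.Raw, marshalled)
}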
-func (a *fluenceWatcher) Default(ctx context.Context, obj runtime.Object) error { +func (hook *fluenceWatcher) Default(ctx context.Context, obj runtime.Object) error { switch obj.(type) { case *batchv1.Job: job := obj.(*batchv1.Job) - return a.EnsureGroupOnJob(job) + return hook.EnsureGroupOnJob(job) case *corev1.Pod: pod := obj.(*corev1.Pod) - return a.EnsureGroup(pod) + return hook.EnsureGroup(pod) case *appsv1.StatefulSet: set := obj.(*appsv1.StatefulSet) - return a.EnsureGroupStatefulSet(set) + return hook.EnsureGroupStatefulSet(set) case *appsv1.Deployment: - d := obj.(*appsv1.Deployment) - return a.EnsureGroupDeployment(d) + deployment := obj.(*appsv1.Deployment) + return hook.EnsureGroupDeployment(deployment) case *appsv1.ReplicaSet: set := obj.(*appsv1.ReplicaSet) - return a.EnsureGroupReplicaSet(set) + return hook.EnsureGroupReplicaSet(set) default: // no match @@ -180,7 +179,7 @@ func (a *fluenceWatcher) Default(ctx context.Context, obj runtime.Object) error // Note that we need to do similar for Job. // A pod without a job wrapper, and without metadata is a group // of size 1. -func (a *fluenceWatcher) EnsureGroup(pod *corev1.Pod) error { +func (hook *fluenceWatcher) EnsureGroup(pod *corev1.Pod) error { // Add labels if we don't have anything. Everything is a group! if pod.Labels == nil { @@ -221,7 +220,7 @@ func getJobLabel(job *batchv1.Job, labelName, defaultLabel string) string { // EnsureGroupOnJob looks for fluence labels (size and name) on both the job // and the pod template. We ultimately put on the pod, the lowest level unit. -// Since we have the size of the job (paramllism) we can use that for the size +// Since we have the size of the job (parallelism) we can use that for the size func (a *fluenceWatcher) EnsureGroupOnJob(job *batchv1.Job) error { // Be forgiving - allow the person to specify it on the job directly or on the Podtemplate @@ -252,7 +251,7 @@ func (a *fluenceWatcher) EnsureGroupOnJob(job *batchv1.Job) error { } // EnsureGroupStatefulSet creates a PodGroup for a StatefulSet -func (a *fluenceWatcher) EnsureGroupStatefulSet(set *appsv1.StatefulSet) error { +func (hook *fluenceWatcher) EnsureGroupStatefulSet(set *appsv1.StatefulSet) error { // StatefulSet requires on top level explicitly if set.Labels == nil { diff --git a/sig-scheduler-plugins/apis/scheduling/v1alpha1/types.go b/sig-scheduler-plugins/apis/scheduling/v1alpha1/types.go index 77f10f3..fca7854 100644 --- a/sig-scheduler-plugins/apis/scheduling/v1alpha1/types.go +++ b/sig-scheduler-plugins/apis/scheduling/v1alpha1/types.go @@ -136,12 +136,12 @@ type PodGroup struct { type PodGroupSpec struct { // MinMember defines the minimal number of members/tasks to run the pod group; // if there's not enough resources to start all tasks, the scheduler - // will not start anyone. + // will not start any. MinMember int32 `json:"minMember,omitempty"` // MinResources defines the minimal resource of members/tasks to run the pod group; // if there's not enough resources to start all tasks, the scheduler - // will not start anyone. + // will not start any. MinResources v1.ResourceList `json:"minResources,omitempty"` // ScheduleTimeoutSeconds defines the maximal time of members/tasks to wait before run the pod group; @@ -169,7 +169,13 @@ type PodGroupStatus struct { // +optional Failed int32 `json:"failed,omitempty"` - // ScheduleStartTime of the group (note that we changed this to a micro time) + // CreationTime is intended to mock the object CreationTime, + // but set by us to be MicroTime instead of Time. 
+ // +optional + CreationTime metav1.MicroTime `json:"creationTime,omitempty"` + + // ScheduleStartTime of the group is when we want to start counting + // "at time N plus 48 hours, this is when we deem time waited is too long" // +optional ScheduleStartTime metav1.MicroTime `json:"scheduleStartTime,omitempty"` } diff --git a/sig-scheduler-plugins/cmd/controller/app/server.go b/sig-scheduler-plugins/cmd/controller/app/server.go index aae8625..c10968e 100644 --- a/sig-scheduler-plugins/cmd/controller/app/server.go +++ b/sig-scheduler-plugins/cmd/controller/app/server.go @@ -65,10 +65,6 @@ func Run(s *ServerRunOptions) error { return err } - // Create a channel for the mutating webhook to communicate back to the reconciler - // This way we create the PodGroup before scheduling - //c := make(chan event.GenericEvent) - if err = (&controllers.PodGroupReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), diff --git a/sig-scheduler-plugins/manifests/install/charts/as-a-second-scheduler/templates/webhook-service.yaml b/sig-scheduler-plugins/manifests/install/charts/as-a-second-scheduler/templates/webhook-service.yaml index bedfb95..e5339a1 100644 --- a/sig-scheduler-plugins/manifests/install/charts/as-a-second-scheduler/templates/webhook-service.yaml +++ b/sig-scheduler-plugins/manifests/install/charts/as-a-second-scheduler/templates/webhook-service.yaml @@ -12,4 +12,4 @@ spec: selector: app: scheduler-plugins-controller ports: - {{- .Values.webhookService.ports | toYaml | nindent 2 -}} + {{- .Values.webhookService.ports | toYaml | nindent 2 -}} diff --git a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go index a2fd4a6..7afb815 100644 --- a/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go +++ b/sig-scheduler-plugins/pkg/controllers/podgroup_controller.go @@ -83,6 +83,7 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c log.Info("REFERENCES", "Reconciler", pg.ObjectMeta.OwnerReferences) // Grab all statuses (and groups of them) we are interested in + // Note that 48 hours seems arbitrary, and if it is, we might make it a variable schedulingOrPending := (pg.Status.Phase == schedv1alpha1.PodGroupScheduling || pg.Status.Phase == schedv1alpha1.PodGroupPending) twoDaysOld := pg.Status.ScheduleStartTime.Sub(pg.CreationTimestamp.Time) > 48*time.Hour finishedOrFailed := pg.Status.Phase == schedv1alpha1.PodGroupFinished || pg.Status.Phase == schedv1alpha1.PodGroupFailed @@ -111,8 +112,11 @@ func (r *PodGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, err } - // If the scheduler time created is Zero (not set) we set it here - if pg.Status.ScheduleStartTime.IsZero() { + // If the pod group creation time is Zero (not set) we set it here + // This only happens on the first reconcile, which should also be when the + // pod group is created. We set it here and don't use the underlying object + // CreationTime because we need to change the granularity to microseconds.
+ if pg.Status.CreationTime.IsZero() { return r.setTimeCreated(ctx, pg, podList.Items, timestamp) } @@ -159,7 +163,7 @@ func (r *PodGroupReconciler) setTimeCreated( // Now patch to update it patch := client.MergeFrom(pg.DeepCopy()) - pg.Status.ScheduleStartTime = timestamp + pg.Status.CreationTime = timestamp // Apply the patch to update the size r.Status().Update(ctx, pg) diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go index a74e749..9de5a26 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/core.go +++ b/sig-scheduler-plugins/pkg/fluence/core/core.go @@ -78,7 +78,7 @@ type Manager interface { PreFilter(context.Context, *corev1.Pod, *framework.CycleState) error GetPodNode(*corev1.Pod) string GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup) - GetCreationTimestamp(*corev1.Pod, time.Time) time.Time + GetCreationTimestamp(*corev1.Pod, time.Time) metav1.MicroTime DeletePermittedPodGroup(string) Permit(context.Context, *framework.CycleState, *corev1.Pod) Status CalculateAssignedPods(string, string) int @@ -255,8 +255,8 @@ func (podGroupManager *PodGroupManager) PreFilter( return nil } - _, exist := podGroupManager.backedOffpodGroup.Get(groupName) - if exist { + _, exists := podGroupManager.backedOffpodGroup.Get(groupName) + if exists { return fmt.Errorf("podGroup %v failed recently", groupName) } @@ -290,8 +290,8 @@ func (podGroupManager *PodGroupManager) PreFilter( // TODO(cwdsuzhou): This resource check may not always pre-catch unschedulable pod group. // It only tries to PreFilter resource constraints so even if a PodGroup passed here, // it may not necessarily pass Filter due to other constraints such as affinity/taints. - _, ok := podGroupManager.permittedpodGroup.Get(groupName) - if ok { + _, exists = podGroupManager.permittedpodGroup.Get(groupName) + if exists { podGroupManager.log.Info("[PodGroup PreFilter] Pod Group %s is already admitted", groupName) return nil } @@ -331,17 +331,27 @@ func (podGroupManager *PodGroupManager) PreFilter( return nil } -// GetCreationTimestamp returns the creation time of a podGroup or a pod. -func (podGroupManager *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) time.Time { +// GetCreationTimestamp returns the creation time of a podGroup or a pod in microseconds (metav1.MicroTime) +// The Status.CreationTime is set by the PodGroup reconciler, which has to happen before we have +// a PodGroup. I don't see cases when this wouldn't happen, but just in case we fall back to +// converting the pg.CreationTime to a MicroTime +func (podGroupManager *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) metav1.MicroTime { groupName := util.GetPodGroupLabel(pod) if len(groupName) == 0 { - return ts + return metav1.NewMicroTime(ts) } var podGroup v1alpha1.PodGroup if err := podGroupManager.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: groupName}, &podGroup); err != nil { - return metav1.NewMicroTime(ts) } - return podGroup.CreationTimestamp.Time + // First preference goes to microseconds. This should be set, as it is set by the first + // reconcile, and we wouldn't have a pod group if it didn't pass through that.
+ if !podGroup.Status.CreationTime.IsZero() { + return podGroup.Status.CreationTime + } + // Fall back to CreationTime from Kubernetes, in seconds + // In practice this should not happen + return metav1.NewMicroTime(podGroup.CreationTimestamp.Time) } // CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. diff --git a/sig-scheduler-plugins/pkg/fluence/core/flux.go b/sig-scheduler-plugins/pkg/fluence/core/flux.go index 50c8ff1..24c9212 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/flux.go +++ b/sig-scheduler-plugins/pkg/fluence/core/flux.go @@ -69,21 +69,21 @@ func (podGroupManager *PodGroupManager) AskFlux( } // An error here is an error with making the request - r, err := grpcclient.Match(context.Background(), request) + response, err := grpcclient.Match(context.Background(), request) if err != nil { podGroupManager.log.Warning("[PodGroup AskFlux] did not receive any match response: %v\n", err) return nodes, err } // TODO GetPodID should be renamed, because it will reflect the group - podGroupManager.log.Info("[PodGroup AskFlux] Match response ID %s\n", r.GetPodID()) + podGroupManager.log.Info("[PodGroup AskFlux] Match response ID %s\n", response.GetPodID()) // Get the nodelist and inspect - nodelist := r.GetNodelist() + nodelist := response.GetNodelist() for _, node := range nodelist { nodes = append(nodes, node.NodeID) } - jobid := uint64(r.GetJobID()) + jobid := uint64(response.GetJobID()) podGroupManager.log.Info("[PodGroup AskFlux] parsed node pods list %s for job id %d\n", nodes, jobid) // TODO would be nice to actually be able to ask flux jobs -a to fluence @@ -98,10 +98,10 @@ func (podGroupManager *PodGroupManager) AskFlux( // We assume that the cancelled job also means deleting the pod group func (podGroupManager *PodGroupManager) cancelFluxJob(groupName string, pod *corev1.Pod) error { - jobid, ok := podGroupManager.groupToJobId[groupName] + jobid, exists := podGroupManager.groupToJobId[groupName] // The job was already cancelled by another pod - if !ok { + if !exists { podGroupManager.log.Info("[PodGroup cancelFluxJob] Request for cancel of group %s is already complete.", groupName) return nil } @@ -121,15 +121,15 @@ func (podGroupManager *PodGroupManager) cancelFluxJob(groupName string, pod *cor // This error reflects the success or failure of the cancel request request := &pb.CancelRequest{JobID: int64(jobid)} - res, err := grpcclient.Cancel(context.Background(), request) + response, err := grpcclient.Cancel(context.Background(), request) if err != nil { podGroupManager.log.Error("[PodGroup cancelFluxJob] did not receive any cancel response: %v", err) return err } - podGroupManager.log.Info("[PodGroup cancelFluxJob] Job cancellation for group %s result: %d", groupName, res.Error) + podGroupManager.log.Info("[PodGroup cancelFluxJob] Job cancellation for group %s result: %d", groupName, response.Error) // And this error is if the cancel was successful or not - if res.Error == 0 { + if response.Error == 0 { podGroupManager.log.Info("[PodGroup cancelFluxJob] Successful cancel of flux job: %d for group %s", jobid, groupName) podGroupManager.cleanup(pod, groupName) } else { @@ -189,8 +189,8 @@ func (podGroupManager *PodGroupManager) UpdatePod(oldObj, newObj interface{}) { // Do we have the group id in our cache? 
If yes, we haven't deleted the jobid yet // I am worried here that if some pods are succeeded and others pending, this could // be a mistake - fluence would schedule it again - _, ok := podGroupManager.groupToJobId[groupName] - if ok { + _, exists := podGroupManager.groupToJobId[groupName] + if exists { podGroupManager.cancelFluxJob(groupName, oldPod) } else { podGroupManager.log.Verbose("[PodGroup UpdatePod] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) @@ -204,8 +204,8 @@ func (podGroupManager *PodGroupManager) UpdatePod(oldObj, newObj interface{}) { podGroupManager.mutex.Lock() defer podGroupManager.mutex.Unlock() - _, ok := podGroupManager.groupToJobId[groupName] - if ok { + _, exists := podGroupManager.groupToJobId[groupName] + if exists { podGroupManager.cancelFluxJob(groupName, oldPod) } else { podGroupManager.log.Error("[PodGroup UpdatePod] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) @@ -237,8 +237,8 @@ func (podGroupManager *PodGroupManager) DeletePod(podObj interface{}) { podGroupManager.mutex.Lock() defer podGroupManager.mutex.Unlock() - _, ok := podGroupManager.groupToJobId[groupName] - if ok { + _, exists := podGroupManager.groupToJobId[groupName] + if exists { podGroupManager.cancelFluxJob(groupName, pod) } else { podGroupManager.log.Info("[PodGroup DeletePod] Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) @@ -247,8 +247,8 @@ func (podGroupManager *PodGroupManager) DeletePod(podObj interface{}) { podGroupManager.mutex.Lock() defer podGroupManager.mutex.Unlock() - _, ok := podGroupManager.groupToJobId[groupName] - if ok { + _, exists := podGroupManager.groupToJobId[groupName] + if exists { podGroupManager.cancelFluxJob(groupName, pod) } else { podGroupManager.log.Info("[PodGroup DeletePod] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go index fddd3f0..44f0349 100644 --- a/sig-scheduler-plugins/pkg/fluence/fluence.go +++ b/sig-scheduler-plugins/pkg/fluence/fluence.go @@ -134,14 +134,13 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew return plugin, err } -func (f *Fluence) Name() string { +func (fluence *Fluence) Name() string { return Name } // Fluence has added delete, although I wonder if update includes that signal // and it's redundant? 
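The UpdatePod and DeletePod handlers above all share the same guard: look up the group's Flux job id under the manager mutex and only issue the cancel if the id is still known, so sibling pods in the same group do not send duplicate cancel RPCs. A small standalone sketch of that pattern; the types and names here are illustrative, and the real code removes the map entry in cleanup() only after a successful cancel:

package sketch

import "sync"

type manager struct {
	mu           sync.Mutex
	groupToJobId map[string]uint64
}

// cancelOnce returns the job id and true for the first caller of a group;
// subsequent callers (other pods in the same group) get false and do nothing.
func (m *manager) cancelOnce(groupName string) (uint64, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	jobid, exists := m.groupToJobId[groupName]
	if !exists {
		return 0, false
	}
	delete(m.groupToJobId, groupName)
	return jobid, true
}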
-func (f *Fluence) EventsToRegister() []framework.ClusterEventWithHint { - // TODO I have not redone this yet, not sure what it does (it might replace our informer above) +func (fluence *Fluence) EventsToRegister() []framework.ClusterEventWithHint { // To register a custom event, follow the naming convention at: // https://git.k8s.io/kubernetes/pkg/scheduler/eventhandlers.go#L403-L410 podGroupGVK := fmt.Sprintf("podgroups.v1alpha1.%v", scheduling.GroupName) @@ -152,14 +151,14 @@ func (f *Fluence) EventsToRegister() []framework.ClusterEventWithHint { } // TODO we need to account for affinity here -func (f *Fluence) Filter( +func (fluence *Fluence) Filter( ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo, ) *framework.Status { - f.log.Verbose("[Fluence Filter] Filtering input node %s", nodeInfo.Node().Name) + fluence.log.Verbose("[Fluence Filter] Filtering input node %s", nodeInfo.Node().Name) state, err := cycleState.Read(framework.StateKey(pod.Name)) // No error means we retrieved the state @@ -172,7 +171,7 @@ func (f *Fluence) Filter( if ok && value.NodeName != nodeInfo.Node().Name { return framework.NewStatus(framework.Unschedulable, "pod is not permitted") } else { - f.log.Info("[Fluence Filter] node %s selected for %s\n", value.NodeName, pod.Name) + fluence.log.Info("[Fluence Filter] node %s selected for %s\n", value.NodeName, pod.Name) } } return framework.NewStatus(framework.Success) @@ -182,7 +181,7 @@ func (f *Fluence) Filter( // 1. Compare the priorities of Pods. // 2. Compare the initialization timestamps of PodGroups or Pods. // 3. Compare the keys of PodGroups/Pods: /. -func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { +func (fluence *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { prio1 := corev1helpers.PodPriority(podInfo1.Pod) prio2 := corev1helpers.PodPriority(podInfo2.Pod) if prio1 != prio2 { @@ -193,8 +192,8 @@ func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { // which is what fluence needs to distinguish between namespaces. Just the // name could be replicated between different namespaces ctx := context.TODO() - name1, podGroup1 := f.podGroupManager.GetPodGroup(ctx, podInfo1.Pod) - name2, podGroup2 := f.podGroupManager.GetPodGroup(ctx, podInfo2.Pod) + name1, podGroup1 := fluence.podGroupManager.GetPodGroup(ctx, podInfo1.Pod) + name2, podGroup2 := fluence.podGroupManager.GetPodGroup(ctx, podInfo2.Pod) // Fluence can only compare if we have two known groups. // This tries for that first, and falls back to the initial attempt timestamp @@ -212,60 +211,60 @@ func (f *Fluence) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { // PreFilterExtensions allow for callbacks on filtered states // This is required to be defined for a PreFilter plugin // https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/interface.go#L383 -func (f *Fluence) PreFilterExtensions() framework.PreFilterExtensions { +func (fluence *Fluence) PreFilterExtensions() framework.PreFilterExtensions { return nil } // PreFilter performs the following validations. // 1. Whether the PodGroup that the Pod belongs to is on the deny list. // 2. Whether the total number of pods in a PodGroup is less than its `minMember`. 
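The Less function above is the reason the creation timestamp needs microsecond precision: queued pods are ordered first by priority, then by the group (or pod) creation MicroTime, then by the namespaced key. A simplified, self-contained sketch of that ordering; queuedInfo is a stand-in, not the framework type:

package sketch

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

type queuedInfo struct {
	Priority int32            // pod priority
	Created  metav1.MicroTime // PodGroup Status.CreationTime, or the pod's initial attempt time as fallback
	Key      string           // namespace/name of the group or pod
}

func less(a, b queuedInfo) bool {
	if a.Priority != b.Priority {
		return a.Priority > b.Priority // higher priority first
	}
	if !a.Created.Equal(&b.Created) {
		return a.Created.Before(&b.Created) // earlier creation first
	}
	return a.Key < b.Key
}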
-func (f *Fluence) PreFilter( +func (fluence *Fluence) PreFilter( ctx context.Context, state *framework.CycleState, pod *corev1.Pod, ) (*framework.PreFilterResult, *framework.Status) { // Quick check if the pod is already scheduled - f.mutex.Lock() - node := f.podGroupManager.GetPodNode(pod) - f.mutex.Unlock() + fluence.mutex.Lock() + node := fluence.podGroupManager.GetPodNode(pod) + fluence.mutex.Unlock() if node != "" { - f.log.Info("[Fluence PreFilter] assigned pod %s to node %s\n", pod.Name, node) + fluence.log.Info("[Fluence PreFilter] assigned pod %s to node %s\n", pod.Name, node) result := framework.PreFilterResult{NodeNames: sets.New(node)} return &result, framework.NewStatus(framework.Success, "") } - f.log.Info("[Fluence PreFilter] pod %s does not have a node assigned\n", pod.Name) + fluence.log.Info("[Fluence PreFilter] pod %s does not have a node assigned\n", pod.Name) // This will populate the node name into the pod group manager - err := f.podGroupManager.PreFilter(ctx, pod, state) + err := fluence.podGroupManager.PreFilter(ctx, pod, state) if err != nil { - f.log.Error("[Fluence PreFilter] failed pod %s: %s", pod.Name, err.Error()) + fluence.log.Error("[Fluence PreFilter] failed pod %s: %s", pod.Name, err.Error()) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } - node = f.podGroupManager.GetPodNode(pod) + node = fluence.podGroupManager.GetPodNode(pod) result := framework.PreFilterResult{NodeNames: sets.New(node)} return &result, framework.NewStatus(framework.Success, "") } // PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter. -func (f *Fluence) PostFilter( +func (fluence *Fluence) PostFilter( ctx context.Context, state *framework.CycleState, pod *corev1.Pod, filteredNodeStatusMap framework.NodeToStatusMap, ) (*framework.PostFilterResult, *framework.Status) { - groupName, podGroup := f.podGroupManager.GetPodGroup(ctx, pod) + groupName, podGroup := fluence.podGroupManager.GetPodGroup(ctx, pod) if podGroup == nil { - f.log.Info("Pod does not belong to any group, pod %s", pod.Name) + fluence.log.Info("Pod does not belong to any group, pod %s", pod.Name) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, "can not find pod group") } // This explicitly checks nodes, and we can skip scheduling another pod if we already // have the minimum. For fluence since we expect an exact size this likely is not needed - assigned := f.podGroupManager.CalculateAssignedPods(podGroup.Name, pod.Namespace) + assigned := fluence.podGroupManager.CalculateAssignedPods(podGroup.Name, pod.Namespace) if assigned >= int(podGroup.Spec.MinMember) { - f.log.Info("Assigned pods podGroup %s is assigned %s", groupName, assigned) + fluence.log.Info("Assigned pods podGroup %s is assigned %s", groupName, assigned) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) } @@ -273,65 +272,65 @@ func (f *Fluence) PostFilter( // It's based on an implicit assumption: if the nth Pod failed, // it's inferrable other Pods belonging to the same PodGroup would be very likely to fail. 
- f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + fluence.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { if waitingPod.GetPod().Namespace == pod.Namespace && flabel.GetPodGroupLabel(waitingPod.GetPod()) == podGroup.Name { - f.log.Info("PostFilter rejects the pod for podGroup %s and pod %s", groupName, waitingPod.GetPod().Name) - waitingPod.Reject(f.Name(), "optimistic rejection in PostFilter") + fluence.log.Info("PostFilter rejects the pod for podGroup %s and pod %s", groupName, waitingPod.GetPod().Name) + waitingPod.Reject(fluence.Name(), "optimistic rejection in PostFilter") } }) - if f.podGroupBackoff != nil { - pods, err := f.frameworkHandler.SharedInformerFactory().Core().V1().Pods().Lister().Pods(pod.Namespace).List( + if fluence.podGroupBackoff != nil { + pods, err := fluence.frameworkHandler.SharedInformerFactory().Core().V1().Pods().Lister().Pods(pod.Namespace).List( labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: flabel.GetPodGroupLabel(pod)}), ) if err == nil && len(pods) >= int(podGroup.Spec.MinMember) { - f.podGroupManager.BackoffPodGroup(groupName, *f.podGroupBackoff) + fluence.podGroupManager.BackoffPodGroup(groupName, *fluence.podGroupBackoff) } } - f.podGroupManager.DeletePermittedPodGroup(groupName) + fluence.podGroupManager.DeletePermittedPodGroup(groupName) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", groupName, pod.Name)) } // Permit is the functions invoked by the framework at "Permit" extension point. -func (f *Fluence) Permit( +func (fluence *Fluence) Permit( ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string, ) (*framework.Status, time.Duration) { - f.log.Info("Checking permit for pod %s to node %s", pod.Name, nodeName) - waitTime := *f.scheduleTimeout - s := f.podGroupManager.Permit(ctx, state, pod) + fluence.log.Info("Checking permit for pod %s to node %s", pod.Name, nodeName) + waitTime := *fluence.scheduleTimeout + s := fluence.podGroupManager.Permit(ctx, state, pod) var retStatus *framework.Status switch s { case fcore.PodGroupNotSpecified: - f.log.Info("Checking permit for pod %s to node %s: PodGroupNotSpecified", pod.Name, nodeName) + fluence.log.Info("Checking permit for pod %s to node %s: PodGroupNotSpecified", pod.Name, nodeName) return framework.NewStatus(framework.Success, ""), 0 case fcore.PodGroupNotFound: - f.log.Info("Checking permit for pod %s to node %s: PodGroupNotFound", pod.Name, nodeName) + fluence.log.Info("Checking permit for pod %s to node %s: PodGroupNotFound", pod.Name, nodeName) return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0 case fcore.Wait: - f.log.Info("Pod %s is waiting to be scheduled to node %s", pod.Name, nodeName) - _, podGroup := f.podGroupManager.GetPodGroup(ctx, pod) - if wait := fgroup.GetWaitTimeDuration(podGroup, f.scheduleTimeout); wait != 0 { + fluence.log.Info("Pod %s is waiting to be scheduled to node %s", pod.Name, nodeName) + _, podGroup := fluence.podGroupManager.GetPodGroup(ctx, pod) + if wait := fgroup.GetWaitTimeDuration(podGroup, fluence.scheduleTimeout); wait != 0 { waitTime = wait } retStatus = framework.NewStatus(framework.Wait) // We will also request to move the sibling pods back to activeQ. 
- f.podGroupManager.ActivateSiblings(pod, state) + fluence.podGroupManager.ActivateSiblings(pod, state) case fcore.Success: podGroupFullName := flabel.GetPodGroupFullName(pod) - f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + fluence.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { if flabel.GetPodGroupFullName(waitingPod.GetPod()) == podGroupFullName { - f.log.Info("Permit allows pod %s", waitingPod.GetPod().Name) - waitingPod.Allow(f.Name()) + fluence.log.Info("Permit allows pod %s", waitingPod.GetPod().Name) + waitingPod.Allow(fluence.Name()) } }) - f.log.Info("Permit allows pod %s", pod.Name) + fluence.log.Info("Permit allows pod %s", pod.Name) retStatus = framework.NewStatus(framework.Success) waitTime = 0 } @@ -340,21 +339,21 @@ func (f *Fluence) Permit( } // Reserve is the functions invoked by the framework at "reserve" extension point. -func (f *Fluence) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { +func (fluence *Fluence) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { return nil } // Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out. -func (f *Fluence) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { - groupName, podGroup := f.podGroupManager.GetPodGroup(ctx, pod) +func (fluence *Fluence) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { + groupName, podGroup := fluence.podGroupManager.GetPodGroup(ctx, pod) if podGroup == nil { return } - f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + fluence.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { if waitingPod.GetPod().Namespace == pod.Namespace && flabel.GetPodGroupLabel(waitingPod.GetPod()) == podGroup.Name { - f.log.Info("Unreserve rejects pod %s in group %s", waitingPod.GetPod().Name, groupName) - waitingPod.Reject(f.Name(), "rejection in Unreserve") + fluence.log.Info("Unreserve rejects pod %s in group %s", waitingPod.GetPod().Name, groupName) + waitingPod.Reject(fluence.Name(), "rejection in Unreserve") } }) - f.podGroupManager.DeletePermittedPodGroup(groupName) + fluence.podGroupManager.DeletePermittedPodGroup(groupName) } diff --git a/sig-scheduler-plugins/pkg/fluence/register.go b/sig-scheduler-plugins/pkg/fluence/register.go index 8f39f09..1505633 100644 --- a/sig-scheduler-plugins/pkg/fluence/register.go +++ b/sig-scheduler-plugins/pkg/fluence/register.go @@ -29,27 +29,27 @@ import ( // here goes away we cannot remove it from being known. But it's better than // not having it, and having fluxion assume more resources than the // cluster has available. 
This is a TODO as fluxion does not support it -func (f *Fluence) RegisterExisting(ctx context.Context) error { +func (fluence *Fluence) RegisterExisting(ctx context.Context) error { // creates an in-cluster config and client config, err := rest.InClusterConfig() if err != nil { - f.log.Error("[Fluence RegisterExisting] Error creating in-cluster config: %s\n", err) + fluence.log.Error("[Fluence RegisterExisting] Error creating in-cluster config: %s\n", err) return err } // creates the clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { - f.log.Error("[Fluence RegisterExisting] Error creating client for config: %s\n", err) + fluence.log.Error("[Fluence RegisterExisting] Error creating client for config: %s\n", err) return err } // get pods in all the namespaces by omitting namespace // Or specify namespace to get pods in particular namespace pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{}) if err != nil { - f.log.Info("[Fluence RegisterExisting] Error listing pods: %s\n", err) + fluence.log.Info("[Fluence RegisterExisting] Error listing pods: %s\n", err) return err } - f.log.Info("[Fluence RegisterExisting] Found %d existing pods in the cluster\n", len(pods.Items)) + fluence.log.Info("[Fluence RegisterExisting] Found %d existing pods in the cluster\n", len(pods.Items)) return nil } diff --git a/sig-scheduler-plugins/pkg/fluence/utils/utils.go b/sig-scheduler-plugins/pkg/fluence/utils/utils.go index f24f6d4..da9053b 100644 --- a/sig-scheduler-plugins/pkg/fluence/utils/utils.go +++ b/sig-scheduler-plugins/pkg/fluence/utils/utils.go @@ -44,16 +44,16 @@ func getPodJobspecLabels(pod *v1.Pod) []string { // jobspec based on the group and not the individual ID. // This calculates across containers in the od func PreparePodJobSpec(pod *v1.Pod, groupName string) *pb.PodSpec { - ps := new(pb.PodSpec) - ps.Id = groupName + podSpec := new(pb.PodSpec) + podSpec.Id = groupName - // Note from vsoch - there was an if check here to see if we had labels, + // There was an if check here to see if we had labels, // I don't think there is risk to adding an empty list but we can add // the check back if there is - ps.Labels = getPodJobspecLabels(pod) + podSpec.Labels = getPodJobspecLabels(pod) // the jobname should be the group name - ps.Container = groupName + podSpec.Container = groupName // Create accumulated requests for cpu and limits // CPU and memory are summed across containers @@ -87,12 +87,12 @@ func PreparePodJobSpec(pod *v1.Pod, groupName string) *pb.PodSpec { if cpus == 0 { cpus = 1 } - ps.Cpu = cpus - ps.Gpu = gpus - ps.Memory = memory - ps.Storage = storage + podSpec.Cpu = cpus + podSpec.Gpu = gpus + podSpec.Memory = memory + podSpec.Storage = storage // I removed specRequests.Cpu().MilliValue() but we can add back some derivative if desired - klog.Infof("[Jobspec] Pod spec: CPU %v, memory %v, GPU %v, storage %v", ps.Cpu, ps.Memory, ps.Gpu, ps.Storage) - return ps + klog.Infof("[Jobspec] Pod spec: CPU %v, memory %v, GPU %v, storage %v", podSpec.Cpu, podSpec.Memory, podSpec.Gpu, podSpec.Storage) + return podSpec } diff --git a/src/Makefile b/src/Makefile index af5fcb3..e31c8ec 100644 --- a/src/Makefile +++ b/src/Makefile @@ -4,8 +4,8 @@ INSTALL_PREFIX ?= /usr LIB_PREFIX ?= /usr/lib LOCALBIN ?= $(shell pwd)/bin COMMONENVVAR=GOOS=$(shell uname -s | tr A-Z a-z) -#BUILDENVVAR=CGO_CFLAGS="-I${FLUX_SCHED_ROOT}/resource/reapi/bindings/c" CGO_LDFLAGS="-L${INSTALL_PREFIX}/lib -L${FLUX_SCHED_ROOT}/resource -lresource -L${FLUX_SCHED_ROOT}/resource/libjobspec 
-ljobspec_conv -L/${FLUX_SCHED_ROOT}/resource/reapi/bindings -lreapi_cli -lflux-idset -lstdc++ -lczmq -ljansson -lhwloc -lboost_system -lflux-hostlist -lboost_graph -lyaml-cpp" -BUILDENVVAR=CGO_CFLAGS="-I${FLUX_SCHED_ROOT} -I${FLUX_SCHED_ROOT}/resource/reapi/bindings/c" CGO_LDFLAGS="-L${LIB_PREFIX} -L${LIB_PREFIX}/flux -L${FLUX_SCHED_ROOT}/resource/reapi/bindings -lreapi_cli -lflux-idset -lstdc++ -lczmq -ljansson -lhwloc -lboost_system -lflux-hostlist -lboost_graph -lyaml-cpp" +#BUILDENVVAR=CGO_CFLAGS="-I${FLUX_SCHED_ROOT}/resource/reapi/bindings/c" CGO_LDFLAGS="-L${INSTALL_PREFIX}/lib -L${FLUX_SCHED_ROOT}/resource -lresource -L${FLUX_SCHED_ROOT}/resource/libjobspec -ljobspec_conv -L/${FLUX_SCHED_ROOT}/resource/reapi/bindings -lreapi_cli -lflux-idset -lstdc++ -ljansson -lhwloc -lboost_system -lflux-hostlist -lboost_graph -lyaml-cpp" +BUILDENVVAR=CGO_CFLAGS="-I${FLUX_SCHED_ROOT} -I${FLUX_SCHED_ROOT}/resource/reapi/bindings/c" CGO_LDFLAGS="-L${LIB_PREFIX} -L${LIB_PREFIX}/flux -L${FLUX_SCHED_ROOT}/resource/reapi/bindings -lreapi_cli -lflux-idset -lstdc++ -ljansson -lhwloc -lboost_system -lflux-hostlist -lboost_graph -lyaml-cpp" LOCAL_REGISTRY=localhost:5000 diff --git a/src/fluence/cmd/main.go b/src/fluence/cmd/main.go index 3fb6a06..e8ef87d 100644 --- a/src/fluence/cmd/main.go +++ b/src/fluence/cmd/main.go @@ -48,12 +48,12 @@ func main() { } responsechan = make(chan string) - s := grpc.NewServer( + server := grpc.NewServer( grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionIdle: 5 * time.Minute, }), ) - pb.RegisterFluxcliServiceServer(s, &flux) + pb.RegisterFluxcliServiceServer(server, &flux) // External plugin (Kubectl) GRPC // This will eventually be an external GRPC module that can @@ -64,11 +64,11 @@ func main() { if *enableServicePlugin { plugin := service.ExternalService{} plugin.Init() - svcPb.RegisterExternalPluginServiceServer(s, &plugin) + svcPb.RegisterExternalPluginServiceServer(server, &plugin) } fmt.Printf("[GRPCServer] gRPC Listening on %s\n", lis.Addr().String()) - if err := s.Serve(lis); err != nil { + if err := server.Serve(lis); err != nil { fmt.Printf("[GRPCServer] failed to serve: %v\n", err) } diff --git a/src/fluence/fluxion/fluxion.go b/src/fluence/fluxion/fluxion.go index 05e94fa..f288cdf 100644 --- a/src/fluence/fluxion/fluxion.go +++ b/src/fluence/fluxion/fluxion.go @@ -20,10 +20,10 @@ type Fluxion struct { } // InitFluxion creates a new client to interaction with the fluxion API (via go bindings) -func (f *Fluxion) InitFluxion(policy *string, label *string) { - f.cli = fluxcli.NewReapiClient() +func (fluxion *Fluxion) InitFluxion(policy *string, label *string) { + fluxion.cli = fluxcli.NewReapiClient() - klog.Infof("[Fluence] Created flux resource client %s", f.cli) + klog.Infof("[Fluence] Created flux resource client %s", fluxion.cli) err := utils.CreateJGF(defaults.KubernetesJsonGraphFormat, label) if err != nil { return @@ -40,26 +40,25 @@ func (f *Fluxion) InitFluxion(policy *string, label *string) { p = string("{\"matcher_policy\": \"" + *policy + "\"}") klog.Infof("[Fluence] match policy: %s", p) } - - f.cli.InitContext(string(jgf), p) + fluxion.cli.InitContext(string(jgf), p) } // Cancel wraps the Cancel function of the fluxion go bindings -func (s *Fluxion) Cancel(ctx context.Context, in *pb.CancelRequest) (*pb.CancelResponse, error) { +func (fluxion *Fluxion) Cancel(ctx context.Context, in *pb.CancelRequest) (*pb.CancelResponse, error) { klog.Infof("[Fluence] received cancel request %v\n", in) - err := s.cli.Cancel(int64(in.JobID), true) + err 
:= fluxion.cli.Cancel(int64(in.JobID), true) if err != nil { - return nil, errors.New("Error in Cancel") + return nil, err } // Why would we have an error code here if we check above? // This (I think) should be an error code for the specific job dr := &pb.CancelResponse{JobID: in.JobID} klog.Infof("[Fluence] sending cancel response %v\n", dr) - klog.Infof("[Fluence] cancel errors so far: %s\n", s.cli.GetErrMsg()) + klog.Infof("[Fluence] cancel errors so far: %s\n", fluxion.cli.GetErrMsg()) - reserved, at, overhead, mode, fluxerr := s.cli.Info(int64(in.JobID)) + reserved, at, overhead, mode, fluxerr := fluxion.cli.Info(int64(in.JobID)) klog.Infof("\n\t----Job Info output---") klog.Infof("jobid: %d\nreserved: %t\nat: %d\noverhead: %f\nmode: %s\nerror: %d\n", in.JobID, reserved, at, overhead, mode, fluxerr) @@ -67,48 +66,27 @@ func (s *Fluxion) Cancel(ctx context.Context, in *pb.CancelRequest) (*pb.CancelR return dr, nil } -// generateJobSpec generates a jobspec for a match request and returns the string -func (s *Fluxion) generateJobspec(in *pb.MatchRequest) ([]byte, error) { - - spec := []byte{} - - // Create a temporary file to write and read the jobspec - // The first parameter here as the empty string creates in /tmp - file, err := os.CreateTemp("", "jobspec.*.yaml") - if err != nil { - return spec, err - } - defer os.Remove(file.Name()) - jobspec.CreateJobSpecYaml(in.Ps, in.Count, file.Name()) - - spec, err = os.ReadFile(file.Name()) - if err != nil { - return spec, errors.New("Error reading jobspec") - } - return spec, err -} - // Match wraps the MatchAllocate function of the fluxion go bindings // If a match is not possible, we return the error and an empty response -func (s *Fluxion) Match(ctx context.Context, in *pb.MatchRequest) (*pb.MatchResponse, error) { +func (fluxion *Fluxion) Match(ctx context.Context, in *pb.MatchRequest) (*pb.MatchResponse, error) { emptyResponse := &pb.MatchResponse{} // Prepare an empty match response (that can still be serialized) klog.Infof("[Fluence] Received Match request %v\n", in) - // Generate the jobspec, written to temporary file and read as string - spec, err := s.generateJobspec(in) + // Generate the jobspec, array of bytes converted to string + spec, err := jobspec.CreateJobSpecYaml(in.Ps, in.Count) if err != nil { return emptyResponse, err } // Ask flux to match allocate! 
- reserved, allocated, at, overhead, jobid, fluxerr := s.cli.MatchAllocate(false, string(spec)) + reserved, allocated, at, overhead, jobid, fluxerr := fluxion.cli.MatchAllocate(false, string(spec)) utils.PrintOutput(reserved, allocated, at, overhead, jobid, fluxerr) // Be explicit about errors (or not) - errorMessages := s.cli.GetErrMsg() + errorMessages := fluxion.cli.GetErrMsg() if errorMessages == "" { klog.Infof("[Fluence] There are no errors") } else { diff --git a/src/fluence/jgf/jgf.go b/src/fluence/jgf/jgf.go index 1f45235..8a047f9 100644 --- a/src/fluence/jgf/jgf.go +++ b/src/fluence/jgf/jgf.go @@ -17,6 +17,7 @@ package jgf import ( "encoding/json" + "fmt" "log" "os" "strconv" @@ -26,13 +27,26 @@ import ( var ( // Defaults for nodes defaultExclusive = false - defaultRank = -1 - defaultSize = 1 + defaultRank = int64(-1) + defaultSize = int64(1) defaultUnit = "" // Relations - containsRelation = "contains" - inRelation = "in" + ContainsRelation = "contains" + InRelation = "in" + + // Vertex (node) types + // These are public to be used in the utils package + ClusterType = "cluster" + NodeType = "node" + CoreType = "core" + VirtualCoreType = "vcore" + RackType = "rack" + SocketType = "socket" + SubnetType = "subnet" + MemoryType = "memory" + NvidiaGPU = "nvidiagpu" + GPUType = "gpu" // Paths containmentKey = "containment" @@ -73,31 +87,20 @@ func (g *Fluxjgf) MakeEdge(source string, target string, contains string) { }, } g.Graph.Edges = append(g.Graph.Edges, newedge) - if contains == containsRelation { + if contains == ContainsRelation { tnode := g.NodeMap[target] tnode.Metadata.Paths[containmentKey] = g.NodeMap[source].Metadata.Paths[containmentKey] + "/" + tnode.Metadata.Name } } -// processLabels selects a subset based on a string filter -func processLabels(labels *map[string]string, filter string) (filtered map[string]string) { - filtered = map[string]string{} - for key, v := range *labels { - if strings.Contains(key, filter) { - filtered[key] = v - } - } - return -} - // MakeSubnet creates a subnet for the graph -func (g *Fluxjgf) MakeSubnet(index int, ip string) string { +func (g *Fluxjgf) MakeSubnet(index int64, ip string) string { newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ - Type: "subnet", + Type: SubnetType, Basename: ip, - Name: ip + strconv.Itoa(g.Elements), + Name: ip + fmt.Sprintf("%d", g.Elements), Id: index, Uniq_id: g.Elements, Rank: defaultRank, @@ -114,11 +117,11 @@ func (g *Fluxjgf) MakeSubnet(index int, ip string) string { // MakeNode creates a new node for the graph func (g *Fluxjgf) MakeNode(index int, exclusive bool, subnet string) string { newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ - Type: "node", + Type: NodeType, Basename: subnet, - Name: subnet + strconv.Itoa(g.Elements), + Name: subnet + fmt.Sprintf("%d", g.Elements), Id: g.Elements, Uniq_id: g.Elements, Rank: defaultRank, @@ -133,13 +136,13 @@ func (g *Fluxjgf) MakeNode(index int, exclusive bool, subnet string) string { } // MakeSocket creates a socket for the graph -func (g *Fluxjgf) MakeSocket(index int, name string) string { +func (g *Fluxjgf) MakeSocket(index int64, name string) string { newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ - Type: "socket", + Type: SocketType, Basename: name, - Name: name + strconv.Itoa(index), + Name: name + fmt.Sprintf("%d", index), Id: index, Uniq_id: g.Elements, Rank: 
defaultRank, @@ -154,13 +157,13 @@ func (g *Fluxjgf) MakeSocket(index int, name string) string { } // MakeCore creates a core for the graph -func (g *Fluxjgf) MakeCore(index int, name string) string { +func (g *Fluxjgf) MakeCore(index int64, name string) string { newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ - Type: "core", + Type: CoreType, Basename: name, - Name: name + strconv.Itoa(index), + Name: name + fmt.Sprintf("%d", index), Id: index, Uniq_id: g.Elements, Rank: defaultRank, @@ -175,13 +178,13 @@ func (g *Fluxjgf) MakeCore(index int, name string) string { } // MakeVCore makes a vcore (I think 2 vcpu == 1 cpu) for the graph -func (g *Fluxjgf) MakeVCore(coreid string, index int, name string) string { +func (g *Fluxjgf) MakeVCore(coreid string, index int64, name string) string { newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ - Type: "vcore", + Type: VirtualCoreType, Basename: name, - Name: name + strconv.Itoa(index), + Name: name + fmt.Sprintf("%d", index), Id: index, Uniq_id: g.Elements, Rank: defaultRank, @@ -192,13 +195,13 @@ func (g *Fluxjgf) MakeVCore(coreid string, index int, name string) string { }, } g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, containsRelation) - g.MakeEdge(newnode.Id, coreid, inRelation) + g.MakeEdge(coreid, newnode.Id, ContainsRelation) + g.MakeEdge(newnode.Id, coreid, InRelation) return newnode.Id } // MakeNFProperties makes the node feature discovery properties for the graph -func (g *Fluxjgf) MakeNFDProperties(coreid string, index int, filter string, labels *map[string]string) { +func (g *Fluxjgf) MakeNFDProperties(coreid string, index int64, filter string, labels *map[string]string) { for key, _ := range *labels { if strings.Contains(key, filter) { name := strings.Split(key, "/")[1] @@ -207,11 +210,11 @@ func (g *Fluxjgf) MakeNFDProperties(coreid string, index int, filter string, lab } newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ Type: name, Basename: name, - Name: name + strconv.Itoa(index), + Name: name + fmt.Sprintf("%d", index), Id: index, Uniq_id: g.Elements, Rank: defaultRank, @@ -222,22 +225,22 @@ func (g *Fluxjgf) MakeNFDProperties(coreid string, index int, filter string, lab }, } g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, containsRelation) + g.MakeEdge(coreid, newnode.Id, ContainsRelation) } } } -func (g *Fluxjgf) MakeNFDPropertiesByValue(coreid string, index int, filter string, labels *map[string]string) { +func (g *Fluxjgf) MakeNFDPropertiesByValue(coreid string, index int64, filter string, labels *map[string]string) { for key, val := range *labels { if strings.Contains(key, filter) { name := val newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ Type: name, Basename: name, - Name: name + strconv.Itoa(index), + Name: name + fmt.Sprintf("%d", index), Id: index, Uniq_id: g.Elements, Rank: defaultRank, @@ -248,19 +251,19 @@ func (g *Fluxjgf) MakeNFDPropertiesByValue(coreid string, index int, filter stri }, } g.addNode(newnode) - g.MakeEdge(coreid, newnode.Id, containsRelation) + g.MakeEdge(coreid, newnode.Id, ContainsRelation) } } } // MakeMemory creates memory for the graph -func (g *Fluxjgf) MakeMemory(index int, name string, unit string, size int) string { +func (g *Fluxjgf) MakeMemory(index int64, name string, unit string, size int64) string { newnode := node{ - Id: 
strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ - Type: "memory", + Type: MemoryType, Basename: name, - Name: name + strconv.Itoa(index), + Name: name + fmt.Sprintf("%d", index), Id: index, Uniq_id: g.Elements, Rank: defaultRank, @@ -275,13 +278,13 @@ func (g *Fluxjgf) MakeMemory(index int, name string, unit string, size int) stri } // MakeGPU makes a gpu for the graph -func (g *Fluxjgf) MakeGPU(index int, name string, size int) string { +func (g *Fluxjgf) MakeGPU(index int64, name string, size int64) string { newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ - Type: "gpu", + Type: GPUType, Basename: name, - Name: name + strconv.Itoa(index), + Name: name + fmt.Sprintf("%d", index), Id: index, Uniq_id: g.Elements, Rank: defaultRank, @@ -301,7 +304,7 @@ func (g *Fluxjgf) MakeCluster(clustername string) string { newnode := node{ Id: strconv.Itoa(0), Metadata: nodeMetadata{ - Type: "cluster", + Type: ClusterType, Basename: clustername, Name: clustername + "0", Id: g.Elements, @@ -320,14 +323,14 @@ func (g *Fluxjgf) MakeCluster(clustername string) string { } // MakeRack makes the rack -func (g *Fluxjgf) MakeRack(id int) string { +func (g *Fluxjgf) MakeRack(index int64) string { newnode := node{ - Id: strconv.Itoa(g.Elements), + Id: fmt.Sprintf("%d", g.Elements), Metadata: nodeMetadata{ - Type: "rack", - Basename: "rack", - Name: "rack" + strconv.Itoa(id), - Id: id, + Type: RackType, + Basename: RackType, + Name: RackType + fmt.Sprintf("%d", index), + Id: index, Uniq_id: g.Elements, Rank: defaultRank, Exclusive: defaultExclusive, diff --git a/src/fluence/jgf/types.go b/src/fluence/jgf/types.go index b2b743f..21ccd00 100644 --- a/src/fluence/jgf/types.go +++ b/src/fluence/jgf/types.go @@ -38,12 +38,12 @@ type nodeMetadata struct { Type string `json:"type"` Basename string `json:"basename"` Name string `json:"name"` - Id int `json:"id"` - Uniq_id int `json:"uniq_id"` - Rank int `json:"rank,omitempty"` + Id int64 `json:"id"` + Uniq_id int64 `json:"uniq_id"` + Rank int64 `json:"rank,omitempty"` Exclusive bool `json:"exclusive"` Unit string `json:"unit"` - Size int `json:"size"` + Size int64 `json:"size"` Paths map[string]string `json:"paths,omitempty"` Properties map[string]string `json:"properties,omitempty"` } @@ -57,6 +57,6 @@ type graph struct { type Fluxjgf struct { Graph graph `json:"graph"` - Elements int `json:"-"` + Elements int64 `json:"-"` NodeMap map[string]node `json:"-"` } diff --git a/src/fluence/jobspec/jobspec.go b/src/fluence/jobspec/jobspec.go index 683f586..96ed0fe 100644 --- a/src/fluence/jobspec/jobspec.go +++ b/src/fluence/jobspec/jobspec.go @@ -18,8 +18,6 @@ package jobspec import ( "fmt" "log" - "math" - "os" pb "github.com/flux-framework/flux-k8s/flux-plugin/fluence/fluxcli-grpc" "gopkg.in/yaml.v2" @@ -39,7 +37,7 @@ Ps: &pb.PodSpec{ */ // CreateJobSpecYaml writes the protobuf jobspec into a yaml file -func CreateJobSpecYaml(spec *pb.PodSpec, count int32, filename string) error { +func CreateJobSpecYaml(spec *pb.PodSpec, count int32) ([]byte, error) { command := []string{spec.Container} fmt.Println("Labels ", spec.Labels, " ", len(spec.Labels)) @@ -68,38 +66,9 @@ func CreateJobSpecYaml(spec *pb.PodSpec, count int32, filename string) error { yamlbytes, err := yaml.Marshal(&js) if err != nil { log.Fatalf("[JobSpec] yaml.Marshal failed with '%s'\n", err) - return err + return yamlbytes, err } - return writeBytes(yamlbytes, filename) -} - -// WriteBytes writes a byte string to file 
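CreateJobSpecYaml above now returns the marshalled YAML bytes directly instead of round-tripping through the temporary file that writeBytes created and Match re-read. A minimal sketch of the new shape of that flow; jobspecDoc and renderJobspec are illustrative names, not part of this change:

package sketch

import "gopkg.in/yaml.v2"

type jobspecDoc struct {
	Version int `yaml:"version"`
	// resources, attributes, and tasks elided for brevity
}

// renderJobspec marshals the jobspec in memory; the caller passes string(bytes)
// straight to MatchAllocate, so no file is written or re-read.
func renderJobspec(doc *jobspecDoc) ([]byte, error) {
	return yaml.Marshal(doc)
}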
-func writeBytes(bytelist []byte, filename string) error { - fmt.Printf("[JobSpec] Preparing to write:\n%s\n", string(bytelist)) - f, err := os.Create(filename) - if err != nil { - log.Fatalf("[JobSpec] Couldn't create file!!\n") - return err - } - defer f.Close() - - _, err = f.Write(bytelist) - if err != nil { - log.Fatalf("[JobSpec] Couldn't write file!!\n") - return err - } - - // Not sure why this is here, but will keep for now - _, err = f.WriteString("\n") - if err != nil { - log.Fatalf("[JobSpec] Couldn't append newline to file!!\n") - } - return err -} - -func toGB(bytes int64) int64 { - res := float64(bytes) / math.Pow(10, 9) - return int64(res) + return yamlbytes, nil } // createSocketResources creates the socket resources for the JobSpec diff --git a/src/fluence/utils/utils.go b/src/fluence/utils/utils.go index e429056..490a0e0 100644 --- a/src/fluence/utils/utils.go +++ b/src/fluence/utils/utils.go @@ -93,23 +93,18 @@ func CreateJGF(filename string, skipLabel *string) error { // Create a Flux Json Graph Format (JGF) with all cluster nodes fluxgraph := jgf.InitJGF() - // TODO it looks like we can add more to the graph here - - // let's remember to consider what else we can. - // subnets := make(map[string]string) - + // Top level of the graph is the cluster + // This assumes fluxion is only serving one cluster. + // previous comments indicate that we choose between the level + // of a rack and a subnet. A rack doesn't make sense (the nodes could + // be on multiple racks) so subnet is likely the right abstraction cluster := fluxgraph.MakeCluster("k8scluster") - // Rack needs to be disabled when using subnets - // rack := fluxgraph.MakeRack(0) - - // fluxgraph.MakeEdge(cluster, rack, "contains") - // fluxgraph.MakeEdge(rack, cluster, "in") - vcores := 0 fmt.Println("Number nodes ", len(nodes.Items)) var totalAllocCpu int64 totalAllocCpu = 0 - sdnCount := 0 + sdnCount := int64(0) for nodeIndex, node := range nodes.Items { @@ -146,13 +141,12 @@ func CreateJGF(filename string, skipLabel *string) error { return err } - // Check if subnet already exists - // Here we build subnets according to topology.kubernetes.io/zone label + // Here we build the subnet according to topology.kubernetes.io/zone label subnetName := node.Labels["topology.kubernetes.io/zone"] subnet := fluxgraph.MakeSubnet(sdnCount, subnetName) sdnCount = sdnCount + 1 - fluxgraph.MakeEdge(cluster, subnet, "contains") - fluxgraph.MakeEdge(subnet, cluster, "in") + fluxgraph.MakeEdge(cluster, subnet, jgf.ContainsRelation) + fluxgraph.MakeEdge(subnet, cluster, jgf.InRelation) // These are requests for existing pods, for cpu and memory reqs := computeTotalRequests(pods) @@ -179,64 +173,44 @@ func CreateJGF(filename string, skipLabel *string) error { fmt.Printf(" available mem: %d\n", availMem) gpuAllocatable, hasGpuAllocatable := node.Status.Allocatable["nvidia.com/gpu"] - // reslist := node.Status.Allocatable - // resources := make([]corev1.ResourceName, 0, len(reslist)) - // for resource := range reslist { - // fmt.Println("resource ", resource) - // resources = append(resources, resource) - // } - // for _, resource := range resources { - // value := reslist[resource] - - // fmt.Printf(" %s:\t%s\n", resource, value.String()) - // } + // TODO possibly look at pod resources vs. 
node.Status.Allocatable workernode := fluxgraph.MakeNode(nodeIndex, false, node.Name) - fluxgraph.MakeEdge(subnet, workernode, "contains") // this is rack otherwise - fluxgraph.MakeEdge(workernode, subnet, "in") // this is rack otherwise - - // socket := fluxgraph.MakeSocket(0, "socket") - // fluxgraph.MakeEdge(workernode, socket, "contains") - // fluxgraph.MakeEdge(socket, workernode, "in") + fluxgraph.MakeEdge(subnet, workernode, jgf.ContainsRelation) + fluxgraph.MakeEdge(workernode, subnet, jgf.InRelation) if hasGpuAllocatable { fmt.Println("GPU Resource quantity ", gpuAllocatable.Value()) - //MakeGPU(index int, name string, size int) string { for index := 0; index < int(gpuAllocatable.Value()); index++ { - gpu := fluxgraph.MakeGPU(index, "nvidiagpu", 1) - fluxgraph.MakeEdge(workernode, gpu, "contains") // workernode was socket - fluxgraph.MakeEdge(gpu, workernode, "in") + gpu := fluxgraph.MakeGPU(int64(index), jgf.NvidiaGPU, 1) + fluxgraph.MakeEdge(workernode, gpu, jgf.ContainsRelation) + fluxgraph.MakeEdge(gpu, workernode, jgf.InRelation) } } for index := 0; index < int(availCpu); index++ { - // MakeCore(index int, name string) - core := fluxgraph.MakeCore(index, "core") - fluxgraph.MakeEdge(workernode, core, "contains") // workernode was socket - fluxgraph.MakeEdge(core, workernode, "in") + core := fluxgraph.MakeCore(int64(index), jgf.CoreType) + fluxgraph.MakeEdge(workernode, core, jgf.ContainsRelation) + fluxgraph.MakeEdge(core, workernode, jgf.InRelation) // Question from Vanessa: // How can we get here and have vcores ever not equal to zero? if vcores == 0 { - fluxgraph.MakeNFDProperties(core, index, "cpu-", &node.Labels) - // fluxgraph.MakeNFDProperties(core, index, "netmark-", &node.Labels) + fluxgraph.MakeNFDProperties(core, int64(index), "cpu-", &node.Labels) } else { - for vc := 0; vc < vcores; vc++ { - vcore := fluxgraph.MakeVCore(core, vc, "vcore") - fluxgraph.MakeNFDProperties(vcore, index, "cpu-", &node.Labels) + for virtualCore := 0; virtualCore < vcores; virtualCore++ { + vcore := fluxgraph.MakeVCore(core, int64(virtualCore), jgf.VirtualCoreType) + fluxgraph.MakeNFDProperties(vcore, int64(index), "cpu-", &node.Labels) } } } - // MakeMemory(index int, name string, unit string, size int) fractionMem := availMem >> 30 - // fractionmem := (totalmem/totalcpu) >> 20 - // fmt.Println("Creating ", fractionmem, " vertices with ", 1<<10, " MB of mem") - for i := 0; i < /*int(totalcpu)*/ int(fractionMem); i++ { - mem := fluxgraph.MakeMemory(i, "memory", "MB", int(1<<10)) - fluxgraph.MakeEdge(workernode, mem, "contains") - fluxgraph.MakeEdge(mem, workernode, "in") + for i := 0; i < int(fractionMem); i++ { + mem := fluxgraph.MakeMemory(int64(i), jgf.MemoryType, "MB", 1<<10) + fluxgraph.MakeEdge(workernode, mem, jgf.ContainsRelation) + fluxgraph.MakeEdge(mem, workernode, jgf.InRelation) } } fmt.Printf("\nCan request at most %d exclusive cpu", totalAllocCpu) @@ -248,6 +222,7 @@ func CreateJGF(filename string, skipLabel *string) error { } +// computeTotalRequests sums up the pod requests for the list. We do not consider limits. 
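The loop above wires every resource into a single containment tree: each child vertex gets a ContainsRelation edge from its parent and an InRelation edge back. A condensed sketch of that pattern as it would appear inside CreateJGF, where fluxgraph and the jgf package are already in scope; the zone name and indices are placeholders:

fluxgraph := jgf.InitJGF()
cluster := fluxgraph.MakeCluster("k8scluster")

// one subnet per topology.kubernetes.io/zone value
subnet := fluxgraph.MakeSubnet(0, "zone-a")
fluxgraph.MakeEdge(cluster, subnet, jgf.ContainsRelation)
fluxgraph.MakeEdge(subnet, cluster, jgf.InRelation)

// one worker node vertex per Kubernetes node in that subnet
workernode := fluxgraph.MakeNode(0, false, "zone-a")
fluxgraph.MakeEdge(subnet, workernode, jgf.ContainsRelation)
fluxgraph.MakeEdge(workernode, subnet, jgf.InRelation)

// cores (and GPUs, memory) hang off the worker node with the same pair of edges
core := fluxgraph.MakeCore(0, jgf.CoreType)
fluxgraph.MakeEdge(workernode, core, jgf.ContainsRelation)
fluxgraph.MakeEdge(core, workernode, jgf.InRelation)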
func computeTotalRequests(podList *corev1.PodList) (total map[corev1.ResourceName]resource.Quantity) { total = map[corev1.ResourceName]resource.Quantity{} for _, pod := range podList.Items { @@ -260,14 +235,6 @@ func computeTotalRequests(podList *corev1.PodList) (total map[corev1.ResourceNam total[podReqName] = v } } - // for podLimitName, podLimitValue := range podLimits { - // if v, ok := total[podLimitName]; !ok { - // total[podLimitName] = podLimitValue - // } else { - // v.Add(podLimitValue) - // total[podLimitName] = v - // } - // } } return } @@ -295,17 +262,17 @@ func ParseAllocResult(allocated, podName string) []allocation { // Parse graph and nodes into interfaces // TODO look at github.com/mitchellh/mapstructure // that might make this easier - nodes := dat["graph"].(interface{}) + nodes := dat["graph"] str1 := nodes.(map[string]interface{}) str2 := str1["nodes"].([]interface{}) for _, item := range str2 { str1 = item.(map[string]interface{}) metadata := str1["metadata"].(map[string]interface{}) - if metadata["type"].(string) == "core" { + if metadata["type"].(string) == jgf.CoreType { corecount = corecount + 1 } - if metadata["type"].(string) == "node" { + if metadata["type"].(string) == jgf.NodeType { result = append(result, allocation{ Type: metadata["type"].(string), Name: metadata["name"].(string), @@ -334,6 +301,6 @@ func PrintOutput(reserved bool, allocated string, at int64, overhead float64, jo // Only print error if we had one if fluxerr != nil { - fmt.Printf("error: %w\n", fluxerr) + fmt.Printf("error: %s\n", fluxerr) } }
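As a possible follow-up to the mapstructure TODO above ParseAllocResult, the chained type assertions could also be replaced by unmarshalling the allocation JSON into typed structs. A hypothetical sketch; the struct and function names are illustrative, and the field names mirror the JGF metadata keys used in this package:

package sketch

import "encoding/json"

type allocMetadata struct {
	Type     string `json:"type"`
	Name     string `json:"name"`
	Basename string `json:"basename"`
}

type allocNode struct {
	Metadata allocMetadata `json:"metadata"`
}

type allocGraph struct {
	Graph struct {
		Nodes []allocNode `json:"nodes"`
	} `json:"graph"`
}

// parseAllocatedNodes returns the names of vertices whose type is "node",
// the same selection ParseAllocResult makes with map[string]interface{}.
func parseAllocatedNodes(allocated string) ([]string, error) {
	var parsed allocGraph
	if err := json.Unmarshal([]byte(allocated), &parsed); err != nil {
		return nil, err
	}
	names := []string{}
	for _, n := range parsed.Graph.Nodes {
		if n.Metadata.Type == "node" {
			names = append(names, n.Metadata.Name)
		}
	}
	return names, nil
}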