Skip to content

Commit

Permalink
Merge pull request #94 from ruivieira/rhoai-2.16.1-enable-lmeval
Browse files Browse the repository at this point in the history
feat(lmeval): Re-enable LMEval controller
  • Loading branch information
ruivieira authored Dec 13, 2024
2 parents c6729ed + e1646cd commit 67da0c4
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 33 deletions.
3 changes: 0 additions & 3 deletions config/overlays/rhoai/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ kind: Kustomization
resources:
- ../../base

patchesStrategicMerge:
- tas-only-patch.yaml

configMapGenerator:
- env: params.env
behavior: merge
Expand Down
14 changes: 0 additions & 14 deletions config/overlays/rhoai/tas-only-patch.yaml

This file was deleted.

31 changes: 19 additions & 12 deletions controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package controllers

import (
"errors"
"fmt"
"slices"
"strings"
Expand Down Expand Up @@ -45,23 +46,29 @@ func registerService(name string, setupf ControllerSetupFunc) {
}

func SetupControllers(enabledServices []string, mgr manager.Manager, ns, configmap string, recorder record.EventRecorder) error {
if len(enabledServices) == 0 || enabledServices[0] != "TAS" {
return fmt.Errorf("only TAS is supported")
var errs []error
for _, service := range enabledServices {
errs = append(errs, TasServices[service](mgr, ns, configmap, recorder))
}
if setupFunc, ok := TasServices["TAS"]; ok {
return setupFunc(mgr, ns, configmap, recorder)
}
return fmt.Errorf("TAS service is not registered")
return errors.Join(errs...)
}

func (es *EnabledServices) Set(services string) error {
if services != "TAS" {
return fmt.Errorf("only TAS is supported, but %s was provided", services)
}
if slices.Contains(*es, services) {
return fmt.Errorf("TAS is already enabled")
for _, service := range strings.Split(services, ",") {
if slices.Contains(*es, service) {
return fmt.Errorf("specify the same service twice: %s", service)
}
if _, ok := TasServices[service]; ok {
*es = append(*es, service)
} else {
return fmt.Errorf(
"service %s is not supported. available services: %s",
service,
strings.Join(AllTasServices, ","),
)
}
}
*es = append(*es, services)

return nil
}

Expand Down
74 changes: 70 additions & 4 deletions controllers/lmes/lmevaljob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,78 @@ func (q *syncedMap4Reconciler) remove(key string) {
// Reconcile processes one reconciliation request for an LMEvalJob.
//
// It fetches the job named by req and then, in order:
//   - delegates to handleDeletion when the job carries a deletion timestamp;
//   - sets the fetched object's state to NewJobState when neither
//     LastScheduleTime nor CompleteTime is set;
//   - when JobMgrEnabled is true and the job is not complete, fails jobs that
//     lack the kueue.x-k8s.io/queue-name label and delegates to handleSuspend
//     for jobs whose spec requests suspension;
//   - calls handleManagedPVC or handleExistingPVC when custom outputs are set;
//   - dispatches to the handler matching job.Status.State.
//
// A fetch failure is logged and returned through client.IgnoreNotFound, so a
// job that no longer exists does not surface as an error. States without a
// matching case return an empty result and a nil error.
func (r *LMEvalJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := log.FromContext(ctx)

	job := &lmesv1alpha1.LMEvalJob{}
	if err := r.Get(ctx, req.NamespacedName, job); err != nil {
		log.Info("unable to fetch LMEvalJob. could be from a deletion request")
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	if !job.ObjectMeta.DeletionTimestamp.IsZero() {
		// Handle deletion here
		return r.handleDeletion(ctx, job, log)
	}

	// Treat this as NewJobState
	if job.Status.LastScheduleTime == nil && job.Status.CompleteTime == nil {
		job.Status.State = lmesv1alpha1.NewJobState
	}

	if JobMgrEnabled && job.Status.State != lmesv1alpha1.CompleteJobState {
		// The job requires the kueue.x-k8s.io/queue-name label if Job Manager is enabled.
		if _, ok := job.ObjectMeta.GetLabels()["kueue.x-k8s.io/queue-name"]; !ok {
			job.Status.State = lmesv1alpha1.CompleteJobState
			job.Status.Reason = lmesv1alpha1.FailedReason
			job.Status.Message = "job requires kueue.x-k8s.io/queue-name label"
			log.Error(fmt.Errorf("job %s requires kueue.x-k8s.io/queue-name label", job.Name), "LMevalJob requires kueue.x-k8s.io/queue-name label when Job Manager is enabled")
			return r.handleComplete(ctx, log, job)
		}
		if job.Spec.Suspend {
			return r.handleSuspend(ctx, log, job)
		}
	}

	// If outputs have been explicitly set
	if job.Spec.HasCustomOutput() {
		// If managed PVC is set
		if job.Spec.Outputs.HasManagedPVC() {
			if job.Spec.Outputs.HasExistingPVC() {
				log.Info("LMEvalJob has both managed and existing PVCs defined. Existing PVC configuration will be ignored.")
			}
			err := r.handleManagedPVC(ctx, log, job)
			if err != nil {
				return ctrl.Result{}, err
			}
		} else if job.Spec.Outputs.HasExistingPVC() {
			err := r.handleExistingPVC(ctx, log, job)
			if err != nil {
				return ctrl.Result{}, err
			}
		}
	}
	log.Info("Continuing after PVC")

	// Handle the job based on its state
	switch job.Status.State {
	case lmesv1alpha1.NewJobState:
		// Handle newly created job
		return r.handleNewCR(ctx, log, job)
	case lmesv1alpha1.ScheduledJobState:
		// The job's pod has been created and the driver hasn't updated the state yet;
		// check the pod status and detect pod failure if there is one.
		// TODO: need a timeout/retry mechanism here to transit to other states
		return r.checkScheduledPod(ctx, log, job)
	case lmesv1alpha1.RunningJobState:
		// TODO: need a timeout/retry mechanism here to transit to other states
		return r.checkScheduledPod(ctx, log, job)
	case lmesv1alpha1.CompleteJobState:
		return r.handleComplete(ctx, log, job)
	case lmesv1alpha1.CancelledJobState:
		return r.handleCancel(ctx, log, job)
	case lmesv1alpha1.SuspendedJobState:
		// A suspended job is only acted on once the spec no longer asks for suspension.
		if !job.Spec.Suspend {
			return r.handleResume(ctx, log, job)
		}
	}

	// Nothing to do for this state.
	return ctrl.Result{}, nil
}

Expand Down

0 comments on commit 67da0c4

Please sign in to comment.