Skip to content

Commit

Permalink
feat: Update RAG Status (kaito-project#621)
Browse files Browse the repository at this point in the history
**Reason for Change**:
Update RAG Status and the unit tests

**Requirements**

- [ ] added unit tests and e2e tests (if applicable).

**Issue Fixed**:
<!-- If this PR fixes GitHub issue 4321, add "Fixes #4321" to the next
line. -->

**Notes for Reviewers**:

---------

Signed-off-by: Bangqi Zhu <[email protected]>
Co-authored-by: Bangqi Zhu <[email protected]>
  • Loading branch information
bangqipropel and Bangqi Zhu authored Oct 10, 2024
1 parent f613bb4 commit 1818551
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 0 deletions.
60 changes: 60 additions & 0 deletions pkg/controllers/ragengine_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package controllers

import (
"context"

kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *RAGEngineReconciler) updateRAGEngineStatus(ctx context.Context, name *client.ObjectKey, condition *metav1.Condition, workerNodes []string) error {
return retry.OnError(retry.DefaultRetry,
func(err error) bool {
return apierrors.IsServiceUnavailable(err) || apierrors.IsServerTimeout(err) || apierrors.IsTooManyRequests(err)
},
func() error {
// Read the latest version to avoid update conflict.
ragObj := &kaitov1alpha1.RAGEngine{}
if err := c.Client.Get(ctx, *name, ragObj); err != nil {
if !errors.IsNotFound(err) {
return err
}
return nil
}
if condition != nil {
meta.SetStatusCondition(&ragObj.Status.Conditions, *condition)
}
if workerNodes != nil {
ragObj.Status.WorkerNodes = workerNodes
}
return c.Client.Status().Update(ctx, ragObj)
})
}

func (c *RAGEngineReconciler) updateStatusConditionIfNotMatch(ctx context.Context, ragObj *kaitov1alpha1.RAGEngine, cType kaitov1alpha1.ConditionType,
cStatus metav1.ConditionStatus, cReason, cMessage string) error {
if curCondition := meta.FindStatusCondition(ragObj.Status.Conditions, string(cType)); curCondition != nil {
if curCondition.Status == cStatus && curCondition.Reason == cReason && curCondition.Message == cMessage {
// Nonthing to change
return nil
}
}
klog.InfoS("updateStatusCondition", "ragengine", klog.KObj(ragObj), "conditionType", cType, "status", cStatus, "reason", cReason, "message", cMessage)
cObj := metav1.Condition{
Type: string(cType),
Status: cStatus,
Reason: cReason,
ObservedGeneration: ragObj.GetGeneration(),
Message: cMessage,
}
return c.updateRAGEngineStatus(ctx, &client.ObjectKey{Name: ragObj.Name, Namespace: ragObj.Namespace}, &cObj, nil)
}
172 changes: 172 additions & 0 deletions pkg/controllers/ragengine_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package controllers

import (
"context"
"errors"
"testing"

kaitov1alpha1 "github.com/azure/kaito/api/v1alpha1"
"github.com/azure/kaito/pkg/utils/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestUpdateRAGEngineStatus(t *testing.T) {
t.Run("Should update ragengine status successfully", func(t *testing.T) {
mockClient := test.NewClient()
reconciler := &RAGEngineReconciler{
Client: mockClient,
Scheme: test.NewTestScheme(),
}
ctx := context.Background()
ragengine := test.MockRAGEngineDistributedModel
condition := metav1.Condition{
Type: "TestCondition",
Status: metav1.ConditionStatus("True"),
Reason: "TestReason",
Message: "TestMessage",
}
workerNodes := []string{"node1", "node2"}

mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(nil)
mockClient.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(nil)

err := reconciler.updateRAGEngineStatus(ctx, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, &condition, workerNodes)
assert.Nil(t, err)
})

t.Run("Should return error when Get operation fails", func(t *testing.T) {
mockClient := test.NewClient()
reconciler := &RAGEngineReconciler{
Client: mockClient,
Scheme: test.NewTestScheme(),
}
ctx := context.Background()
ragengine := test.MockRAGEngineDistributedModel
condition := metav1.Condition{
Type: "TestCondition",
Status: metav1.ConditionStatus("True"),
Reason: "TestReason",
Message: "TestMessage",
}
workerNodes := []string{"node1", "node2"}

mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(errors.New("Get operation failed"))

err := reconciler.updateRAGEngineStatus(ctx, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, &condition, workerNodes)
assert.NotNil(t, err)
})

t.Run("Should return nil when ragengine is not found", func(t *testing.T) {
mockClient := test.NewClient()
reconciler := &RAGEngineReconciler{
Client: mockClient,
Scheme: test.NewTestScheme(),
}
ctx := context.Background()
ragengine := test.MockRAGEngineDistributedModel
condition := metav1.Condition{
Type: "TestCondition",
Status: metav1.ConditionStatus("True"),
Reason: "TestReason",
Message: "TestMessage",
}
workerNodes := []string{"node1", "node2"}

mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(apierrors.NewNotFound(schema.GroupResource{}, "ragengine"))

err := reconciler.updateRAGEngineStatus(ctx, &client.ObjectKey{Name: ragengine.Name, Namespace: ragengine.Namespace}, &condition, workerNodes)
assert.Nil(t, err)
})
}

func TestRAGEngineUpdateStatusConditionIfNotMatch(t *testing.T) {
t.Run("Should not update when condition matches", func(t *testing.T) {
mockClient := test.NewClient()
reconciler := &RAGEngineReconciler{
Client: mockClient,
Scheme: test.NewTestScheme(),
}
ctx := context.Background()
ragengine := test.MockRAGEngineDistributedModel
conditionType := kaitov1alpha1.ConditionType("TestCondition")
conditionStatus := metav1.ConditionStatus("True")
conditionReason := "TestReason"
conditionMessage := "TestMessage"

ragengine.Status.Conditions = []metav1.Condition{
{
Type: string(conditionType),
Status: conditionStatus,
Reason: conditionReason,
Message: conditionMessage,
},
}

err := reconciler.updateStatusConditionIfNotMatch(ctx, ragengine, conditionType, conditionStatus, conditionReason, conditionMessage)
assert.Nil(t, err)
})

t.Run("Should update when condition does not match", func(t *testing.T) {
mockClient := test.NewClient()
reconciler := &RAGEngineReconciler{
Client: mockClient,
Scheme: test.NewTestScheme(),
}
ctx := context.Background()
ragengine := test.MockRAGEngineDistributedModel
conditionType := kaitov1alpha1.ConditionType("TestCondition")
conditionStatus := metav1.ConditionStatus("True")
conditionReason := "TestReason"
conditionMessage := "TestMessage"

ragengine.Status.Conditions = []metav1.Condition{
{
Type: string(conditionType),
Status: conditionStatus,
Reason: conditionReason,
Message: "DifferentMessage",
},
}
mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(nil)
mockClient.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(nil)

err := reconciler.updateStatusConditionIfNotMatch(ctx, ragengine, conditionType, conditionStatus, conditionReason, conditionMessage)
assert.Nil(t, err)
})

t.Run("Should update when condition is not found", func(t *testing.T) {
mockClient := test.NewClient()
reconciler := &RAGEngineReconciler{
Client: mockClient,
Scheme: test.NewTestScheme(),
}
ctx := context.Background()
ragengine := test.MockRAGEngineDistributedModel
conditionType := kaitov1alpha1.ConditionType("TestCondition")
conditionStatus := metav1.ConditionStatus("True")
conditionReason := "TestReason"
conditionMessage := "TestMessage"

ragengine.Status.Conditions = []metav1.Condition{
{
Type: "DifferentCondition",
Status: conditionStatus,
Reason: conditionReason,
Message: conditionMessage,
},
}
mockClient.On("Get", mock.IsType(context.Background()), mock.Anything, mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(nil)
mockClient.StatusMock.On("Update", mock.IsType(context.Background()), mock.IsType(&kaitov1alpha1.RAGEngine{}), mock.Anything).Return(nil)

err := reconciler.updateStatusConditionIfNotMatch(ctx, ragengine, conditionType, conditionStatus, conditionReason, conditionMessage)
assert.Nil(t, err)
})
}
20 changes: 20 additions & 0 deletions pkg/utils/test/testUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ var (
}
)

var (
MockRAGEngineDistributedModel = &v1alpha1.RAGEngine{
ObjectMeta: metav1.ObjectMeta{
Name: "testRAGEngine",
Namespace: "kaito",
},
Spec: &v1alpha1.RAGEngineSpec{
Compute: &v1alpha1.ResourceSpec{
Count: &gpuNodeCount,
InstanceType: "Standard_NC12s_v3",
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"apps": "test",
},
},
},
},
}
)

var (
MockWorkspaceWithPreset = &v1alpha1.Workspace{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit 1818551

Please sign in to comment.