Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Fine tune (Part 9) - Handling image data destination #367

Merged
merged 19 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions charts/kaito/workspace/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ rules:
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations"]
verbs: ["get","list","watch"]
- apiGroups: [ "batch" ]
resources: [ "jobs" ]
verbs: [ "get", "list", "watch", "create", "delete","update", "patch" ]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations"]
verbs: ["update"]
Expand Down
54 changes: 24 additions & 30 deletions pkg/resources/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package resources
import (
"context"
"fmt"

batchv1 "k8s.io/api/batch/v1"
"k8s.io/utils/pointer"

Expand Down Expand Up @@ -187,15 +188,30 @@ func GenerateStatefulSetManifest(ctx context.Context, workspaceObj *kaitov1alpha
func GenerateTuningJobManifest(ctx context.Context, wObj *kaitov1alpha1.Workspace, imageName string,
imagePullSecretRefs []corev1.LocalObjectReference, replicas int, commands []string, containerPorts []corev1.ContainerPort,
livenessProbe, readinessProbe *corev1.Probe, resourceRequirements corev1.ResourceRequirements, tolerations []corev1.Toleration,
initContainers []corev1.Container, volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) *batchv1.Job {
initContainers []corev1.Container, sidecarContainers []corev1.Container, volumes []corev1.Volume, volumeMounts []corev1.VolumeMount) *batchv1.Job {
labels := map[string]string{
kaitov1alpha1.LabelWorkspaceName: wObj.Name,
}
//TODO:
// Will be included in future PR, this code includes
// bash script for pushing results based on user
// data destination method
//pushMethod, pushArg := determinePushMethod(wObj)

// Add volume mounts to sidecar containers
for i := range sidecarContainers {
sidecarContainers[i].VolumeMounts = append(sidecarContainers[i].VolumeMounts, volumeMounts...)
}

// Construct the complete list of containers (main and sidecars)
containers := append([]corev1.Container{
{
Name: wObj.Name,
Image: imageName,
Command: commands,
Resources: resourceRequirements,
LivenessProbe: livenessProbe,
ReadinessProbe: readinessProbe,
Ports: containerPorts,
VolumeMounts: volumeMounts,
},
}, sidecarContainers...)

return &batchv1.Job{
TypeMeta: v1.TypeMeta{
APIVersion: "batch/v1",
Expand All @@ -221,29 +237,8 @@ func GenerateTuningJobManifest(ctx context.Context, wObj *kaitov1alpha1.Workspac
Labels: labels,
},
Spec: corev1.PodSpec{
InitContainers: initContainers,
Containers: []corev1.Container{
{
Name: wObj.Name,
Image: imageName,
Command: commands,
Resources: resourceRequirements,
LivenessProbe: livenessProbe,
ReadinessProbe: readinessProbe,
Ports: containerPorts,
VolumeMounts: volumeMounts,
},
{
Name: "docker-sidecar",
Image: "docker:dind",
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.BoolPtr(true),
},
VolumeMounts: volumeMounts,
Command: []string{"/bin/sh", "-c"},
// TODO: Args: []string{pushMethod(pushArg)},
},
},
InitContainers: initContainers,
Containers: containers,
RestartPolicy: corev1.RestartPolicyNever,
Volumes: volumes,
Tolerations: tolerations,
Expand Down Expand Up @@ -390,5 +385,4 @@ func GenerateDeploymentManifestWithPodTemplate(ctx context.Context, workspaceObj
Template: *templateCopy,
},
}

}
186 changes: 149 additions & 37 deletions pkg/tuning/preset-tuning.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tuning
import (
"context"
"fmt"
"k8s.io/utils/pointer"
"os"
"strings"

Expand Down Expand Up @@ -67,11 +68,6 @@ func GetDataSrcImageInfo(ctx context.Context, wObj *kaitov1alpha1.Workspace) (st
return wObj.Tuning.Input.Image, imagePullSecretRefs
}

// GetDataDestImageInfo returns the output image configured on the workspace's
// tuning spec together with a one-element list that references the configured
// image push secret by name.
func GetDataDestImageInfo(ctx context.Context, wObj *kaitov1alpha1.Workspace) (string, []corev1.LocalObjectReference) {
	pushSecretRef := corev1.LocalObjectReference{Name: wObj.Tuning.Output.ImagePushSecret}
	return wObj.Tuning.Output.Image, []corev1.LocalObjectReference{pushSecretRef}
}

func EnsureTuningConfigMap(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace,
tuningObj *model.PresetParam, kubeClient client.Client) error {
// Copy Configmap from helm chart configmap into workspace
Expand Down Expand Up @@ -109,18 +105,55 @@ func EnsureTuningConfigMap(ctx context.Context, workspaceObj *kaitov1alpha1.Work
return nil
}

func CreatePresetTuning(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace,
tuningObj *model.PresetParam, kubeClient client.Client) (client.Object, error) {
initContainers, imagePullSecrets, volumes, volumeMounts, err := prepareDataSource(ctx, workspaceObj, kubeClient)
if err != nil {
return nil, err
}
// dockerSidecarScriptPushImage returns the shell script run by the
// docker-in-docker sidecar to publish tuning results as a container image.
//
// The script starts dockerd in the background, waits until "docker info"
// succeeds, and then checks /mnt/results every 10 seconds for a
// fine_tuning_completed.txt marker. When the marker is found, the
// adapter_config.json and adapter_model.safetensors files next to it are copied
// into a temporary build context, packaged into a FROM-scratch image tagged
// with image, and pushed.
//
// image is embedded as a single-quoted shell literal, so characters in it are
// never interpreted by the shell. If "docker build" or "docker push" fails the
// script exits with status 1 instead of reporting a completed upload; on
// success it removes the build context and the marker file and exits 0.
func dockerSidecarScriptPushImage(image string) string {
	// TODO: Override output path if specified in trainingconfig (instead of /mnt/results)

	// Inside single quotes the shell treats everything literally; an embedded
	// single quote is written as '\'' (close quote, escaped quote, reopen).
	quotedImage := "'" + strings.ReplaceAll(image, "'", `'\''`) + "'"

	return fmt.Sprintf(`
# Start the Docker daemon in the background
dockerd &
# Wait for the Docker daemon to be ready
while ! docker info > /dev/null 2>&1; do
  echo "Waiting for Docker daemon to start..."
  sleep 1
done
echo 'Docker daemon started'

IMAGE=%s

while true; do
  FILE_PATH=$(find /mnt/results -name 'fine_tuning_completed.txt')
  if [ ! -z "$FILE_PATH" ]; then
    echo "FOUND TRAINING COMPLETED FILE at $FILE_PATH"

    PARENT_DIR=$(dirname "$FILE_PATH")
    echo "Parent directory is $PARENT_DIR"

    TEMP_CONTEXT=$(mktemp -d)
    cp "$PARENT_DIR/adapter_config.json" "$TEMP_CONTEXT/adapter_config.json"
    cp -r "$PARENT_DIR/adapter_model.safetensors" "$TEMP_CONTEXT/adapter_model.safetensors"

    # Create a minimal Dockerfile
    echo 'FROM scratch
ADD adapter_config.json /
ADD adapter_model.safetensors /' > "$TEMP_CONTEXT/Dockerfile"

    # Stop with a failing status if the image cannot be built or pushed
    if ! docker build -t "$IMAGE" "$TEMP_CONTEXT"; then
      echo "docker build failed"
      exit 1
    fi
    if ! docker push "$IMAGE"; then
      echo "docker push failed"
      exit 1
    fi

    # Cleanup: Remove the temporary directory
    rm -rf "$TEMP_CONTEXT"

    # Remove the file to prevent repeated builds
    rm "$FILE_PATH"
    echo "Upload complete"
    exit 0
  fi
  sleep 10 # Check every 10 seconds
done`, quotedImage)
}

func setupDefaultSharedVolumes(workspaceObj *kaitov1alpha1.Workspace) ([]corev1.Volume, []corev1.VolumeMount) {
var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount

// Add shared volume for shared memory (multi-node)
shmVolume, shmVolumeMount := utils.ConfigSHMVolume(*workspaceObj.Resource.Count)
if shmVolume.Name != "" {
volumes = append(volumes, shmVolume)
Expand All @@ -129,10 +162,55 @@ func CreatePresetTuning(ctx context.Context, workspaceObj *kaitov1alpha1.Workspa
volumeMounts = append(volumeMounts, shmVolumeMount)
}

// Add shared volume for tuning parameters
cmVolume, cmVolumeMount := utils.ConfigCMVolume(workspaceObj.Tuning.ConfigTemplate)
volumes = append(volumes, cmVolume)
volumeMounts = append(volumeMounts, cmVolumeMount)

// Add shared volume for results dir
resultsVolume, resultsVolumeMount := utils.ConfigResultsVolume()
if resultsVolume.Name != "" {
volumes = append(volumes, resultsVolume)
}
if resultsVolumeMount.Name != "" {
volumeMounts = append(volumeMounts, resultsVolumeMount)
}
return volumes, volumeMounts
}

func CreatePresetTuning(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace,
tuningObj *model.PresetParam, kubeClient client.Client) (client.Object, error) {
var initContainers, sidecarContainers []corev1.Container
volumes, volumeMounts := setupDefaultSharedVolumes(workspaceObj)

initContainer, imagePullSecrets, dataSourceVolume, dataSourceVolumeMount, err := prepareDataSource(ctx, workspaceObj)
if err != nil {
return nil, err
}
volumes = append(volumes, dataSourceVolume)
volumeMounts = append(volumeMounts, dataSourceVolumeMount)
if initContainer.Name != "" {
initContainers = append(initContainers, *initContainer)
}

sidecarContainer, imagePushSecret, dataDestVolume, dataDestVolumeMount, err := prepareDataDestination(ctx, workspaceObj)
if err != nil {
return nil, err
}
volumes = append(volumes, dataDestVolume)
volumeMounts = append(volumeMounts, dataDestVolumeMount)
if sidecarContainer != nil {
sidecarContainers = append(sidecarContainers, *sidecarContainer)
}
if imagePushSecret != nil {
imagePullSecrets = append(imagePullSecrets, *imagePushSecret)
}

err = EnsureTuningConfigMap(ctx, workspaceObj, tuningObj, kubeClient)
if err != nil {
return nil, err
}

modelCommand, err := prepareModelRunParameters(ctx, tuningObj)
if err != nil {
return nil, err
Expand All @@ -141,7 +219,7 @@ func CreatePresetTuning(ctx context.Context, workspaceObj *kaitov1alpha1.Workspa
tuningImage := GetTuningImageInfo(ctx, workspaceObj, tuningObj)

jobObj := resources.GenerateTuningJobManifest(ctx, workspaceObj, tuningImage, imagePullSecrets, *workspaceObj.Resource.Count, commands,
containerPorts, nil, nil, resourceReq, tolerations, initContainers, volumes, volumeMounts)
containerPorts, nil, nil, resourceReq, tolerations, initContainers, sidecarContainers, volumes, volumeMounts)

err = resources.CreateResource(ctx, jobObj, kubeClient)
if client.IgnoreAlreadyExists(err) != nil {
Expand All @@ -150,47 +228,78 @@ func CreatePresetTuning(ctx context.Context, workspaceObj *kaitov1alpha1.Workspa
return jobObj, nil
}

// prepareDataDestination works out what the tuning job needs in order to
// export its results. Only the image destination is handled here: when the
// workspace names an output image, it returns the sidecar container, a
// reference to the image push secret, and the volume/mount produced by
// handleImageDataDestination. Otherwise the container and secret reference are
// nil and the volume and mount are zero values. The error is always nil.
func prepareDataDestination(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace) (*corev1.Container, *corev1.LocalObjectReference, corev1.Volume, corev1.VolumeMount, error) {
	var (
		sidecar    *corev1.Container
		pushSecret *corev1.LocalObjectReference
		vol        corev1.Volume
		mount      corev1.VolumeMount
	)

	// TODO: Future PR include
	// handling for workspaceObj.Tuning.Output.Volume != nil
	if destImage := workspaceObj.Tuning.Output.Image; destImage != "" {
		secretName := workspaceObj.Tuning.Output.ImagePushSecret
		pushSecret = &corev1.LocalObjectReference{Name: secretName}
		sidecar, vol, mount = handleImageDataDestination(ctx, destImage, secretName)
	}

	return sidecar, pushSecret, vol, mount, nil
}

// handleImageDataDestination assembles the pieces for pushing tuning output as
// an image: a privileged "docker-sidecar" container based on docker:dind whose
// shell argument is the script from dockerSidecarScriptPushImage(image), plus
// the volume and volume mount returned by utils.ConfigImagePushSecretVolume
// for imagePushSecret.
func handleImageDataDestination(ctx context.Context, image, imagePushSecret string) (*corev1.Container, corev1.Volume, corev1.VolumeMount) {
	pushScript := dockerSidecarScriptPushImage(image)

	sidecar := corev1.Container{
		Name:    "docker-sidecar",
		Image:   "docker:dind",
		Command: []string{"/bin/sh", "-c"},
		Args:    []string{pushScript},
	}
	// The sidecar runs its own Docker daemon, so it is started privileged.
	sidecar.SecurityContext = &corev1.SecurityContext{
		Privileged: pointer.BoolPtr(true),
	}

	secretVolume, secretVolumeMount := utils.ConfigImagePushSecretVolume(imagePushSecret)
	return &sidecar, secretVolume, secretVolumeMount
}

// Now there are three options for DataSource: 1. URL - 2. HostPath - 3. Image
func prepareDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace, kubeClient client.Client) ([]corev1.Container, []corev1.LocalObjectReference, []corev1.Volume, []corev1.VolumeMount, error) {
var initContainers []corev1.Container
var volumes []corev1.Volume
var volumeMounts []corev1.VolumeMount
func prepareDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace) (*corev1.Container, []corev1.LocalObjectReference, corev1.Volume, corev1.VolumeMount, error) {
var initContainer *corev1.Container
var volume corev1.Volume
var volumeMount corev1.VolumeMount
var imagePullSecrets []corev1.LocalObjectReference
switch {
case workspaceObj.Tuning.Input.Image != "":
initContainers, volumes, volumeMounts = handleImageDataSource(ctx, workspaceObj)
_, imagePullSecrets = GetDataSrcImageInfo(ctx, workspaceObj)
var image string
image, imagePullSecrets = GetDataSrcImageInfo(ctx, workspaceObj)
initContainer, volume, volumeMount = handleImageDataSource(ctx, image)
case len(workspaceObj.Tuning.Input.URLs) > 0:
initContainers, volumes, volumeMounts = handleURLDataSource(ctx, workspaceObj)
initContainer, volume, volumeMount = handleURLDataSource(ctx, workspaceObj)
// TODO: Future PR include
// case workspaceObj.Tuning.Input.Volume != nil:
}
return initContainers, imagePullSecrets, volumes, volumeMounts, nil
return initContainer, imagePullSecrets, volume, volumeMount, nil
}

func handleImageDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace) ([]corev1.Container, []corev1.Volume, []corev1.VolumeMount) {
var initContainers []corev1.Container
func handleImageDataSource(ctx context.Context, image string) (*corev1.Container, corev1.Volume, corev1.VolumeMount) {
// Constructing a multistep command that lists, copies, and then lists the destination
command := "ls -la /data && cp -r /data/* " + utils.DefaultDataVolumePath + " && ls -la " + utils.DefaultDataVolumePath
initContainers = append(initContainers, corev1.Container{
initContainer := &corev1.Container{
Name: "data-extractor",
Image: workspaceObj.Tuning.Input.Image,
Image: image,
Command: []string{"sh", "-c", command},
VolumeMounts: []corev1.VolumeMount{
{
Name: "data-volume",
MountPath: utils.DefaultDataVolumePath,
},
},
})
}

volumes, volumeMounts := utils.ConfigDataVolume("")
return initContainers, volumes, volumeMounts
volume, volumeMount := utils.ConfigDataVolume(nil)
return initContainer, volume, volumeMount
}

func handleURLDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace) ([]corev1.Container, []corev1.Volume, []corev1.VolumeMount) {
var initContainers []corev1.Container
initContainers = append(initContainers, corev1.Container{
func handleURLDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Workspace) (*corev1.Container, corev1.Volume, corev1.VolumeMount) {
initContainer := &corev1.Container{
Name: "data-downloader",
Image: "curlimages/curl",
Command: []string{"sh", "-c", `
Expand All @@ -215,9 +324,9 @@ func handleURLDataSource(ctx context.Context, workspaceObj *kaitov1alpha1.Worksp
Value: utils.DefaultDataVolumePath,
},
},
})
volumes, volumeMounts := utils.ConfigDataVolume("")
return initContainers, volumes, volumeMounts
}
volume, volumeMount := utils.ConfigDataVolume(nil)
return initContainer, volume, volumeMount
}

func prepareModelRunParameters(ctx context.Context, tuningObj *model.PresetParam) (string, error) {
Expand All @@ -230,6 +339,9 @@ func prepareModelRunParameters(ctx context.Context, tuningObj *model.PresetParam
// and sets the GPU resources required for tuning.
// Returns the command and resource configuration.
func prepareTuningParameters(ctx context.Context, wObj *kaitov1alpha1.Workspace, modelCommand string, tuningObj *model.PresetParam) ([]string, corev1.ResourceRequirements) {
if tuningObj.TorchRunParams == nil {
tuningObj.TorchRunParams = make(map[string]string)
}
// Set # of processes to GPU Count
numProcesses := getInstanceGPUCount(wObj.Resource.InstanceType)
tuningObj.TorchRunParams["num_processes"] = fmt.Sprintf("%d", numProcesses)
Expand Down
Loading
Loading