Skip to content

Commit

Permalink
Update KFTO MNIST multi-node test script to add multi-gpu training sc…
Browse files Browse the repository at this point in the history
…enario using DDP example
  • Loading branch information
abhijeet-dhumal committed Jan 13, 2025
1 parent a439296 commit e7edc7b
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 221 deletions.
74 changes: 50 additions & 24 deletions tests/kfto/kfto_mnist_training_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,30 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPyTorchJobMnistMultiNodeCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, 0, 2, "", GetCudaTrainingImage(), "resources/requirements.txt")
func TestPyTorchJobMnistMultiNodeSingleCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, 0, 2, 1, "", GetCudaTrainingImage(), "resources/requirements.txt")
}

func TestPyTorchJobMnistMultiNodeWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
func TestPyTorchJobMnistMultiNodeMultiCpu(t *testing.T) {
runKFTOPyTorchMnistJob(t, 0, 2, 2, "", GetCudaTrainingImage(), "resources/requirements.txt")
}

func TestPyTorchJobMnistMultiNodeWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
func TestPyTorchJobMnistMultiNodeSingleGpuWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
}

func TestPyTorchJobMnistMultiNodeMultiGpuWithCuda(t *testing.T) {
runKFTOPyTorchMnistJob(t, 2, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
runKFTOPyTorchMnistJob(t, 2, 1, 2, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
}

func TestPyTorchJobMnistMultiNodeSingleGpuWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, 1, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
}

func TestPyTorchJobMnistMultiNodeMultiGpuWithROCm(t *testing.T) {
runKFTOPyTorchMnistJob(t, 2, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
runKFTOPyTorchMnistJob(t, 2, 1, 2, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
}

func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLabel string, image string, requirementsFile string) {
func runKFTOPyTorchMnistJob(t *testing.T, totalNumGpus int, workerReplicas int, numProcPerNode int, gpuLabel string, image string, requirementsFile string) {
test := With(t)

// Create a namespace
Expand All @@ -59,7 +62,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLa
mnist := ReadFile(test, "resources/mnist.py")
requirementsFileName := ReadFile(test, requirementsFile)

if numGpus > 0 {
if totalNumGpus > 0 {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1)
} else {
mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1)
Expand All @@ -74,7 +77,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLa
defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})

// Create training PyTorch job
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, workerReplicas, outputPvc.Name, image)
tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, totalNumGpus, workerReplicas, numProcPerNode, outputPvc.Name, image)
defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))

// Make sure the PyTorch job is running
Expand All @@ -87,11 +90,11 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLa

}

func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, workerReplicas int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, totalNumGpus int, workerReplicas int, numProcPerNode int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
var useGPU = false
var backend string

if numGpus > 0 {
if totalNumGpus > 0 {
useGPU = true
backend = "nccl"
} else {
Expand All @@ -108,13 +111,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
Spec: kftov1.PyTorchJobSpec{
PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{
"Master": {
kftov1.PyTorchJobReplicaTypeMaster: {
Replicas: Ptr(int32(1)),
RestartPolicy: kftov1.RestartPolicyOnFailure,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "kfto-mnist",
"app": "kfto-mnist",
"role": "master",
},
},
Spec: corev1.PodSpec{
Expand All @@ -141,7 +145,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
python /mnt/files/mnist.py --epochs 3 --save-model --output-path /mnt/output --backend %s`, backend),
torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
Expand All @@ -159,7 +163,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
},
},
Expand Down Expand Up @@ -195,13 +199,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
},
},
"Worker": {
kftov1.PyTorchJobReplicaTypeWorker: {
Replicas: Ptr(int32(workerReplicas)),
RestartPolicy: kftov1.RestartPolicyOnFailure,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "kfto-mnist",
"app": "kfto-mnist",
"role": "worker",
},
},
Spec: corev1.PodSpec{
Expand All @@ -228,7 +233,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
python /mnt/files/mnist.py --epochs 3 --save-model --backend %s`, backend),
torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
},
VolumeMounts: []corev1.VolumeMount{
{
Expand All @@ -242,7 +247,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
corev1.ResourceMemory: resource.MustParse("6Gi"),
},
},
Expand Down Expand Up @@ -276,20 +281,41 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config

if useGPU {
// Update resource lists for GPU (NVIDIA/ROCm) usecase
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus))
tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(totalNumGpus))
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(totalNumGpus))

tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Command = []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
}
tuningJob.Spec.PyTorchReplicaSpecs[kftov1.MPIJobReplicaTypeWorker].Template.Spec.Containers[0].Command = []string{
"/bin/bash", "-c",
fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
}

tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
Name: "NCCL_DEBUG",
Value: "INFO",
},
{
Name: "TORCH_DISTRIBUTED_DEBUG",
Value: "DETAIL",
},
}
tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
Name: "NCCL_DEBUG",
Value: "INFO",
},
{
Name: "TORCH_DISTRIBUTED_DEBUG",
Value: "DETAIL",
},
}

// Update tolerations
Expand Down
Loading

0 comments on commit e7edc7b

Please sign in to comment.