From e7edc7ba0a7eb96a58b8c80e75188da34a27cbcf Mon Sep 17 00:00:00 2001
From: abhijeet-dhumal
Date: Thu, 9 Jan 2025 22:54:02 +0530
Subject: [PATCH] Update KFTO MNIST multi-node test script to add a multi-GPU
 training scenario using a DDP example

---
 tests/kfto/kfto_mnist_training_test.go |  74 ++++--
 tests/kfto/resources/mnist.py          | 331 ++++++++++---------------
 2 files changed, 184 insertions(+), 221 deletions(-)

diff --git a/tests/kfto/kfto_mnist_training_test.go b/tests/kfto/kfto_mnist_training_test.go
index b1043e8f..cb017f53 100644
--- a/tests/kfto/kfto_mnist_training_test.go
+++ b/tests/kfto/kfto_mnist_training_test.go
@@ -30,27 +30,30 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
-func TestPyTorchJobMnistMultiNodeCpu(t *testing.T) {
-	runKFTOPyTorchMnistJob(t, 0, 2, "", GetCudaTrainingImage(), "resources/requirements.txt")
+func TestPyTorchJobMnistMultiNodeSingleCpu(t *testing.T) {
+	runKFTOPyTorchMnistJob(t, 0, 2, 1, "", GetCudaTrainingImage(), "resources/requirements.txt")
 }
 
-func TestPyTorchJobMnistMultiNodeWithCuda(t *testing.T) {
-	runKFTOPyTorchMnistJob(t, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
+func TestPyTorchJobMnistMultiNodeMultiCpu(t *testing.T) {
+	runKFTOPyTorchMnistJob(t, 0, 2, 2, "", GetCudaTrainingImage(), "resources/requirements.txt")
 }
 
-func TestPyTorchJobMnistMultiNodeWithROCm(t *testing.T) {
-	runKFTOPyTorchMnistJob(t, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
+func TestPyTorchJobMnistMultiNodeSingleGpuWithCuda(t *testing.T) {
+	runKFTOPyTorchMnistJob(t, 1, 1, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
 }
 
 func TestPyTorchJobMnistMultiNodeMultiGpuWithCuda(t *testing.T) {
-	runKFTOPyTorchMnistJob(t, 2, 1, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
+	runKFTOPyTorchMnistJob(t, 2, 1, 2, "nvidia.com/gpu", GetCudaTrainingImage(), "resources/requirements.txt")
+}
+
+func TestPyTorchJobMnistMultiNodeSingleGpuWithROCm(t *testing.T) {
+	runKFTOPyTorchMnistJob(t, 1, 1, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
 }
 
 func TestPyTorchJobMnistMultiNodeMultiGpuWithROCm(t *testing.T) {
-	runKFTOPyTorchMnistJob(t, 2, 1, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
+	runKFTOPyTorchMnistJob(t, 2, 1, 2, "amd.com/gpu", GetROCmTrainingImage(), "resources/requirements-rocm.txt")
 }
 
-func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLabel string, image string, requirementsFile string) {
+func runKFTOPyTorchMnistJob(t *testing.T, totalNumGpus int, workerReplicas int, numProcPerNode int, gpuLabel string, image string, requirementsFile string) {
 	test := With(t)
 
 	// Create a namespace
@@ -59,7 +62,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLa
 	mnist := ReadFile(test, "resources/mnist.py")
 	requirementsFileName := ReadFile(test, requirementsFile)
 
-	if numGpus > 0 {
+	if totalNumGpus > 0 {
 		mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"gpu\""), 1)
 	} else {
 		mnist = bytes.Replace(mnist, []byte("accelerator=\"has to be specified\""), []byte("accelerator=\"cpu\""), 1)
@@ -74,7 +77,7 @@ func runKFTOPyTorchMnistJob(t *testing.T, numGpus int, workerReplicas int, gpuLa
 	defer test.Client().Core().CoreV1().PersistentVolumeClaims(namespace.Name).Delete(test.Ctx(), outputPvc.Name, metav1.DeleteOptions{})
 
 	// Create training PyTorch job
-	tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, numGpus, workerReplicas, outputPvc.Name, image)
+	tuningJob := createKFTOPyTorchMnistJob(test, namespace.Name, *config, gpuLabel, totalNumGpus, workerReplicas, numProcPerNode, outputPvc.Name, image)
 	defer test.Client().Kubeflow().KubeflowV1().PyTorchJobs(namespace.Name).Delete(test.Ctx(), tuningJob.Name, *metav1.NewDeleteOptions(0))
 
 	// Make sure the PyTorch job is running
@@ -87,11 +90,11 @@
 }
 
-func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, numGpus int, workerReplicas int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
+func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.ConfigMap, gpuLabel string, totalNumGpus int, workerReplicas int, numProcPerNode int, outputPvcName string, baseImage string) *kftov1.PyTorchJob {
 	var useGPU = false
 	var backend string
 
-	if numGpus > 0 {
+	if totalNumGpus > 0 {
 		useGPU = true
 		backend = "nccl"
 	} else {
@@ -108,13 +111,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 		},
 		Spec: kftov1.PyTorchJobSpec{
 			PyTorchReplicaSpecs: map[kftov1.ReplicaType]*kftov1.ReplicaSpec{
-				"Master": {
+				kftov1.PyTorchJobReplicaTypeMaster: {
 					Replicas:      Ptr(int32(1)),
 					RestartPolicy: kftov1.RestartPolicyOnFailure,
 					Template: corev1.PodTemplateSpec{
 						ObjectMeta: metav1.ObjectMeta{
 							Labels: map[string]string{
-								"app": "kfto-mnist",
+								"app":  "kfto-mnist",
+								"role": "master",
 							},
 						},
 						Spec: corev1.PodSpec{
@@ -141,7 +145,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 										"/bin/bash", "-c",
 										fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
 										pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
-										python /mnt/files/mnist.py --epochs 3 --save-model --output-path /mnt/output --backend %s`, backend),
+										torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
 									},
 									VolumeMounts: []corev1.VolumeMount{
 										{
@@ -159,7 +163,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 									},
 									Resources: corev1.ResourceRequirements{
 										Limits: corev1.ResourceList{
-											corev1.ResourceCPU:    resource.MustParse("1"),
+											corev1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
 											corev1.ResourceMemory: resource.MustParse("6Gi"),
 										},
 									},
@@ -195,13 +199,14 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 						},
 					},
 				},
-				"Worker": {
+				kftov1.PyTorchJobReplicaTypeWorker: {
 					Replicas:      Ptr(int32(workerReplicas)),
 					RestartPolicy: kftov1.RestartPolicyOnFailure,
 					Template: corev1.PodTemplateSpec{
 						ObjectMeta: metav1.ObjectMeta{
 							Labels: map[string]string{
-								"app": "kfto-mnist",
+								"app":  "kfto-mnist",
+								"role": "worker",
 							},
 						},
 						Spec: corev1.PodSpec{
@@ -228,7 +233,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 										"/bin/bash", "-c",
 										fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
 										pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
-										python /mnt/files/mnist.py --epochs 3 --save-model --backend %s`, backend),
+										torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 64 --lr 0.0005 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
 									},
 									VolumeMounts: []corev1.VolumeMount{
 										{
@@ -242,7 +247,7 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 									},
 									Resources: corev1.ResourceRequirements{
 										Limits: corev1.ResourceList{
-											corev1.ResourceCPU:    resource.MustParse("1"),
+											corev1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", numProcPerNode)),
 											corev1.ResourceMemory: resource.MustParse("6Gi"),
 										},
 									},
@@ -276,20 +281,41 @@ func createKFTOPyTorchMnistJob(test Test, namespace string, config corev1.Config
 
 	if useGPU {
 		// Update resource lists for GPU (NVIDIA/ROCm) usecase
-		tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus))
-		tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(numGpus))
+		tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(totalNumGpus))
+		tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(gpuLabel)] = resource.MustParse(fmt.Sprint(totalNumGpus))
+
+		tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeMaster].Template.Spec.Containers[0].Command = []string{
+			"/bin/bash", "-c",
+			fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
+			pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
+			torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
+		}
+		tuningJob.Spec.PyTorchReplicaSpecs[kftov1.PyTorchJobReplicaTypeWorker].Template.Spec.Containers[0].Command = []string{
+			"/bin/bash", "-c",
+			fmt.Sprintf(`mkdir -p /tmp/lib && export PYTHONPATH=$PYTHONPATH:/tmp/lib && \
+			pip install --no-cache-dir -r /mnt/files/requirements.txt --target=/tmp/lib && \
+			torchrun --nproc_per_node=%d /mnt/files/mnist.py --epochs 7 --save_every 2 --batch_size 128 --lr 0.001 --snapshot_path "mnist_snapshot.pt" --backend %s`, numProcPerNode, backend),
+		}
+
 		tuningJob.Spec.PyTorchReplicaSpecs["Master"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
 			{
 				Name:  "NCCL_DEBUG",
 				Value: "INFO",
 			},
+			{
+				Name:  "TORCH_DISTRIBUTED_DEBUG",
+				Value: "DETAIL",
+			},
 		}
 		tuningJob.Spec.PyTorchReplicaSpecs["Worker"].Template.Spec.Containers[0].Env = []corev1.EnvVar{
 			{
 				Name:  "NCCL_DEBUG",
 				Value: "INFO",
 			},
+			{
+				Name:  "TORCH_DISTRIBUTED_DEBUG",
+				Value: "DETAIL",
+			},
 		}
 
 		// Update tolerations
diff --git a/tests/kfto/resources/mnist.py b/tests/kfto/resources/mnist.py
index 91b1cbd3..a8c8677c 100644
--- a/tests/kfto/resources/mnist.py
+++ b/tests/kfto/resources/mnist.py
@@ -12,18 +12,35 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import argparse
-import os
-
 import torch
-import torch.distributed as dist
 import torch.nn as nn
 import torch.nn.functional as F
-import torch.optim as optim
-from torch.utils.data import DistributedSampler
-from torch.utils.tensorboard import SummaryWriter
-from torchvision import datasets, transforms
+import os
+import torch.distributed as dist
+
+from torch.nn.parallel import DistributedDataParallel as DDP
+from torch.utils.data import DataLoader, Dataset
+from torch.utils.data.distributed import DistributedSampler
+import torchvision
+import torchvision.transforms as transforms
+
+
+def ddp_setup(backend="nccl"):
+    """Set up Distributed Data Parallel with the specified backend."""
+    if torch.cuda.is_available() and backend == "nccl":
+        # Check GPU availability and pin this process to its local device
+        num_devices = torch.cuda.device_count()
+        device = int(os.environ.get("LOCAL_RANK", 0))  # Default to device 0
+        if device >= num_devices:
+            print(f"Warning: Invalid device ordinal {device}. Defaulting to device 0.")
+            device = 0
+        torch.cuda.set_device(device)
+    else:
+        # No usable GPU, so fall back to the gloo backend (CPU-only environments)
+        print("No GPU available, falling back to CPU.")
+        backend = "gloo"
+    dist.init_process_group(backend=backend)
 
 class Net(nn.Module):
     def __init__(self):
@@ -43,203 +60,123 @@ def forward(self, x):
         x = self.fc2(x)
         return F.log_softmax(x, dim=1)
 
-
-def train(args, model, device, train_loader, epoch, writer):
-    model.train()
-    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
-
-    for batch_idx, (data, target) in enumerate(train_loader):
-        # Attach tensors to the device.
-        data, target = data.to(device), target.to(device)
-
-        optimizer.zero_grad()
-        output = model(data)
-        loss = F.nll_loss(output, target)
+class Trainer:
+    def __init__(
+        self,
+        model: torch.nn.Module,
+        train_data: DataLoader,
+        optimizer: torch.optim.Optimizer,
+        save_every: int,
+        snapshot_path: str,
+        backend: str,
+    ) -> None:
+        # Fall back to rank 0 when torchrun has not set the env vars
+        self.local_rank = int(os.environ.get("LOCAL_RANK", 0))
+        self.global_rank = int(os.environ.get("RANK", 0))
+
+        self.model = model
+        self.train_data = train_data
+        self.optimizer = optimizer
+        self.save_every = save_every
+        self.epochs_run = 0
+        self.snapshot_path = snapshot_path
+        self.backend = backend
+
+        if os.path.exists(snapshot_path):
+            print("Loading snapshot")
+            self._load_snapshot(snapshot_path)
+
+        # Move model to the appropriate device (GPU/CPU)
+        if torch.cuda.is_available() and self.backend == "nccl":
+            self.device = torch.device(f"cuda:{self.local_rank}")
+            self.model = DDP(self.model.to(self.device), device_ids=[self.local_rank])
+        else:
+            self.device = torch.device("cpu")
+            self.model = DDP(self.model.to(self.device))
+        print(f"Using device: {self.device}")
+
+    def _load_snapshot(self, snapshot_path):
+        # Restore the keys written by _save_snapshot; the model is still
+        # unwrapped at this point, so load directly into it.
+        snapshot = torch.load(snapshot_path, map_location="cpu")
+        self.model.load_state_dict(snapshot["MODEL_STATE"])
+        self.epochs_run = snapshot["EPOCHS_RUN"]
+        print(f"Resuming training from snapshot at epoch {self.epochs_run}")
+
+    def _run_batch(self, source, targets):
+        self.optimizer.zero_grad()
+        output = self.model(source)
+        loss = F.cross_entropy(output, targets)
         loss.backward()
-        optimizer.step()
-        if batch_idx % args.log_interval == 0:
-            print(
-                "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
-                    epoch,
-                    batch_idx * len(data),
-                    len(train_loader.dataset),
-                    100.0 * batch_idx / len(train_loader),
-                    loss.item(),
-                )
-            )
-            niter = epoch * len(train_loader) + batch_idx
-            writer.add_scalar("loss", loss.item(), niter)
-
-
-def test(model, device, test_loader, writer, epoch):
-    model.eval()
-
-    correct = 0
-    with torch.no_grad():
-        for data, target in test_loader:
-            # Attach tensors to the device.
-            data, target = data.to(device), target.to(device)
-
-            output = model(data)
-            # Get the index of the max log-probability.
-            pred = output.max(1, keepdim=True)[1]
-            correct += pred.eq(target.view_as(pred)).sum().item()
-
-    print("\naccuracy={:.4f}\n".format(float(correct) / len(test_loader.dataset)))
-    writer.add_scalar("accuracy", float(correct) / len(test_loader.dataset), epoch)
-
-
-def main():
-    # Training settings
-    parser = argparse.ArgumentParser(description="PyTorch FashionMNIST Example")
-    parser.add_argument(
-        "--batch-size",
-        type=int,
-        default=64,
-        metavar="N",
-        help="input batch size for training (default: 64)",
-    )
-    parser.add_argument(
-        "--test-batch-size",
-        type=int,
-        default=1000,
-        metavar="N",
-        help="input batch size for testing (default: 1000)",
-    )
-    parser.add_argument(
-        "--epochs",
-        type=int,
-        default=1,
-        metavar="N",
-        help="number of epochs to train (default: 10)",
-    )
-    parser.add_argument(
-        "--lr",
-        type=float,
-        default=0.01,
-        metavar="LR",
-        help="learning rate (default: 0.01)",
-    )
-    parser.add_argument(
-        "--momentum",
-        type=float,
-        default=0.5,
-        metavar="M",
-        help="SGD momentum (default: 0.5)",
-    )
-    parser.add_argument(
-        "--no-cuda",
-        action="store_true",
-        default=False,
-        help="disables CUDA training",
-    )
-    parser.add_argument(
-        "--seed",
-        type=int,
-        default=1,
-        metavar="S",
-        help="random seed (default: 1)",
-    )
-    parser.add_argument(
-        "--log-interval",
-        type=int,
-        default=10,
-        metavar="N",
-        help="how many batches to wait before logging training status",
-    )
-    parser.add_argument(
-        "--save-model",
-        action="store_true",
-        default=False,
-        help="For Saving the current Model",
-    )
-    parser.add_argument(
-        "--dir",
-        default="logs",
-        metavar="L",
-        help="directory where summary logs are stored",
-    )
+        self.optimizer.step()
+
+    def _run_epoch(self, epoch):
+        b_sz = len(next(iter(self.train_data))[0])
+        rank_tag = f"GPU{self.global_rank}" if self.device.type == "cuda" else f"CPU{self.global_rank}"
+        print(f"[{rank_tag}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
+        if isinstance(self.train_data.sampler, DistributedSampler):
+            self.train_data.sampler.set_epoch(epoch)
+        for source, targets in self.train_data:
+            source = source.to(self.device)
+            targets = targets.to(self.device)
+            self._run_batch(source, targets)
+
+    def _save_snapshot(self, epoch):
+        snapshot = {
+            # The model is always DDP-wrapped by __init__, so unwrap via .module
+            "MODEL_STATE": self.model.module.state_dict(),
+            "EPOCHS_RUN": epoch,
+        }
+        torch.save(snapshot, self.snapshot_path)
+        print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")
+
+    def train(self, max_epochs: int):
+        for epoch in range(self.epochs_run, max_epochs):
+            self._run_epoch(epoch)
+            if self.global_rank == 0 and epoch % self.save_every == 0:
+                self._save_snapshot(epoch)
+
+
+def load_train_objs(lr: float):
+    """Load dataset, model, and optimizer."""
+    # Note: uses the smaller 10k-image MNIST split (train=False) so the
+    # test job finishes quickly.
+    train_set = torchvision.datasets.MNIST("../data",
+                                           train=False,
+                                           download=True,
+                                           transform=transforms.Compose([transforms.ToTensor()]))
+    model = Net()
+    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
+    return train_set, model, optimizer
 
-    parser.add_argument(
-        "--backend",
-        type=str,
-        help="Distributed backend",
-        choices=[dist.Backend.GLOO, dist.Backend.NCCL, dist.Backend.MPI],
-        default=dist.Backend.GLOO,
-    )
-    parser.add_argument(
-        "--output-path",
-        type=str,
-        default="./",
-        help="Path to save the trained model",
+
+def prepare_dataloader(dataset: Dataset, batch_size: int, use_gpu: bool):
+    """Prepare DataLoader with DistributedSampler."""
+    return DataLoader(
+        dataset,
+        batch_size=batch_size,
+        pin_memory=use_gpu,
+        shuffle=False,
+        sampler=DistributedSampler(dataset)
     )
-    args = parser.parse_args()
-    use_cuda = not args.no_cuda and torch.cuda.is_available()
-    if use_cuda:
-        print("Using CUDA")
-        if args.backend != dist.Backend.NCCL:
-            print(
-                "Warning. Please use `nccl` distributed backend for the best performance using GPUs"
-            )
-
-    writer = SummaryWriter(args.dir)
-
-    torch.manual_seed(args.seed)
-    device = torch.device("cuda" if use_cuda else "cpu")
+
+def main(epochs: int, save_every: int, batch_size: int, lr: float, snapshot_path: str, backend: str):
+    ddp_setup(backend)
+    dataset, model, optimizer = load_train_objs(lr)
+    train_loader = prepare_dataloader(dataset, batch_size, torch.cuda.is_available() and backend == "nccl")
+    trainer = Trainer(model, train_loader, optimizer, save_every, snapshot_path, backend)
+    trainer.train(epochs)
+    dist.destroy_process_group()
 
-    # Attach model to the device.
-    model = Net().to(device)
-    print("Using distributed PyTorch with {} backend".format(args.backend))
-    # Set distributed training environment variables to run this training script locally.
-    if "WORLD_SIZE" not in os.environ:
-        os.environ["RANK"] = "0"
-        os.environ["WORLD_SIZE"] = "1"
-        os.environ["MASTER_ADDR"] = "localhost"
-        os.environ["MASTER_PORT"] = "1234"
-
-    print(f"World Size: {os.environ['WORLD_SIZE']}. Rank: {os.environ['RANK']}")
-
-    dist.init_process_group(backend=args.backend)
-    model = nn.parallel.DistributedDataParallel(model)
+if __name__ == "__main__":
+    import argparse
+    parser = argparse.ArgumentParser(description="Distributed MNIST Training")
+    parser.add_argument('--epochs', type=int, required=True, help='Total epochs to train the model')
+    parser.add_argument('--save_every', type=int, required=True, help='How often to save a snapshot')
+    parser.add_argument('--batch_size', type=int, default=64, help='Input batch size on each device (default: 64)')
+    parser.add_argument('--lr', type=float, default=1e-3, help='Learning rate (default: 1e-3)')
+    parser.add_argument('--snapshot_path', type=str, default="snapshot_mnist.pt", help='Path to save snapshots (default: snapshot_mnist.pt)')
+    parser.add_argument('--backend', type=str, choices=['gloo', 'nccl'], default='nccl', help='Distributed backend type (default: nccl)')
+    args = parser.parse_args()
 
-    # Get FashionMNIST train and test dataset.
-    train_ds = datasets.FashionMNIST(
-        "../data",
-        train=True,
-        download=True,
-        transform=transforms.Compose([transforms.ToTensor()]),
-    )
-    test_ds = datasets.FashionMNIST(
-        "../data",
-        train=False,
-        download=True,
-        transform=transforms.Compose([transforms.ToTensor()]),
-    )
-    # Add train and test loaders.
-    train_loader = torch.utils.data.DataLoader(
-        train_ds,
+    main(
+        epochs=args.epochs,
+        save_every=args.save_every,
         batch_size=args.batch_size,
-        sampler=DistributedSampler(train_ds),
-    )
-    test_loader = torch.utils.data.DataLoader(
-        test_ds,
-        batch_size=args.test_batch_size,
-        sampler=DistributedSampler(test_ds),
+        lr=args.lr,
+        snapshot_path=args.snapshot_path,
+        backend=args.backend
     )
-
-    for epoch in range(1, args.epochs + 1):
-        train(args, model, device, train_loader, epoch, writer)
-        test(model, device, test_loader, writer, epoch)
-
-    if args.save_model and dist.get_rank() == 0:
-        model_path = os.path.join(args.output_path, "mnist_cnn.pt")
-        torch.save(model.state_dict(), model_path)
-        print(f"Model saved to {model_path}")
-
-
-if __name__ == "__main__":
-    main()
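To exercise the new Trainer loop without a cluster, a minimal single-process smoke test is sketched below (assuming the patched mnist.py is importable from the working directory). The environment values mirror what torchrun would inject for rank 0, and gloo is used so the run also works on CPU-only machines.

import os

# Rendezvous variables that torchrun would normally set for a 1-process run.
os.environ.setdefault("RANK", "0")
os.environ.setdefault("LOCAL_RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")
os.environ.setdefault("MASTER_ADDR", "localhost")
os.environ.setdefault("MASTER_PORT", "29500")

from mnist import main

# One epoch is enough to produce a snapshot and exercise DDP over gloo.
main(epochs=1, save_every=1, batch_size=64, lr=1e-3,
     snapshot_path="mnist_snapshot.pt", backend="gloo")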
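The torchrun invocation assembled in createKFTOPyTorchMnistJob can likewise be reproduced on a workstation. A sketch, assuming mnist.py sits in the current directory and torchrun is on PATH; the backend is swapped to gloo so no GPUs are required:

import subprocess

# Two local ranks stand in for numProcPerNode=2; the remaining flags match
# the CPU command used in the PyTorchJob container.
subprocess.run(
    [
        "torchrun", "--nnodes=1", "--nproc_per_node=2", "mnist.py",
        "--epochs", "7", "--save_every", "2",
        "--batch_size", "64", "--lr", "0.0005",
        "--snapshot_path", "mnist_snapshot.pt",
        "--backend", "gloo",
    ],
    check=True,
)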
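Since _save_snapshot stores only MODEL_STATE and EPOCHS_RUN, resume behaviour is easy to verify after a run. A sketch, assuming a snapshot file exists from a previous run:

import torch

snapshot = torch.load("mnist_snapshot.pt", map_location="cpu")
print("epochs recorded:", snapshot["EPOCHS_RUN"])
# State-dict tensor names are already unwrapped from DDP by _save_snapshot.
print("first parameters:", list(snapshot["MODEL_STATE"])[:4])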