diff --git a/.github/workflows/publish-benchmark-images.yaml b/.github/workflows/publish-benchmark-images.yaml
index a5a1bcf..20fbede 100644
--- a/.github/workflows/publish-benchmark-images.yaml
+++ b/.github/workflows/publish-benchmark-images.yaml
@@ -21,6 +21,7 @@ jobs:
           - component: mpi-benchmarks
           - component: openfoam
           - component: perftest
+          - component: pytorch-benchmarks
     steps:
       - name: Check out the repository
         uses: actions/checkout@v2
diff --git a/README.md b/README.md
index 77dcadf..6d40aba 100644
--- a/README.md
+++ b/README.md
@@ -13,6 +13,7 @@
   - [RDMA Bandwidth](#rdma-bandwidth)
   - [RDMA Latency](#rdma-latency)
   - [fio](#fio)
+  - [PyTorch](#pytorch)
 - [Operator development](#operator-development)
 
 ## Installation
@@ -289,6 +290,41 @@ spec:
       storage: 5Gi
 ```
+
+### PyTorch
+
+Runs machine learning model training and inference micro-benchmarks from the official
+PyTorch [benchmarks repo](https://github.com/pytorch/benchmark/) to compare the performance
+of CPU and GPU devices on synthetic input data. Running benchmarks on CUDA-capable
+devices requires the [NVIDIA GPU Operator](https://github.com/NVIDIA/gpu-operator)
+to be pre-installed on the target Kubernetes cluster.
+
+The pre-built container image currently includes the `alexnet`, `resnet50` and
+`llama` (inference only) models - additional models from the
+[upstream repo list](https://github.com/pytorch/benchmark/tree/main/torchbenchmark/models)
+may be added as needed in the future. (Adding a new model simply requires adding it to the
+list in `images/pytorch-benchmarks/Dockerfile` and updating the `PyTorchModel` enum in
+`python/perftest/models/v1alpha1/pytorch.py`.)
+
+```yaml
+apiVersion: perftest.stackhpc.com/v1alpha1
+kind: PyTorch
+metadata:
+  name: pytorch-test-gpu
+spec:
+  # The device to run the benchmark on ('cpu' or 'cuda')
+  device: cuda
+  # Name of the model to benchmark
+  model: alexnet
+  # Either 'train' or 'eval'
+  # (not all models support both)
+  benchmarkType: eval
+  # Batch size for the generated input data
+  inputBatchSize: 32
+  resources:
+    limits:
+      nvidia.com/gpu: 2
+```
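+
+The same benchmark can also be run without a GPU by setting `device: cpu` and omitting
+the GPU resource limits - for example (with illustrative values):
+
+```yaml
+apiVersion: perftest.stackhpc.com/v1alpha1
+kind: PyTorch
+metadata:
+  name: pytorch-test-cpu
+spec:
+  device: cpu
+  model: resnet50
+  benchmarkType: train
+  inputBatchSize: 16
+```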
""" spec: FioSpec = Field( ..., diff --git a/python/perftest/models/v1alpha1/pytorch.py b/python/perftest/models/v1alpha1/pytorch.py new file mode 100644 index 0000000..13069f2 --- /dev/null +++ b/python/perftest/models/v1alpha1/pytorch.py @@ -0,0 +1,289 @@ +import re +import datetime as dt +import typing as t + +from pydantic import Field, constr, conint + +from kube_custom_resource import schema + +from ...config import settings +from ...errors import PodLogFormatError, PodResultsIncompleteError +from ...utils import GnuTimeResult + +from . import base + +# If results output format changes in future pytorch-benchmark versions +# check https://github.com/pytorch/benchmark/blob/main/run.py for changes +PYTORCH_CPU_TIME_REGEX = re.compile(r"CPU Wall Time per batch:\s+(?P\d+\.\d+)\s*(?P\w+)") +PYTORCH_CPU_MEMORY_REGEX = re.compile(r"CPU Peak Memory:\s+(?P\d+\.\d+)\s*(?P\w+)") +PYTORCH_GPU_TIME_REGEX = re.compile(r"GPU Time per batch:\s+(?P\d+\.\d+)\s*(?P\w+)") +PYTORCH_GPU_MEMORY_REGEX = re.compile(r"GPU \d+ Peak Memory:\s+(?P\d+\.\d+)\s*(?P\w+)") + + +class Device(str, schema.Enum): + """ + Enumeration of supported computation devices. + """ + CPU = "cpu" + CUDA = "cuda" + +# List of models here should match list in images/pytorch-benchmark/Dockerfile +class PyTorchModel(str, schema.Enum): + """ + Enumeration of available models for benchmarking. + """ + ALEXNET = "alexnet" + RESNET50 = "resnet50" + LLAMA = "llama" + +class PyTorchBenchmarkType(str, schema.Enum): + """ + Enumeration of model processes available to benchmark. + """ + TRAIN = "train" + EVAL = "eval" + + +class PyTorchSpec(base.BenchmarkSpec): + """ + Defines the parameters for the Fio benchmark. + """ + image: constr(min_length = 1) = Field( + f"{settings.default_image_prefix}pytorch-benchmarks:{settings.default_image_tag}", + description = "The image to use for the benchmark." + ) + image_pull_policy: base.ImagePullPolicy = Field( + base.ImagePullPolicy.IF_NOT_PRESENT, + description = "The pull policy for the image." + ) + # PyTorch benchmark config options + device: Device = Field( + Device.CPU, + description = ( + "The device to run the ML workload." + "If device is 'cuda' then you must also make a request for GPU resources by" + "adding a 'nvidia.com/gpu: ' field to benchmark.spec.resources.limits" + ) + ) + model: PyTorchModel = Field( + description = "The ML model to benchmark." + ) + benchmark_type: PyTorchBenchmarkType = Field( + PyTorchBenchmarkType.EVAL, + description = "Whether to benchmark the training or inference (eval) process." + ) + input_batch_size: conint(multiple_of=2, ge=2) = Field( + 64, + description = "The batch size for the generated model input data.", + ) + + +class PyTorchResult(schema.BaseModel): + """ + Represents an individual PyTorch benchmark result. + + Some notes on the inner workings of the pytorch benchmark script: + - Currently only runs one batch for benchmark so 'time per batch' in pytorch output == total time. + (This may change in future since 'per batch' suffix was added to output text very recently.) + https://github.com/pytorch/benchmark/blob/6fef32ddaf93a63088b97eb27620fb57ef247521/run.py#L468 + - CPU 'wall time' reported by pytorch is significantly shorter than reported by GNU `time` command. + It's not clear what is taking up this extra time outwith the actual model invocation (downloading + model weights and generating random in-memory input data shouldn't take long at all). 
+ """ + pytorch_time: schema.confloat(ge = 0) = Field( + ..., + description = "The CPU wall time (in seconds) as reported by the pytorch benchmark script." + ) + peak_cpu_memory: schema.confloat(ge = 0) = Field( + ..., + description = "The peak CPU memory usage (in GB) reported by the pytorch benchmark script." + ) + gpu_time: t.Optional[schema.confloat(ge = 0)] = Field( + None, # Default to zero for clearer reporting on cpu-only runs + description = "The GPU wall time (in seconds) reported by the pytorch benchmark script." + ) + peak_gpu_memory: t.Optional[schema.confloat(ge = 0)] = Field( + None, # Default to zero for clearer reporting on cpu-only runs + description = "The peak GPU memory usage (in GB) reported by the pytorch benchmark script." + ) + gnu_time: GnuTimeResult = Field( + description = "A container for the output of the `time` command which wraps the benchmark execution script." + ) + + +class PyTorchStatus(base.BenchmarkStatus): + """ + Represents the status of the PyTorch benchmark. + """ + gpu_count: conint(ge=0) = Field( + None, + description = "The number of gpus used in this benchmark" + ) + result: t.Optional[PyTorchResult] = Field( + None, + description = "The result of the benchmark." + ) + wall_time_result: schema.confloat(ge = 0) = Field( + None, + description = ( + "The wall time (in seconds) reported by the GNU time wrapper." + "Used as a headline result." + ) + ) + gpu_time_result: schema.confloat(ge = 0) = Field( + None, + description = ( + "The GPU wall time (in seconds) reported by the pytorch benchmark script." + "Used as a headline result." + ) + ) + worker_pod: t.Optional[base.PodInfo] = Field( + None, + description = "Pod information for the pod running the benchmark." + ) + client_log: t.Optional[constr(min_length = 1)] = Field( + None, + description = "The raw pod log of the client pod." + ) + + +class PyTorch( + base.Benchmark, + subresources = {"status": {}}, + printer_columns = [ + { + "name": "Model", + "type": "string", + "jsonPath": ".spec.model", + }, + { + "name": "Benchmark Type", + "type": "string", + "jsonPath": ".spec.benchmarkType", + }, + { + "name": "Device", + "type": "string", + "jsonPath": ".spec.device", + }, + { + "name": "GPUs", + "type": "integer", + "jsonPath": ".status.gpuCount", + }, + { + "name": "Batch Size", + "type": "integer", + "jsonPath": ".spec.inputBatchSize", + }, + { + "name": "Status", + "type": "string", + "jsonPath": ".status.phase", + }, + { + "name": "Started", + "type": "date", + "jsonPath": ".status.startedAt", + }, + { + "name": "Finished", + "type": "date", + "jsonPath": ".status.finishedAt", + }, + { + "name": "Wall Time (s)", + "type": "number", + "jsonPath": ".status.wallTimeResult", + }, + { + "name": "GPU Time (s)", + "type": "number", + "jsonPath": ".status.gpuTimeResult", + }, + ] +): + """ + Custom resource for running an PyTorch benchmark. + """ + spec: PyTorchSpec = Field( + ..., + description = "The parameters for the benchmark." + ) + status: PyTorchStatus = Field( + default_factory = PyTorchStatus, + description = "The status of the benchmark." 
+ ) + + async def pod_modified( + self, + pod: t.Dict[str, t.Any], + fetch_pod_log: t.Callable[[], t.Awaitable[str]] + ): + # Parse GPU count from resources to display in status + if self.spec.resources: + if self.spec.resources.limits: + if 'nvidia.com/gpu' in self.spec.resources.limits.keys(): + self.status.gpu_count = self.spec.resources.limits['nvidia.com/gpu'] + else: + self.status.gpu_count = 0 + + pod_phase = pod.get("status", {}).get("phase", "Unknown") + if pod_phase == "Running": + self.status.worker_pod = base.PodInfo.from_pod(pod) + elif pod_phase == "Succeeded": + self.status.client_log = await fetch_pod_log() + + def summarise(self): + # If the client log has not yet been recorded, bail + if not self.status.client_log: + raise PodResultsIncompleteError("Pod has not recorded a result yet") + + # Parse job output here + cpu_time_match = PYTORCH_CPU_TIME_REGEX.search(self.status.client_log) + cpu_time = cpu_time_match.group('cpu_time') + cpu_time_units = cpu_time_match.group('cpu_time_units') + cpu_memory_match = PYTORCH_CPU_MEMORY_REGEX.search(self.status.client_log) + cpu_peak_memory = cpu_memory_match.group('cpu_memory') + cpu_peak_memory_units = cpu_memory_match.group('cpu_mem_units') + + if cpu_time_units != "milliseconds" or cpu_peak_memory_units != "GB": + raise PodLogFormatError( + "results output in unexpected units - expected 'milliseconds' and 'GB'" + "(it's possible that results formatting has changed in upstream pytorch-benchmarks)" + ) + + if self.spec.device != "cpu": + # Parse GPU results + gpu_time_match = PYTORCH_GPU_TIME_REGEX.search(self.status.client_log) + gpu_time = gpu_time_match.group('gpu_time') + gpu_time_units = gpu_time_match.group('gpu_time_units') + gpu_memory_match = PYTORCH_GPU_MEMORY_REGEX.search(self.status.client_log) + gpu_peak_memory = gpu_memory_match.group('gpu_memory') + gpu_peak_memory_units = gpu_memory_match.group('gpu_mem_units') + if gpu_time_units != "milliseconds" or gpu_peak_memory_units != "GB": + raise PodLogFormatError( + "results output in unexpected units - expected 'milliseconds' and 'GB'" + "(it's possible that results formatting has changed in upstream pytorch-benchmarks)" + ) + # Convert times to seconds to match GNU time output + gpu_time = float(gpu_time) / 1000 + else: + gpu_time, gpu_peak_memory = None, None + + # Parse the GNU time wrapper output + gnu_time_result = GnuTimeResult.parse(self.status.client_log) + + # Convert times to seconds to match GNU time output + self.status.result = PyTorchResult( + pytorch_time = float(cpu_time) / 1000, + peak_cpu_memory = cpu_peak_memory, + gpu_time = gpu_time, + peak_gpu_memory = gpu_peak_memory, + gnu_time = gnu_time_result, + ) + + # Format results nicely for printing + self.status.wall_time_result = float(f"{self.status.result.gnu_time.wall_time_secs:.3g}") + if self.status.result.gpu_time: + self.status.gpu_time_result = float(f"{self.status.result.gpu_time:.3g}") diff --git a/python/perftest/templates/pytorch.yaml.j2 b/python/perftest/templates/pytorch.yaml.j2 new file mode 100644 index 0000000..b75eb32 --- /dev/null +++ b/python/perftest/templates/pytorch.yaml.j2 @@ -0,0 +1,40 @@ +{% import '_macros.j2' as macros %} +--- +{% call macros.job(benchmark) -%} +tasks: + - name: pytorch-worker + replicas: 1 + policies: + - event: PodFailed + action: RestartJob + - event: PodEvicted + action: RestartJob + template: + metadata: + labels: + {{ macros.labels(benchmark) | indent(10) }} + spec: + restartPolicy: Never + containers: + - name: pytorch-benchmark + image: {{ 
benchmark.spec.image }} + imagePullPolicy: {{ benchmark.spec.image_pull_policy }} + command: ["time"] + args: + - -v + - python3 + - run.py + - "{{ benchmark.spec.model }}" + - -t + - "{{ benchmark.spec.benchmark_type }}" + - -d + - "{{ benchmark.spec.device }}" + - --bs + - "{{ benchmark.spec.input_batch_size}}" + {%- if benchmark.spec.resources %} + resources: + {{ benchmark.spec.resources | toyaml | indent(12) }} + {%- endif %} + # Avoid pods from other benchmarks + {{ macros.distribution_spread(benchmark) | indent(8) }} +{%- endcall %} \ No newline at end of file diff --git a/python/perftest/utils.py b/python/perftest/utils.py index 9f3ea2b..898a518 100644 --- a/python/perftest/utils.py +++ b/python/perftest/utils.py @@ -1,6 +1,10 @@ import functools import math +import re import typing as t +from kube_custom_resource import schema +from pydantic import Field, confloat +from .errors import PodLogFormatError def mergeconcat( @@ -71,3 +75,75 @@ def format_amount( formatted_amount = f"{integer_part}.{fractional_part}" prefix_index = prefixes.index(original_prefix) + exponent return (formatted_amount, prefixes[prefix_index]) + + +GNU_TIME_EXTRACTION_REGEX = re.compile( + r"\s*Command being timed:\s+\"(?P.+)\"" + r"\s+User time \(seconds\):\s+(?P\d+\.\d+)" + r"\s+System time \(seconds\):\s+(?P\d+\.\d+)" + r"\s+Percent of CPU this job got:\s+(?P\d+)\%" + r"\s+Elapsed \(wall clock\) time \(h:mm:ss or m:ss\):\s+(?P\d*:*\d+:\d+\.\d+)" +) + +class GnuTimeResult(schema.BaseModel): + """ + Helper class for parsing the output of the gnu time wrapper. Example output + from (verbose mode) `/usr/bin/time -v`: + Command being timed: "sleep 2" + User time (seconds): 0.00 + System time (seconds): 0.00 + Percent of CPU this job got: 0% + Elapsed (wall clock) time (h:mm:ss or m:ss): 0:02.00 + Average shared text size (kbytes): 0 + Average unshared data size (kbytes): 0 + Average stack size (kbytes): 0 + Average total size (kbytes): 0 + Maximum resident set size (kbytes): 1612 + Average resident set size (kbytes): 0 + Major (requiring I/O) page faults: 0 + Minor (reclaiming a frame) page faults: 67 + Voluntary context switches: 2 + Involuntary context switches: 0 + Swaps: 0 + File system inputs: 0 + File system outputs: 0 + Socket messages sent: 0 + Socket messages received: 0 + Signals delivered: 0 + Page size (bytes): 4096 + Exit status: 0 + """ + # Add other fields here as needed + command: str = Field(description="The command being timed.") + user_time_secs: confloat(ge=0) = Field(description="The time spent executing user space code.") + sys_time_secs: confloat(ge=0) = Field(description="The time spent executing system (kernel space) code.") + cpu_percentage: confloat(ge=0) = Field(description="The (peak) percentage of CPU used.") + wall_time_secs: confloat(ge=0) = Field(description="The wall clock time for this benchmark run.") + + @classmethod + def parse(cls, input: str): + match = GNU_TIME_EXTRACTION_REGEX.search(input) + if not match: + raise PodLogFormatError("failed to parse output of GNU time command") + + # Convert wall time to seconds for consistency with other fields + # Default format is either 'hh:mm:ss.ss' or 'mm:ss.ss' depending on value + wall_time = match.group("wall_time") + try: + hrs_mins_secs, frac_secs = wall_time.split(".") + parts = hrs_mins_secs.split(":") + if len(parts) == 2: + hrs, mins, secs = 0, *parts + elif len(parts) == 3: + hrs, mins, secs = parts + wall_time_secs = float(hrs)*3600 + float(mins)*60 + float(secs) + float("0."+frac_secs) + except: + raise 
PodLogFormatError("failed to parse GNU wall time in format hh:mm:ss.ss or mm:ss.ss") + + return GnuTimeResult( + command = match.group("command"), + user_time_secs = match.group("user_time"), + sys_time_secs = match.group("sys_time"), + cpu_percentage = match.group("cpu_percentage"), + wall_time_secs = wall_time_secs, + )
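+
+# Example (hypothetical) usage, assuming a pod log containing the
+# `/usr/bin/time -v` block shown in the docstring above:
+#
+#     result = GnuTimeResult.parse(pod_log)
+#     result.command         # 'sleep 2'
+#     result.wall_time_secs  # 2.0 (converted from '0:02.00')
+#     result.cpu_percentage  # 0.0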