From 88577c7253861592bb005d1b1e817cd9561bc16b Mon Sep 17 00:00:00 2001 From: abhijeet-dhumal Date: Thu, 18 Jul 2024 16:10:20 +0530 Subject: [PATCH] Updated e2e tests to support S3 compatible storage bucket from whicyh to download MNISt datasets for disconnected automation --- .pre-commit-config.yaml | 1 + docs/e2e.md | 27 ++- tests/e2e/local_interactive_sdk_oauth_test.py | 3 + tests/e2e/minio_deployment.yaml | 163 ++++++++++++++++++ tests/e2e/mnist.py | 91 +++++++++- tests/e2e/mnist_pip_requirements.txt | 1 + .../e2e/mnist_raycluster_sdk_aw_kind_test.py | 3 +- tests/e2e/mnist_raycluster_sdk_kind_test.py | 2 +- tests/e2e/mnist_raycluster_sdk_oauth_test.py | 8 +- tests/e2e/support.py | 50 +++++- tests/upgrade/raycluster_sdk_upgrade_test.py | 1 + 11 files changed, 331 insertions(+), 19 deletions(-) create mode 100644 tests/e2e/minio_deployment.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 89e037cd6..7928084dc 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,6 +7,7 @@ repos: - id: trailing-whitespace - id: end-of-file-fixer - id: check-yaml + args: [--allow-multiple-documents] - id: check-added-large-files - repo: https://github.com/psf/black rev: 23.3.0 diff --git a/docs/e2e.md b/docs/e2e.md index 039749d4e..f3d0c9409 100644 --- a/docs/e2e.md +++ b/docs/e2e.md @@ -108,8 +108,25 @@ Currently the SDK doesn't support tolerations, so e2e tests can't be executed on ``` poetry run pytest -v -s ./tests/e2e -m openshift --timeout=1200 ``` - - If the cluster doesn't have NVidia GPU support or GPU nodes have taint then we need to disable NVidia GPU tests by providing proper marker: - ``` - poetry install --with test,docs - poetry run pytest -v -s ./tests/e2e/mnist_raycluster_sdk_kind_test.py -m 'not nvidia_gpu' - ``` + +## On OpenShift Disconnected clusters + +- In addition to setup phase mentioned above in case of Openshift cluster, Disconnected environment requires following pre-requisites : + - Mirror Image registry : + - Image mirror registry is used to host set of container images required locally for the applications and services. This ensures to pull images without needing an external network connection. It also ensures continuous operation and deployment capabilities in a network-isolated environment. + - PYPI Mirror Index : + - When trying to install Python packages in a disconnected environment, the pip command might fail because the connection cannot install packages from external URLs. This issue can be resolved by setting up PIP Mirror Index on separate endpoint in same environment. + - S3 compatible storage : + - Some of our distributed training examples require an external storage solution so that all nodes can access the same data in disconnected environment (For example: common-datasets and model files). + - Minio S3 compatible storage type instance can be deployed in disconnected environment using `/tests/e2e/minio_deployment.yaml` or using support methods in e2e test suite. + - The following are environment variables for configuring PIP index URl for accessing the common-python packages required and the S3 or Minio storage for your Ray Train script or interactive session. + ``` + export RAY_IMAGE=quay.io/project-codeflare/ray@sha256: (prefer image digest over image tag in disocnnected environment) + PIP_INDEX_URL=https:///root/pypi/+simple/ \ + PIP_TRUSTED_HOST= \ + AWS_DEFAULT_ENDPOINT= \ + AWS_ACCESS_KEY_ID= \ + AWS_SECRET_ACCESS_KEY= \ + AWS_STORAGE_BUCKET= + AWS_STORAGE_BUCKET_MNIST_DIR= + ``` diff --git a/tests/e2e/local_interactive_sdk_oauth_test.py b/tests/e2e/local_interactive_sdk_oauth_test.py index 0e5c0204c..b5229deb8 100644 --- a/tests/e2e/local_interactive_sdk_oauth_test.py +++ b/tests/e2e/local_interactive_sdk_oauth_test.py @@ -28,6 +28,8 @@ def test_local_interactives(self): self.run_local_interactives() def run_local_interactives(self): + ray_image = get_ray_image() + auth = TokenAuthentication( token=run_oc_command(["whoami", "--show-token=true"]), server=run_oc_command(["whoami", "--show-server=true"]), @@ -46,6 +48,7 @@ def run_local_interactives(self): worker_cpu_limits=1, worker_memory_requests=1, worker_memory_limits=4, + image=ray_image, verify_tls=False, ) ) diff --git a/tests/e2e/minio_deployment.yaml b/tests/e2e/minio_deployment.yaml new file mode 100644 index 000000000..86d4ef01f --- /dev/null +++ b/tests/e2e/minio_deployment.yaml @@ -0,0 +1,163 @@ +--- +kind: PersistentVolumeClaim +apiVersion: v1 +metadata: + name: minio-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 20Gi + volumeMode: Filesystem +--- +kind: Secret +apiVersion: v1 +metadata: + name: minio-secret +stringData: + # change the username and password to your own values. + # ensure that the user is at least 3 characters long and the password at least 8 + minio_root_user: minio + minio_root_password: minio123 +--- +kind: Deployment +apiVersion: apps/v1 +metadata: + name: minio +spec: + replicas: 1 + selector: + matchLabels: + app: minio + template: + metadata: + creationTimestamp: null + labels: + app: minio + spec: + volumes: + - name: data + persistentVolumeClaim: + claimName: minio-pvc + containers: + - resources: + limits: + cpu: 250m + memory: 1Gi + requests: + cpu: 20m + memory: 100Mi + readinessProbe: + tcpSocket: + port: 9000 + initialDelaySeconds: 5 + timeoutSeconds: 1 + periodSeconds: 5 + successThreshold: 1 + failureThreshold: 3 + terminationMessagePath: /dev/termination-log + name: minio + livenessProbe: + tcpSocket: + port: 9000 + initialDelaySeconds: 30 + timeoutSeconds: 1 + periodSeconds: 5 + successThreshold: 1 + failureThreshold: 3 + env: + - name: MINIO_ROOT_USER + valueFrom: + secretKeyRef: + name: minio-secret + key: minio_root_user + - name: MINIO_ROOT_PASSWORD + valueFrom: + secretKeyRef: + name: minio-secret + key: minio_root_password + ports: + - containerPort: 9000 + protocol: TCP + - containerPort: 9090 + protocol: TCP + imagePullPolicy: IfNotPresent + volumeMounts: + - name: data + mountPath: /data + subPath: minio + terminationMessagePolicy: File + image: >- + quay.io/minio/minio:RELEASE.2024-06-22T05-26-45Z + # In case of disconnected environment, use image digest instead of tag + # For example : /minio/minio@sha256:6b3abf2f59286b985bfde2b23e37230b466081eda5dccbf971524d54c8e406b5 + args: + - server + - /data + - --console-address + - :9090 + restartPolicy: Always + terminationGracePeriodSeconds: 30 + dnsPolicy: ClusterFirst + securityContext: {} + schedulerName: default-scheduler + strategy: + type: Recreate + revisionHistoryLimit: 10 + progressDeadlineSeconds: 600 +--- +kind: Service +apiVersion: v1 +metadata: + name: minio-service +spec: + ipFamilies: + - IPv4 + ports: + - name: api + protocol: TCP + port: 9000 + targetPort: 9000 + - name: ui + protocol: TCP + port: 9090 + targetPort: 9090 + internalTrafficPolicy: Cluster + type: ClusterIP + ipFamilyPolicy: SingleStack + sessionAffinity: None + selector: + app: minio +--- +kind: Route +apiVersion: route.openshift.io/v1 +metadata: + name: minio-api +spec: + to: + kind: Service + name: minio-service + weight: 100 + port: + targetPort: api + wildcardPolicy: None + tls: + termination: edge + insecureEdgeTerminationPolicy: Redirect +--- +kind: Route +apiVersion: route.openshift.io/v1 +metadata: + name: minio-ui +spec: + to: + kind: Service + name: minio-service + weight: 100 + port: + targetPort: ui + wildcardPolicy: None + tls: + termination: edge + insecureEdgeTerminationPolicy: Redirect diff --git a/tests/e2e/mnist.py b/tests/e2e/mnist.py index 55ed91eaa..4c382f67b 100644 --- a/tests/e2e/mnist.py +++ b/tests/e2e/mnist.py @@ -15,6 +15,7 @@ import os import torch +import requests from pytorch_lightning import LightningModule, Trainer from pytorch_lightning.callbacks.progress import TQDMProgressBar from torch import nn @@ -23,9 +24,15 @@ from torchmetrics import Accuracy from torchvision import transforms from torchvision.datasets import MNIST +import gzip +import shutil +from minio import Minio + PATH_DATASETS = os.environ.get("PATH_DATASETS", ".") BATCH_SIZE = 256 if torch.cuda.is_available() else 64 + +local_mnist_path = os.path.dirname(os.path.abspath(__file__)) # %% print("prior to running the trainer") @@ -35,6 +42,25 @@ print("ACCELERATOR: is ", os.getenv("ACCELERATOR")) ACCELERATOR = os.getenv("ACCELERATOR") +STORAGE_BUCKET_EXISTS = "AWS_DEFAULT_ENDPOINT" in os.environ +print("STORAGE_BUCKET_EXISTS: ", STORAGE_BUCKET_EXISTS) + +print( + f'Storage_Bucket_Default_Endpoint : is {os.environ.get("AWS_DEFAULT_ENDPOINT")}' + if "AWS_DEFAULT_ENDPOINT" in os.environ + else "" +) +print( + f'Storage_Bucket_Name : is {os.environ.get("AWS_STORAGE_BUCKET")}' + if "AWS_STORAGE_BUCKET" in os.environ + else "" +) +print( + f'Storage_Bucket_Mnist_Directory : is {os.environ.get("AWS_STORAGE_BUCKET_MNIST_DIR")}' + if "AWS_STORAGE_BUCKET_MNIST_DIR" in os.environ + else "" +) + class LitMNIST(LightningModule): def __init__(self, data_dir=PATH_DATASETS, hidden_size=64, learning_rate=2e-4): @@ -114,19 +140,74 @@ def configure_optimizers(self): def prepare_data(self): # download print("Downloading MNIST dataset...") - MNIST(self.data_dir, train=True, download=True) - MNIST(self.data_dir, train=False, download=True) + + if ( + STORAGE_BUCKET_EXISTS + and os.environ.get("AWS_DEFAULT_ENDPOINT") != "" + and os.environ.get("AWS_DEFAULT_ENDPOINT") != None + ): + print("Using storage bucket to download datasets...") + + dataset_dir = os.path.join(self.data_dir, "MNIST/raw") + endpoint = os.environ.get("AWS_DEFAULT_ENDPOINT") + access_key = os.environ.get("AWS_ACCESS_KEY_ID") + secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY") + bucket_name = os.environ.get("AWS_STORAGE_BUCKET") + + client = Minio( + endpoint, + access_key=access_key, + secret_key=secret_key, + cert_check=False, + ) + + if not os.path.exists(dataset_dir): + os.makedirs(dataset_dir) + else: + print(f"Directory '{dataset_dir}' already exists") + + # To download datasets from storage bucket's specific directory, use prefix to provide directory name + prefix = os.environ.get("AWS_STORAGE_BUCKET_MNIST_DIR") + # download all files from prefix folder of storage bucket recursively + for item in client.list_objects(bucket_name, prefix=prefix, recursive=True): + file_name = item.object_name[len(prefix) + 1 :] + dataset_file_path = os.path.join(dataset_dir, file_name) + if not os.path.exists(dataset_file_path): + client.fget_object(bucket_name, item.object_name, dataset_file_path) + else: + print(f"File-path '{dataset_file_path}' already exists") + # Unzip files + with gzip.open(dataset_file_path, "rb") as f_in: + with open(dataset_file_path.split(".")[:-1][0], "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + # delete zip file + os.remove(dataset_file_path) + unzipped_filepath = dataset_file_path.split(".")[0] + if os.path.exists(unzipped_filepath): + print( + f"Unzipped and saved dataset file to path - {unzipped_filepath}" + ) + download_datasets = False + + else: + print("Using default MNIST mirror reference to download datasets...") + download_datasets = True + + MNIST(self.data_dir, train=True, download=download_datasets) + MNIST(self.data_dir, train=False, download=download_datasets) def setup(self, stage=None): # Assign train/val datasets for use in dataloaders if stage == "fit" or stage is None: - mnist_full = MNIST(self.data_dir, train=True, transform=self.transform) + mnist_full = MNIST( + self.data_dir, train=True, transform=self.transform, download=False + ) self.mnist_train, self.mnist_val = random_split(mnist_full, [55000, 5000]) # Assign test dataset for use in dataloader(s) if stage == "test" or stage is None: self.mnist_test = MNIST( - self.data_dir, train=False, transform=self.transform + self.data_dir, train=False, transform=self.transform, download=False ) def train_dataloader(self): @@ -145,7 +226,7 @@ def test_dataloader(self): # Init DataLoader from MNIST Dataset -model = LitMNIST() +model = LitMNIST(data_dir=local_mnist_path) print("GROUP: ", int(os.environ.get("GROUP_WORLD_SIZE", 1))) print("LOCAL: ", int(os.environ.get("LOCAL_WORLD_SIZE", 1))) diff --git a/tests/e2e/mnist_pip_requirements.txt b/tests/e2e/mnist_pip_requirements.txt index 4c9d5fcb8..907e9a52d 100644 --- a/tests/e2e/mnist_pip_requirements.txt +++ b/tests/e2e/mnist_pip_requirements.txt @@ -1,3 +1,4 @@ pytorch_lightning==1.9.5 torchmetrics==0.9.1 torchvision==0.12.0 +minio diff --git a/tests/e2e/mnist_raycluster_sdk_aw_kind_test.py b/tests/e2e/mnist_raycluster_sdk_aw_kind_test.py index d2d0995b6..012098a40 100644 --- a/tests/e2e/mnist_raycluster_sdk_aw_kind_test.py +++ b/tests/e2e/mnist_raycluster_sdk_aw_kind_test.py @@ -19,6 +19,7 @@ def setup_method(self): def teardown_method(self): delete_namespace(self) + delete_kueue_resources(self) def test_mnist_ray_cluster_sdk_kind(self): self.setup_method() @@ -77,7 +78,7 @@ def assert_jobsubmit_withoutlogin_kind(self, cluster, accelerator, number_of_gpu runtime_env={ "working_dir": "./tests/e2e/", "pip": "./tests/e2e/mnist_pip_requirements.txt", - "env_vars": {"ACCELERATOR": accelerator}, + "env_vars": get_setup_env_variables(ACCELERATOR=accelerator), }, entrypoint_num_gpus=number_of_gpus, ) diff --git a/tests/e2e/mnist_raycluster_sdk_kind_test.py b/tests/e2e/mnist_raycluster_sdk_kind_test.py index 2635fecd8..2623b36c4 100644 --- a/tests/e2e/mnist_raycluster_sdk_kind_test.py +++ b/tests/e2e/mnist_raycluster_sdk_kind_test.py @@ -77,7 +77,7 @@ def assert_jobsubmit_withoutlogin_kind(self, cluster, accelerator, number_of_gpu runtime_env={ "working_dir": "./tests/e2e/", "pip": "./tests/e2e/mnist_pip_requirements.txt", - "env_vars": {"ACCELERATOR": accelerator}, + "env_vars": get_setup_env_variables(ACCELERATOR=accelerator), }, entrypoint_num_gpus=number_of_gpus, ) diff --git a/tests/e2e/mnist_raycluster_sdk_oauth_test.py b/tests/e2e/mnist_raycluster_sdk_oauth_test.py index 0ddab720b..bc052bea1 100644 --- a/tests/e2e/mnist_raycluster_sdk_oauth_test.py +++ b/tests/e2e/mnist_raycluster_sdk_oauth_test.py @@ -28,6 +28,8 @@ def test_mnist_ray_cluster_sdk_auth(self): self.run_mnist_raycluster_sdk_oauth() def run_mnist_raycluster_sdk_oauth(self): + ray_image = get_ray_image() + auth = TokenAuthentication( token=run_oc_command(["whoami", "--show-token=true"]), server=run_oc_command(["whoami", "--show-server=true"]), @@ -42,10 +44,11 @@ def run_mnist_raycluster_sdk_oauth(self): num_workers=1, head_cpus="500m", head_memory=2, - worker_cpu_requests="500m", + worker_cpu_requests=1, worker_cpu_limits=1, worker_memory_requests=1, worker_memory_limits=4, + image=ray_image, write_to_file=True, verify_tls=False, ) @@ -73,6 +76,7 @@ def assert_jobsubmit_withoutLogin(self, cluster): "runtime_env": { "working_dir": "./tests/e2e/", "pip": "./tests/e2e/mnist_pip_requirements.txt", + "env_vars": get_setup_env_variables(), }, } try: @@ -100,7 +104,9 @@ def assert_jobsubmit_withlogin(self, cluster): runtime_env={ "working_dir": "./tests/e2e/", "pip": "./tests/e2e/mnist_pip_requirements.txt", + "env_vars": get_setup_env_variables(), }, + entrypoint_num_cpus=1, ) print(f"Submitted job with ID: {submission_id}") done = False diff --git a/tests/e2e/support.py b/tests/e2e/support.py index 3eb241536..c8346909a 100644 --- a/tests/e2e/support.py +++ b/tests/e2e/support.py @@ -12,17 +12,55 @@ def get_ray_image(): return os.getenv("RAY_IMAGE", default_ray_image) +def get_setup_env_variables(**kwargs): + env_vars = dict() + + # Use input parameters provided for this function as environment variables + for key, value in kwargs.items(): + env_vars[str(key)] = value + + # Use specified pip index url instead of default(https://pypi.org/simple) if related environment variables exists + if ( + "PIP_INDEX_URL" in os.environ + and os.environ.get("PIP_INDEX_URL") != None + and os.environ.get("PIP_INDEX_URL") != "" + ): + env_vars["PIP_INDEX_URL"] = os.environ.get("PIP_INDEX_URL") + env_vars["PIP_TRUSTED_HOST"] = os.environ.get("PIP_TRUSTED_HOST") + else: + env_vars["PIP_INDEX_URL"] = "https://pypi.org/simple/" + env_vars["PIP_TRUSTED_HOST"] = "pypi.org" + + # Use specified storage bucket reference from which to download datasets + if ( + "AWS_DEFAULT_ENDPOINT" in os.environ + and os.environ.get("AWS_DEFAULT_ENDPOINT") != None + and os.environ.get("AWS_DEFAULT_ENDPOINT") != "" + ): + env_vars["AWS_DEFAULT_ENDPOINT"] = os.environ.get("AWS_DEFAULT_ENDPOINT") + env_vars["AWS_ACCESS_KEY_ID"] = os.environ.get("AWS_ACCESS_KEY_ID") + env_vars["AWS_SECRET_ACCESS_KEY"] = os.environ.get("AWS_SECRET_ACCESS_KEY") + env_vars["AWS_STORAGE_BUCKET"] = os.environ.get("AWS_STORAGE_BUCKET") + env_vars["AWS_STORAGE_BUCKET_MNIST_DIR"] = os.environ.get( + "AWS_STORAGE_BUCKET_MNIST_DIR" + ) + return env_vars + + def random_choice(): alphabet = string.ascii_lowercase + string.digits return "".join(random.choices(alphabet, k=5)) def create_namespace(self): - self.namespace = f"test-ns-{random_choice()}" - namespace_body = client.V1Namespace( - metadata=client.V1ObjectMeta(name=self.namespace) - ) - self.api_instance.create_namespace(namespace_body) + try: + self.namespace = f"test-ns-{random_choice()}" + namespace_body = client.V1Namespace( + metadata=client.V1ObjectMeta(name=self.namespace) + ) + self.api_instance.create_namespace(namespace_body) + except Exception as e: + return RuntimeError(e) def create_new_resource_flavor(self): @@ -60,7 +98,7 @@ def initialize_kubernetes_client(self): config.load_kube_config() # Initialize Kubernetes client self.api_instance = client.CoreV1Api() - self.custom_api = kubernetes.client.CustomObjectsApi(self.api_instance.api_client) + self.custom_api = client.CustomObjectsApi(self.api_instance.api_client) def run_oc_command(args): diff --git a/tests/upgrade/raycluster_sdk_upgrade_test.py b/tests/upgrade/raycluster_sdk_upgrade_test.py index e5fd18765..87f61f15e 100644 --- a/tests/upgrade/raycluster_sdk_upgrade_test.py +++ b/tests/upgrade/raycluster_sdk_upgrade_test.py @@ -121,6 +121,7 @@ def assert_jobsubmit_withlogin(self, cluster): runtime_env={ "working_dir": "./tests/e2e/", "pip": "./tests/e2e/mnist_pip_requirements.txt", + "env_vars": get_setup_env_variables(), }, ) print(f"Submitted job with ID: {submission_id}")