Skip to content

Commit

Permalink
[model server] Add a test to check multiple pods can read from the sa…
Browse files Browse the repository at this point in the history
…me pvc (#17)

* Add multi pod read-write pvc access

* Add multi pod read-write pvc access

* Add multi pod read-write pvc access

* add skip if nfs does not exist

* add onnx constant

* remove junit
  • Loading branch information
rnetser authored Oct 18, 2024
1 parent 6398ec7 commit cbad49e
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 56 deletions.
16 changes: 16 additions & 0 deletions tests/model_serving/model_server/storage/constants.py
Original file line number Diff line number Diff line change
@@ -1 +1,17 @@
from typing import Any, Dict

# Models
# Model identifier; reused below as the serving runtime's "model-name" and as
# the InferenceService "name".
ONNX_STR: str = "onnx"

# Container name passed as `container=` when executing commands in predictor pods.
KSERVE_CONTAINER_NAME: str = "kserve-container"
# Parameters handed (via indirect parametrization) to the `serving_runtime` fixture.
KSERVE_OVMS_SERVING_RUNTIME_PARAMS: Dict[str, Any] = {
"name": "ovms-runtime",
"model-name": ONNX_STR,
"template-name": "kserve-ovms",
"model-version": "1",
"multi-model": False,
}
# Base parameters handed (via indirect parametrization) to the
# `inference_service` fixture; tests may extend this dict with extra keys.
INFERENCE_SERVICE_PARAMS: Dict[str, str] = {"name": ONNX_STR}

# Storage
# Name of the StorageClass looked up by the NFS skip fixture and requested as
# the PVC "storage-class-name" in the ReadWriteMany test.
NFS_STR: str = "nfs"
Empty file.
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
import os
import shlex
from typing import Optional, Tuple
from typing import List, Optional, Tuple

import pytest
from kubernetes.dynamic import DynamicClient
from kubernetes.dynamic.exceptions import ResourceNotFoundError
from ocp_resources.deployment import Deployment
from ocp_resources.inference_service import InferenceService
from ocp_resources.namespace import Namespace
from ocp_resources.persistent_volume_claim import PersistentVolumeClaim
from ocp_resources.pod import Pod
from ocp_resources.resource import ResourceEditor
from ocp_resources.service_mesh_member import ServiceMeshMember
from ocp_resources.serving_runtime import ServingRuntime
from ocp_resources.storage_class import StorageClass
from ocp_utilities.infra import get_pods_by_name_prefix
from pytest_testconfig import config as py_config

from tests.model_serving.model_server.storage.constants import NFS_STR
from tests.model_serving.model_server.storage.pvc.utils import create_isvc
from utilities.serving_runtime import ServingRuntimeFromTemplate


Expand Down Expand Up @@ -57,14 +62,27 @@ def ci_s3_storage_uri(request) -> str:


@pytest.fixture(scope="class")
def model_pvc(admin_client: DynamicClient, model_namespace: Namespace) -> PersistentVolumeClaim:
with PersistentVolumeClaim(
name="model-pvc",
namespace=model_namespace.name,
client=admin_client,
size="15Gi",
accessmodes="ReadWriteOnce",
) as pvc:
def model_pvc(
request,
admin_client: DynamicClient,
model_namespace: Namespace,
) -> PersistentVolumeClaim:
access_mode = "ReadWriteOnce"
pvc_kwargs = {
"name": "model-pvc",
"namespace": model_namespace.name,
"client": admin_client,
"size": "15Gi",
}
if hasattr(request, "param"):
access_mode = request.param.get("access-modes")

if storage_class_name := request.param.get("storage-class-name"):
pvc_kwargs["storage_class"] = storage_class_name

pvc_kwargs["accessmodes"] = access_mode

with PersistentVolumeClaim(**pvc_kwargs) as pvc:
yield pvc


Expand Down Expand Up @@ -114,7 +132,7 @@ def downloaded_model_data(
def serving_runtime(
request,
admin_client: DynamicClient,
service_mesh_member,
service_mesh_member: ServiceMeshMember,
model_namespace: Namespace,
downloaded_model_data: str,
) -> ServingRuntime:
Expand All @@ -136,44 +154,67 @@ def inference_service(
model_pvc: PersistentVolumeClaim,
downloaded_model_data: str,
) -> InferenceService:
with InferenceService(
client=admin_client,
name=request.param["name"],
namespace=model_namespace.name,
annotations={
"serving.knative.openshift.io/enablePassthrough": "true",
"sidecar.istio.io/inject": "true",
"sidecar.istio.io/rewriteAppHTTPProbers": "true",
"serving.kserve.io/deploymentMode": "Serverless",
},
predictor={
"model": {
"modelFormat": {"name": serving_runtime.instance.spec.supportedModelFormats[0].name},
"version": "1",
"runtime": serving_runtime.name,
"storageUri": f"pvc://{model_pvc.name}/{downloaded_model_data}",
},
},
) as inference_service:
inference_service.wait_for_condition(
condition=inference_service.Condition.READY,
status=inference_service.Condition.Status.TRUE,
timeout=10 * 60,
isvc_kwargs = {
"client": admin_client,
"name": request.param["name"],
"namespace": model_namespace.name,
"runtime": serving_runtime.name,
"storage_uri": f"pvc://{model_pvc.name}/{downloaded_model_data}",
"model_format": serving_runtime.instance.spec.supportedModelFormats[0].name,
"deployment_mode": request.param.get("deployment-mode", "Serverless"),
}

if min_replicas := request.param.get("min-replicas"):
isvc_kwargs["min_replicas"] = min_replicas

with create_isvc(**isvc_kwargs) as isvc:
yield isvc


@pytest.fixture(scope="class")
def isvc_deployment_ready(admin_client: DynamicClient, inference_service: InferenceService) -> None:
deployment_name_prefix = f"{inference_service.name}-predictor"
deployment = list(
Deployment.get(
dyn_client=admin_client,
namespace=inference_service.namespace,
)
yield inference_service
)

if deployment and deployment[0].name.startswith(deployment_name_prefix):
deployment[0].wait_for_replicas()
return

raise ResourceNotFoundError(f"Deployment with prefix {deployment_name_prefix} not found")


@pytest.fixture()
def predictor_pod(admin_client: DynamicClient, inference_service: InferenceService) -> Pod:
def predictor_pods_scope_function(admin_client: DynamicClient, inference_service: InferenceService) -> List[Pod]:
return get_pods_by_name_prefix(
client=admin_client,
pod_prefix=f"{inference_service.name}-predictor",
namespace=inference_service.namespace,
)


@pytest.fixture(scope="class")
def predictor_pods_scope_class(
admin_client: DynamicClient, inference_service: InferenceService, isvc_deployment_ready: None
) -> List[Pod]:
return get_pods_by_name_prefix(
client=admin_client,
pod_prefix=f"{inference_service.name}-predictor",
namespace=inference_service.namespace,
)[0]
)


@pytest.fixture()
def patched_isvc(request, inference_service: InferenceService, predictor_pod: Pod) -> InferenceService:
def first_predictor_pod(predictor_pods_scope_function) -> Pod:
return predictor_pods_scope_function[0]


@pytest.fixture()
def patched_isvc(request, inference_service: InferenceService, first_predictor_pod: Pod) -> InferenceService:
with ResourceEditor(
patches={
inference_service: {
Expand All @@ -183,5 +224,11 @@ def patched_isvc(request, inference_service: InferenceService, predictor_pod: Po
}
}
):
predictor_pod.wait_deleted()
first_predictor_pod.wait_deleted()
yield inference_service


@pytest.fixture(scope="module")
def skip_if_no_nfs_storage_class(admin_client):
    """Skip the requesting tests when the ``NFS_STR`` StorageClass is absent.

    Looks up a StorageClass named ``NFS_STR`` with ``admin_client`` and calls
    ``pytest.skip`` if it does not exist; otherwise does nothing.
    """
    nfs_storage_class = StorageClass(client=admin_client, name=NFS_STR)
    if nfs_storage_class.exists:
        # StorageClass is present: nothing to do, let the tests run.
        return

    pytest.skip(f"StorageClass {NFS_STR} is missing from the cluster")
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import shlex
from typing import List

import pytest

from tests.model_serving.model_server.storage.constants import (
INFERENCE_SERVICE_PARAMS,
KSERVE_CONTAINER_NAME,
KSERVE_OVMS_SERVING_RUNTIME_PARAMS,
NFS_STR,
)

# Pre-split argv for listing /mnt/models inside a predictor pod container.
POD_LS_SPLIT_COMMAND: List[str] = shlex.split("ls /mnt/models")


# Every test in this module requests the `skip_if_no_nfs_storage_class` fixture.
pytestmark = pytest.mark.usefixtures("skip_if_no_nfs_storage_class")


@pytest.mark.parametrize(
    "model_namespace, ci_s3_storage_uri, model_pvc, serving_runtime, inference_service",
    [
        pytest.param(
            {"name": "pvc-rxw-access"},
            {"model-dir": "test-dir"},
            {"access-modes": "ReadWriteMany", "storage-class-name": NFS_STR},
            KSERVE_OVMS_SERVING_RUNTIME_PARAMS,
            INFERENCE_SERVICE_PARAMS | {"deployment-mode": "Serverless", "min-replicas": 2},
        )
    ],
    indirect=True,
)
class TestKservePVCReadWriteManyAccess:
    """Check that several predictor pods can read from one ReadWriteMany PVC.

    The InferenceService is parametrized with ``min-replicas`` of 2, and each
    test lists ``/mnt/models`` in a different predictor pod. A test fails if
    ``execute`` raises.
    """

    @staticmethod
    def _list_models_dir(pod):
        """Run the ``ls /mnt/models`` command in the pod's KServe container."""
        pod.execute(
            command=POD_LS_SPLIT_COMMAND,
            container=KSERVE_CONTAINER_NAME,
        )

    def test_first_isvc_pvc_read_access(self, predictor_pods_scope_class):
        """The first predictor pod can list the model directory."""
        first_pod = predictor_pods_scope_class[0]
        self._list_models_dir(first_pod)

    def test_second_isvc_pvc_read_access(self, predictor_pods_scope_class):
        """The second predictor pod can list the model directory."""
        second_pod = predictor_pods_scope_class[1]
        self._list_models_dir(second_pod)
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import shlex
from typing import List

from ocp_resources.pod import ExecOnPodError
import pytest
from ocp_utilities.infra import get_pods_by_name_prefix
from typing import List

from tests.model_serving.model_server.storage.constants import KSERVE_CONTAINER_NAME
from tests.model_serving.model_server.storage.constants import (
INFERENCE_SERVICE_PARAMS,
KSERVE_CONTAINER_NAME,
KSERVE_OVMS_SERVING_RUNTIME_PARAMS,
)

pytestmark = pytest.mark.usefixtures("valid_aws_config")


POD_SPLIT_COMMAND: List[str] = shlex.split("touch /mnt/models/test")
POD_TOUCH_SPLIT_COMMAND: List[str] = shlex.split("touch /mnt/models/test")


@pytest.mark.parametrize(
Expand All @@ -18,23 +23,17 @@
pytest.param(
{"name": "pvc-write-access"},
{"model-dir": "test-dir"},
{
"name": "ovms-runtime",
"model-name": "onnx",
"template-name": "kserve-ovms",
"model-version": "1",
"multi-model": False,
},
{"name": "onnx"},
KSERVE_OVMS_SERVING_RUNTIME_PARAMS,
INFERENCE_SERVICE_PARAMS,
)
],
indirect=True,
)
class TestKservePVCWriteAccess:
def test_pod_containers_not_restarted(self, predictor_pod):
def test_pod_containers_not_restarted(self, first_predictor_pod):
restarted_containers = [
container.name
for container in predictor_pod.instance.status.containerStatuses
for container in first_predictor_pod.instance.status.containerStatuses
if container.restartCount > 0
]
assert not restarted_containers, f"Containers {restarted_containers} restarted"
Expand All @@ -44,11 +43,11 @@ def test_isvc_read_only_annotation_not_set_by_default(self, inference_service):
"storage.kserve.io/readonly"
), "Read only annotation is set"

def test_isvc_read_only_annotation_default_value(self, predictor_pod):
def test_isvc_read_only_annotation_default_value(self, first_predictor_pod):
with pytest.raises(ExecOnPodError):
predictor_pod.execute(
first_predictor_pod.execute(
container=KSERVE_CONTAINER_NAME,
command=POD_SPLIT_COMMAND,
command=POD_TOUCH_SPLIT_COMMAND,
)

@pytest.mark.parametrize(
Expand All @@ -68,7 +67,7 @@ def test_isvc_read_only_annotation_false(self, admin_client, patched_isvc):
)[0]
new_pod.execute(
container=KSERVE_CONTAINER_NAME,
command=POD_SPLIT_COMMAND,
command=POD_TOUCH_SPLIT_COMMAND,
)

@pytest.mark.parametrize(
Expand All @@ -89,5 +88,5 @@ def test_isvc_read_only_annotation_true(self, admin_client, patched_isvc):
with pytest.raises(ExecOnPodError):
new_pod.execute(
container=KSERVE_CONTAINER_NAME,
command=POD_SPLIT_COMMAND,
command=POD_TOUCH_SPLIT_COMMAND,
)
46 changes: 46 additions & 0 deletions tests/model_serving/model_server/storage/pvc/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from contextlib import contextmanager
from typing import Generator

from kubernetes.dynamic import DynamicClient
from ocp_resources.inference_service import InferenceService


@contextmanager
def create_isvc(
    client: DynamicClient,
    name: str,
    namespace: str,
    deployment_mode: str,
    storage_uri: str,
    model_format: str,
    runtime: str,
    min_replicas: int = 1,
    wait: bool = True,
) -> Generator[InferenceService, None, None]:
    """Create an InferenceService and yield it, optionally once it is Ready.

    This is a generator-based context manager: use it as
    ``with create_isvc(...) as isvc:``. The resource is created by entering the
    ``InferenceService`` context manager, and whatever that context manager
    does on exit happens when the caller's ``with`` block ends.

    Args:
        client: Client used to create the InferenceService.
        name: InferenceService name.
        namespace: Namespace the InferenceService is created in.
        deployment_mode: Value of the ``serving.kserve.io/deploymentMode``
            annotation.
        storage_uri: Value of the predictor model ``storageUri``.
        model_format: Name set in the predictor model ``modelFormat``.
        runtime: Value of the predictor model ``runtime``.
        min_replicas: Value of the predictor ``minReplicas``.
        wait: When true, block until the READY condition is TRUE before
            yielding.

    Yields:
        The created InferenceService.
    """
    with InferenceService(
        client=client,
        name=name,
        namespace=namespace,
        annotations={
            "serving.knative.openshift.io/enablePassthrough": "true",
            "sidecar.istio.io/inject": "true",
            "sidecar.istio.io/rewriteAppHTTPProbers": "true",
            "serving.kserve.io/deploymentMode": deployment_mode,
        },
        predictor={
            "minReplicas": min_replicas,
            "model": {
                "modelFormat": {"name": model_format},
                "version": "1",
                "runtime": runtime,
                "storageUri": storage_uri,
            },
        },
    ) as inference_service:
        if wait:
            inference_service.wait_for_condition(
                condition=inference_service.Condition.READY,
                status=inference_service.Condition.Status.TRUE,
                # NOTE(review): presumably seconds, i.e. 10 minutes -- confirm
                # against wait_for_condition.
                timeout=10 * 60,
            )

        yield inference_service

0 comments on commit cbad49e

Please sign in to comment.