Skip to content

Commit 32f0173

Browse files
committed
fix(sdk/backend): allow using cluster's default sc
When using the SDK's function `CreatePVC`, leaving the `storage_class_name` field empty will result in the cluster’s default storage class being applied. To enhance modularity and clarity, the logic for building the PVC definition has been refactored into a dedicated function. Error messages have been updated to align with this change, and unit tests have been implemented to cover all required and optional fields. In the code handling annotations, the `GetFields` method has replaced the use of the `AsMap` method. This approach is more convenient and eliminates the need for type conversion to `structpb.Value`. Resolves: #11396 Signed-off-by: Sébastien Han <[email protected]>
1 parent 873e9de commit 32f0173

10 files changed

+381
-87
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
* **kubernetes_platform:** fix api-generator docker mount for SELinux ([\#10890](https://github.com/kubeflow/pipelines/issues/10890)) ([e69078b](https://github.com/kubeflow/pipelines/commit/e69078b2b65c0e34fd56499bbe34da882dc6e009))
6666
* **manifests:** Move metacontroller to the top in kustmization.yaml ([\#10669](https://github.com/kubeflow/pipelines/issues/10669)) ([4e9fe75](https://github.com/kubeflow/pipelines/commit/4e9fe75d4564bbcdde7cd358298361e94d4a20be))
6767
* **sdk:** Throw 'exit_task cannot depend on any other tasks.' error when an ExitHandler has a parameter dependent on other task ([\#11005](https://github.com/kubeflow/pipelines/issues/11005)) ([08185e7](https://github.com/kubeflow/pipelines/commit/08185e71717ef628be3cbe2cdeb1fd55b25581d4))
68-
68+
* **sdk/backend**: Use the cluster's default storage class when creating a PVC with an empty storage class ([\#11396](http://github.com/kubeflow/pipelines/issues/11396)) ([95e43499b](https://github.com/kubeflow/pipelines/commit/95e43499b2b630dda09f632c7bd64ff5d59b89b7))
6969

7070
### Other Pull Requests
7171

backend/src/v2/driver/driver.go

+79-73
Original file line numberDiff line numberDiff line change
@@ -1667,63 +1667,6 @@ func createPVC(
16671667
inputs := execution.ExecutorInput.Inputs
16681668
glog.Infof("Input parameter values: %+v", inputs.ParameterValues)
16691669

1670-
// Requied input: access_modes
1671-
accessModeInput, ok := inputs.ParameterValues["access_modes"]
1672-
if !ok || accessModeInput == nil {
1673-
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create pvc: parameter access_modes not provided")
1674-
}
1675-
var accessModes []k8score.PersistentVolumeAccessMode
1676-
for _, value := range accessModeInput.GetListValue().GetValues() {
1677-
accessModes = append(accessModes, accessModeMap[value.GetStringValue()])
1678-
}
1679-
1680-
// Optional input: pvc_name and pvc_name_suffix
1681-
// Can only provide at most one of these two parameters.
1682-
// If neither is provided, PVC name is a randomly generated UUID.
1683-
pvcNameSuffixInput := inputs.ParameterValues["pvc_name_suffix"]
1684-
pvcNameInput := inputs.ParameterValues["pvc_name"]
1685-
if pvcNameInput.GetStringValue() != "" && pvcNameSuffixInput.GetStringValue() != "" {
1686-
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create pvc: at most one of pvc_name and pvc_name_suffix can be non-empty")
1687-
} else if pvcNameSuffixInput.GetStringValue() != "" {
1688-
pvcName = uuid.NewString() + pvcNameSuffixInput.GetStringValue()
1689-
// Add pvcName to the executor input for fingerprint generation
1690-
execution.ExecutorInput.Inputs.ParameterValues[pvcName] = structpb.NewStringValue(pvcName)
1691-
} else if pvcNameInput.GetStringValue() != "" {
1692-
pvcName = pvcNameInput.GetStringValue()
1693-
} else {
1694-
pvcName = uuid.NewString()
1695-
// Add pvcName to the executor input for fingerprint generation
1696-
execution.ExecutorInput.Inputs.ParameterValues[pvcName] = structpb.NewStringValue(pvcName)
1697-
}
1698-
1699-
// Required input: size
1700-
volumeSizeInput, ok := inputs.ParameterValues["size"]
1701-
if !ok || volumeSizeInput == nil {
1702-
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to create pvc: parameter volumeSize not provided")
1703-
}
1704-
1705-
// Optional input: storage_class_name
1706-
// When not provided, use default value `standard`
1707-
storageClassNameInput, ok := inputs.ParameterValues["storage_class_name"]
1708-
var storageClassName string
1709-
if !ok {
1710-
storageClassName = "standard"
1711-
} else {
1712-
storageClassName = storageClassNameInput.GetStringValue()
1713-
}
1714-
1715-
// Optional input: annotations
1716-
pvcAnnotationsInput := inputs.ParameterValues["annotations"]
1717-
pvcAnnotations := make(map[string]string)
1718-
for key, val := range pvcAnnotationsInput.GetStructValue().AsMap() {
1719-
typedVal := val.(structpb.Value)
1720-
pvcAnnotations[key] = typedVal.GetStringValue()
1721-
}
1722-
1723-
// Optional input: volume_name
1724-
volumeNameInput := inputs.ParameterValues["volume_name"]
1725-
volumeName := volumeNameInput.GetStringValue()
1726-
17271670
// Get execution fingerprint and MLMD ID for caching
17281671
// If pvcName includes a randomly generated UUID, it is added in the execution input as a key-value pair for this purpose only
17291672
// The original execution is not changed.
@@ -1771,22 +1714,10 @@ func createPVC(
17711714
return pvcName, createdExecution, pb.Execution_CACHED, nil
17721715
}
17731716

1774-
// Create a PersistentVolumeClaim object
1775-
pvc := &k8score.PersistentVolumeClaim{
1776-
ObjectMeta: metav1.ObjectMeta{
1777-
Name: pvcName,
1778-
Annotations: pvcAnnotations,
1779-
},
1780-
Spec: k8score.PersistentVolumeClaimSpec{
1781-
AccessModes: accessModes,
1782-
Resources: k8score.VolumeResourceRequirements{
1783-
Requests: k8score.ResourceList{
1784-
k8score.ResourceStorage: k8sres.MustParse(volumeSizeInput.GetStringValue()),
1785-
},
1786-
},
1787-
StorageClassName: &storageClassName,
1788-
VolumeName: volumeName,
1789-
},
1717+
// Build the PVC object
1718+
pvc, err := buildPVC(inputs)
1719+
if err != nil {
1720+
return "", createdExecution, pb.Execution_FAILED, fmt.Errorf("failed to build pvc definition: %w", err)
17901721
}
17911722

17921723
// Create the PVC in the cluster
@@ -1811,6 +1742,81 @@ func createPVC(
18111742
return createdPVC.ObjectMeta.Name, createdExecution, pb.Execution_COMPLETE, nil
18121743
}
18131744

1745+
// buildPVC creates a PersistentVolumeClaim object based on the provided inputs.
1746+
func buildPVC(inputs *pipelinespec.ExecutorInput_Inputs) (*k8score.PersistentVolumeClaim, error) {
1747+
// Required input: access_modes
1748+
accessModeInput, ok := inputs.ParameterValues["access_modes"]
1749+
if !ok || accessModeInput == nil {
1750+
return nil, fmt.Errorf("parameter access_modes not provided")
1751+
}
1752+
var accessModes []k8score.PersistentVolumeAccessMode
1753+
for _, value := range accessModeInput.GetListValue().GetValues() {
1754+
accessModes = append(accessModes, accessModeMap[value.GetStringValue()])
1755+
}
1756+
1757+
// Optional input: pvc_name and pvc_name_suffix
1758+
// Can only provide at most one of these two parameters.
1759+
// If neither is provided, PVC name is a randomly generated UUID.
1760+
pvcNameSuffixInput := inputs.ParameterValues["pvc_name_suffix"]
1761+
pvcNameInput := inputs.ParameterValues["pvc_name"]
1762+
var pvcName string
1763+
if pvcNameInput.GetStringValue() != "" && pvcNameSuffixInput.GetStringValue() != "" {
1764+
return nil, fmt.Errorf("at most one of pvc_name and pvc_name_suffix can be non-empty")
1765+
} else if pvcNameSuffixInput.GetStringValue() != "" {
1766+
pvcName = uuid.NewString() + pvcNameSuffixInput.GetStringValue()
1767+
// Add pvcName to the executor input for fingerprint generation
1768+
inputs.ParameterValues[pvcName] = structpb.NewStringValue(pvcName)
1769+
} else if pvcNameInput.GetStringValue() != "" {
1770+
pvcName = pvcNameInput.GetStringValue()
1771+
} else {
1772+
pvcName = uuid.NewString()
1773+
// Add pvcName to the executor input for fingerprint generation
1774+
inputs.ParameterValues[pvcName] = structpb.NewStringValue(pvcName)
1775+
}
1776+
1777+
// Required input: size
1778+
volumeSizeInput, ok := inputs.ParameterValues["size"]
1779+
if !ok || volumeSizeInput == nil {
1780+
return nil, fmt.Errorf("parameter size not provided")
1781+
}
1782+
1783+
// Optional input: annotations
1784+
pvcAnnotationsInput := inputs.ParameterValues["annotations"]
1785+
pvcAnnotations := make(map[string]string)
1786+
for key, val := range pvcAnnotationsInput.GetStructValue().GetFields() {
1787+
pvcAnnotations[key] = val.GetStringValue()
1788+
}
1789+
1790+
// Optional input: volume_name
1791+
volumeNameInput := inputs.ParameterValues["volume_name"]
1792+
volumeName := volumeNameInput.GetStringValue()
1793+
1794+
// Create a PersistentVolumeClaim object
1795+
pvc := &k8score.PersistentVolumeClaim{
1796+
ObjectMeta: metav1.ObjectMeta{
1797+
Name: pvcName,
1798+
Annotations: pvcAnnotations,
1799+
},
1800+
Spec: k8score.PersistentVolumeClaimSpec{
1801+
AccessModes: accessModes,
1802+
Resources: k8score.ResourceRequirements{
1803+
Requests: k8score.ResourceList{
1804+
k8score.ResourceStorage: k8sres.MustParse(volumeSizeInput.GetStringValue()),
1805+
},
1806+
},
1807+
VolumeName: volumeName,
1808+
},
1809+
}
1810+
1811+
// Optional input: storage_class_name
1812+
if storageClassNameInput, ok := inputs.ParameterValues["storage_class_name"]; ok {
1813+
storageClassName := storageClassNameInput.GetStringValue()
1814+
pvc.Spec.StorageClassName = &storageClassName
1815+
}
1816+
1817+
return pvc, nil
1818+
}
1819+
18141820
func deletePVC(
18151821
ctx context.Context,
18161822
k8sClient kubernetes.Interface,

backend/src/v2/driver/driver_test.go

+157
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"encoding/json"
1818
"testing"
1919

20+
"google.golang.org/protobuf/types/known/structpb"
2021
k8sres "k8s.io/apimachinery/pkg/api/resource"
2122
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2223

@@ -25,6 +26,7 @@ import (
2526
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
2627
"github.com/spf13/viper"
2728
"github.com/stretchr/testify/assert"
29+
"github.com/stretchr/testify/require"
2830
k8score "k8s.io/api/core/v1"
2931
)
3032

@@ -1621,3 +1623,158 @@ func Test_extendPodSpecPatch_GenericEphemeralVolume(t *testing.T) {
16211623
})
16221624
}
16231625
}
1626+
1627+
func Test_buildPVC(t *testing.T) {
1628+
type args struct {
1629+
inputs *pipelinespec.ExecutorInput_Inputs
1630+
}
1631+
accessModesList, _ := structpb.NewList([]interface{}{"ReadWriteOnce"})
1632+
accessModes := structpb.NewListValue(accessModesList)
1633+
tests := []struct {
1634+
name string
1635+
args args
1636+
wantErr bool
1637+
errMsg string
1638+
}{
1639+
{
1640+
"valid case",
1641+
args{
1642+
&pipelinespec.ExecutorInput_Inputs{
1643+
ParameterValues: map[string]*structpb.Value{
1644+
"access_modes": accessModes,
1645+
"pvc_name": structpb.NewStringValue("my-pvc"),
1646+
"size": structpb.NewStringValue("5Gi"),
1647+
"storage_class_name": structpb.NewStringValue("standard"),
1648+
"volume_name": structpb.NewStringValue("volume-name"),
1649+
},
1650+
},
1651+
},
1652+
false,
1653+
"",
1654+
},
1655+
{
1656+
"missing required access_modes",
1657+
args{
1658+
&pipelinespec.ExecutorInput_Inputs{
1659+
ParameterValues: map[string]*structpb.Value{
1660+
"access_modes": nil,
1661+
"pvc_name": structpb.NewStringValue("my-pvc-2"),
1662+
"size": structpb.NewStringValue("5Gi"),
1663+
"storage_class_name": structpb.NewStringValue("standard"),
1664+
"volume_name": structpb.NewStringValue("volume-name"),
1665+
},
1666+
},
1667+
},
1668+
true,
1669+
"parameter access_modes not provided",
1670+
},
1671+
{
1672+
"both pvc_name_suffix and pvc_name provided",
1673+
args{
1674+
&pipelinespec.ExecutorInput_Inputs{
1675+
ParameterValues: map[string]*structpb.Value{
1676+
"access_modes": accessModes,
1677+
"pvc_name": structpb.NewStringValue("my-pvc-3"),
1678+
"pvc_name_suffix": structpb.NewStringValue("-my-pvc"),
1679+
"size": structpb.NewStringValue("5Gi"),
1680+
"storage_class_name": structpb.NewStringValue("standard"),
1681+
"volume_name": structpb.NewStringValue("volume-name"),
1682+
},
1683+
},
1684+
},
1685+
true,
1686+
"at most one of pvc_name and pvc_name_suffix can be non-empty",
1687+
},
1688+
{
1689+
"missing required size",
1690+
args{
1691+
&pipelinespec.ExecutorInput_Inputs{
1692+
ParameterValues: map[string]*structpb.Value{
1693+
"access_modes": accessModes,
1694+
"pvc_name": structpb.NewStringValue("my-pvc"),
1695+
"storage_class_name": structpb.NewStringValue("standard"),
1696+
"volume_name": structpb.NewStringValue("volume-name"),
1697+
},
1698+
},
1699+
},
1700+
true,
1701+
"parameter size not provided",
1702+
},
1703+
{
1704+
"annotations provided",
1705+
args{
1706+
&pipelinespec.ExecutorInput_Inputs{
1707+
ParameterValues: map[string]*structpb.Value{
1708+
"access_modes": accessModes,
1709+
"pvc_name": structpb.NewStringValue("my-pvc"),
1710+
"size": structpb.NewStringValue("5Gi"),
1711+
"annotations": structpb.NewStructValue(
1712+
&structpb.Struct{
1713+
Fields: map[string]*structpb.Value{
1714+
"key1": structpb.NewStringValue("value1"),
1715+
"key2": structpb.NewStringValue("value2"),
1716+
},
1717+
},
1718+
),
1719+
"volume_name": structpb.NewStringValue("volume-name"),
1720+
},
1721+
},
1722+
},
1723+
false,
1724+
"",
1725+
},
1726+
{
1727+
"storage_class_name not provided",
1728+
args{
1729+
&pipelinespec.ExecutorInput_Inputs{
1730+
ParameterValues: map[string]*structpb.Value{
1731+
"access_modes": accessModes,
1732+
"pvc_name": structpb.NewStringValue("my-pvc"),
1733+
"size": structpb.NewStringValue("5Gi"),
1734+
"volume_name": structpb.NewStringValue("volume-name"),
1735+
},
1736+
},
1737+
},
1738+
false,
1739+
"",
1740+
},
1741+
}
1742+
for _, tt := range tests {
1743+
t.Run(tt.name, func(t *testing.T) {
1744+
pvc, err := buildPVC(tt.args.inputs)
1745+
if tt.wantErr {
1746+
require.NotNil(t, err)
1747+
require.Nil(t, pvc)
1748+
assert.Contains(t, err.Error(), tt.errMsg)
1749+
} else {
1750+
require.Nil(t, err)
1751+
require.NotNil(t, pvc)
1752+
// Test the PVC fields
1753+
if storageClassNameInput, ok := tt.args.inputs.ParameterValues["storage_class_name"]; ok {
1754+
assert.Equal(t, storageClassNameInput.GetStringValue(), *pvc.Spec.StorageClassName)
1755+
} else {
1756+
assert.Nil(t, pvc.Spec.StorageClassName)
1757+
}
1758+
if annotationsInput, ok := tt.args.inputs.ParameterValues["annotations"]; ok {
1759+
annotations := annotationsInput.GetStructValue().GetFields()
1760+
assert.Equal(t, len(annotations), len(pvc.Annotations))
1761+
for key, value := range annotations {
1762+
assert.Equal(t, value.GetStringValue(), pvc.Annotations[key])
1763+
}
1764+
}
1765+
if pvcNameInput, ok := tt.args.inputs.ParameterValues["pvc_name"]; ok {
1766+
assert.Equal(t, pvcNameInput.GetStringValue(), pvc.Name)
1767+
}
1768+
if pvcNameSuffixInput, ok := tt.args.inputs.ParameterValues["pvc_name_suffix"]; ok {
1769+
assert.Equal(t, pvc.Name, pvcNameSuffixInput.GetStringValue())
1770+
}
1771+
if sizeInput, ok := tt.args.inputs.ParameterValues["size"]; ok {
1772+
storage := pvc.Spec.Resources.Requests[k8score.ResourceStorage]
1773+
assert.Equal(t, sizeInput.GetStringValue(), storage.String())
1774+
}
1775+
assert.Equal(t, k8score.ReadWriteOnce, pvc.Spec.AccessModes[0])
1776+
assert.Equal(t, "volume-name", pvc.Spec.VolumeName)
1777+
}
1778+
})
1779+
}
1780+
}

kubernetes_platform/python/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def my_pipeline():
166166
pvc_name_suffix='-my-pvc',
167167
access_modes=['ReadWriteOnce'],
168168
size='5Gi',
169-
storage_class_name='standard',
169+
storage_class_name='standard', # optional - do not specify to use the default storage class
170170
)
171171

172172
task1 = make_data()

kubernetes_platform/python/kfp/kubernetes/volume.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def CreatePVC(
4949
of ``pvc_name`` and ``pvc_name_suffix`` can be provided.
5050
storage_class_name: Name of StorageClass from which to provision the PV
5151
to back the PVC. ``None`` indicates to use the cluster's default
52-
storage_class_name. Set to ``''`` for a statically specified PVC.
52+
storage class. Set to ``''`` for a statically specified PVC.
5353
volume_name: Pre-existing PersistentVolume that should back the
5454
provisioned PersistentVolumeClaim. Used for statically
5555
specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaimSpec>`_.

0 commit comments

Comments
 (0)