Skip to content

Commit

Permalink
Validate podtemplate for ClusterTopology
Browse files Browse the repository at this point in the history
Signed-off-by: SK Ali Arman <[email protected]>
  • Loading branch information
sheikh-arman committed May 29, 2024
1 parent 46aa4ce commit f01359e
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 25 deletions.
4 changes: 4 additions & 0 deletions apis/kubedb/v1alpha2/clickhouse_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ func (c *ClickHouse) PodLabels(extraLabels ...map[string]string) map[string]stri
return c.offshootLabels(meta_util.OverwriteKeys(c.OffshootSelectors(), extraLabels...), c.Spec.PodTemplate.Labels)
}

func (c *ClickHouse) ClusterPodLabels(petSetName string, labels map[string]string, extraLabels ...map[string]string) map[string]string {
return c.offshootLabels(meta_util.OverwriteKeys(c.OffshootClusterSelectors(petSetName), extraLabels...), labels)
}

func (c *ClickHouse) GetConnectionScheme() string {
scheme := "http"
return scheme
Expand Down
70 changes: 45 additions & 25 deletions apis/kubedb/v1alpha2/clickhouse_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func (r *ClickHouse) ValidateCreateOrUpdate() error {
r.Name,
"PodTemplate should be nil in clusterTopology"))
}

if r.Spec.Storage != nil {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("storage"),
r.Name,
"storage should be nil in clusterTopology"))
}

} else {
// number of replicas can not be 0 or less
if r.Spec.Replicas != nil && *r.Spec.Replicas <= 0 {
Expand Down Expand Up @@ -134,25 +141,38 @@ func (r *ClickHouse) ValidateCreateOrUpdate() error {
err.Error()))
}
}

err := r.validateVolumes(r)
if err != nil {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("podTemplate").Child("spec").Child("volumes"),
r.Name,
err.Error()))
}

err = r.validateVolumesMountPaths(r.Spec.PodTemplate)
if err != nil {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("podTemplate").Child("spec").Child("volumeMounts"),
r.Name,
err.Error()))
if r.Spec.ClusterTopology == nil {
err := r.validateVolumes(r.Spec.PodTemplate)
if err != nil {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("podTemplate").Child("spec").Child("volumes"),
r.Name,
err.Error()))
}
err = r.validateVolumesMountPaths(r.Spec.PodTemplate)
if err != nil {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("podTemplate").Child("spec").Child("volumeMounts"),
r.Name,
err.Error()))
}
}

if r.Spec.ClusterTopology != nil {
clusters := r.Spec.ClusterTopology.Cluster
for _, cluster := range clusters {
allErr = r.validateClusterStorageType(cluster.StorageType, r.Spec.Storage, cluster.Name, allErr)
allErr = r.validateClusterStorageType(cluster, allErr)

err := r.validateVolumes(cluster.PodTemplate)
if err != nil {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("clusterTopology").Child("podTemplate").Child("spec").Child("volumes"),
r.Name,
err.Error()))
}
err = r.validateVolumesMountPaths(cluster.PodTemplate)
if err != nil {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("clusterTopology").Child("podTemplate").Child("spec").Child("volumeMounts"),
r.Name,
err.Error()))
}
}
} else {
allErr = r.validateStandaloneStorageType(r.Spec.StorageType, r.Spec.Storage, allErr)
Expand Down Expand Up @@ -192,20 +212,20 @@ func (c *ClickHouse) validateStandaloneStorageType(storageType StorageType, stor
return allErr
}

func (c *ClickHouse) validateClusterStorageType(storageType StorageType, storage *core.PersistentVolumeClaimSpec, cluster string, allErr field.ErrorList) field.ErrorList {
if storageType == "" {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("clusterTopology").Child(cluster).Child("storageType"),
func (c *ClickHouse) validateClusterStorageType(cluster ClusterSpec, allErr field.ErrorList) field.ErrorList {
if cluster.StorageType == "" {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("clusterTopology").Child(cluster.Name).Child("storageType"),
c.Name,
"StorageType can not be empty"))
} else {
if storageType != StorageTypeDurable && storageType != StorageTypeEphemeral {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("clusterTopology").Child(cluster).Child("storageType"),
storageType,
if cluster.StorageType != StorageTypeDurable && cluster.StorageType != StorageTypeEphemeral {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("clusterTopology").Child(cluster.Name).Child("storageType"),
cluster.StorageType,
"StorageType should be either durable or ephemeral"))
}
}
if storage == nil && storageType == StorageTypeDurable {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("clusterTopology").Child(cluster).Child("storage"),
if cluster.Storage == nil && cluster.StorageType == StorageTypeDurable {
allErr = append(allErr, field.Invalid(field.NewPath("spec").Child("clusterTopology").Child(cluster.Name).Child("storage"),
c.Name,
"Storage can't be empty when StorageType is durable"))
}
Expand All @@ -226,13 +246,13 @@ var clickhouseReservedVolumes = []string{
ClickHouseVolumeData,
}

func (r *ClickHouse) validateVolumes(db *ClickHouse) error {
if db.Spec.PodTemplate.Spec.Volumes == nil {
func (r *ClickHouse) validateVolumes(podTemplate *ofst.PodTemplateSpec) error {
if podTemplate.Spec.Volumes == nil {
return nil
}
rsv := make([]string, len(clickhouseReservedVolumes))
copy(rsv, clickhouseReservedVolumes)
volumes := db.Spec.PodTemplate.Spec.Volumes
volumes := podTemplate.Spec.Volumes
for _, rv := range rsv {
for _, ugv := range volumes {
if ugv.Name == rv {
Expand Down

0 comments on commit f01359e

Please sign in to comment.