diff --git a/api/v1alpha/conduit_types.go b/api/v1alpha/conduit_types.go index e2daeb8..99e5825 100644 --- a/api/v1alpha/conduit_types.go +++ b/api/v1alpha/conduit_types.go @@ -54,8 +54,8 @@ const ( ) var ( - ConduitPipelineFile = path.Join(ConduitPipelinePath, "pipeline.yaml") - ConduitWithProcessorsVersion = "0.9.0" + ConduitPipelineFile = path.Join(ConduitPipelinePath, "pipeline.yaml") + ConduitEarliestAvailable = "v0.11.1" ) // ConduitSpec defines the desired state of Conduit diff --git a/api/v1alpha/conduit_webhook.go b/api/v1alpha/conduit_webhook.go index a8b0ac4..afe762b 100644 --- a/api/v1alpha/conduit_webhook.go +++ b/api/v1alpha/conduit_webhook.go @@ -5,6 +5,7 @@ import ( "path/filepath" "strings" + "github.com/Masterminds/semver/v3" "github.com/hashicorp/go-multierror" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -13,6 +14,18 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook/admission" ) +var conduitVerConstraint *semver.Constraints + +func init() { + var err error + // validate constraint + sanitized, _ := strings.CutPrefix(ConduitEarliestAvailable, "v") + conduitVerConstraint, err = semver.NewConstraint(fmt.Sprint(">= ", sanitized)) + if err != nil { + panic(fmt.Errorf("failed to create version constraint: %w", err)) + } +} + func (r *Conduit) SetupWebhookWithManager(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). For(r). @@ -103,6 +116,13 @@ var _ webhook.Validator = &Conduit{} func (r *Conduit) ValidateCreate() (admission.Warnings, error) { var errs error + if ok := validateConduitVersion(r.Spec.Version); !ok { + errs = multierror.Append(errs, fmt.Errorf("unsupported conduit version %s, minimum required %s", + r.Spec.Version, + ConduitEarliestAvailable, + )) + } + if _, err := validateConnectors(r.Spec.Connectors); err != nil { errs = multierror.Append(errs, err) } @@ -165,3 +185,12 @@ func validateProcessors(pp []*ConduitProcessor) (admission.Warnings, error) { return nil, errs } + +func validateConduitVersion(ver string) bool { + sanitized, _ := strings.CutPrefix(ver, "v") + v, err := semver.NewVersion(sanitized) + if err != nil { + return false + } + return conduitVerConstraint.Check(v) +} diff --git a/api/v1alpha/conduit_webhook_test.go b/api/v1alpha/conduit_webhook_test.go new file mode 100644 index 0000000..b30f9b5 --- /dev/null +++ b/api/v1alpha/conduit_webhook_test.go @@ -0,0 +1,34 @@ +package v1alpha + +import ( + "testing" + + "github.com/matryer/is" +) + +func TestWebhookValidate_ConduitVersion(t *testing.T) { + tests := []struct { + ver string + supported bool + }{ + {ver: "v0.11.1", supported: true}, + {ver: "v0.12.0", supported: true}, + {ver: "v1", supported: true}, + {ver: "v0.10", supported: false}, + {ver: "v0.9.1", supported: false}, + } + + testname := func(b bool, ver string) string { + if b { + return "supported " + ver + } + return "unsupported " + ver + } + + for _, tc := range tests { + t.Run(testname(tc.supported, tc.ver), func(t *testing.T) { + is := is.New(t) + is.Equal(validateConduitVersion(tc.ver), tc.supported) + }) + } +} diff --git a/config/samples/invalid-conduit-version.yaml b/config/samples/invalid-conduit-version.yaml new file mode 100644 index 0000000..b0e1e07 --- /dev/null +++ b/config/samples/invalid-conduit-version.yaml @@ -0,0 +1,29 @@ +apiVersion: operator.conduit.io/v1alpha +kind: Conduit +metadata: + name: conduit-generator +spec: + running: true + name: generator.log + description: generator pipeline + version: v0.10 + connectors: + - name: source-connector + type: source + plugin: builtin:generator + settings: + - name: format.type + value: structured + - name: format.options.id + value: "int" + - name: format.options.name + value: "string" + - name: format.options.company + value: "string" + - name: format.options.trial + value: "bool" + - name: recordCount + value: "3" + - name: destination-connector + type: destination + plugin: builtin:log diff --git a/controllers/conduit_containers.go b/controllers/conduit_containers.go index 69b97d1..82bda10 100644 --- a/controllers/conduit_containers.go +++ b/controllers/conduit_containers.go @@ -139,10 +139,7 @@ func ConduitRuntimeContainer(image, version string, envVars []corev1.EnvVar) cor "-db.type", "sqlite", "-db.sqlite.path", v1alpha.ConduitDBPath, "-pipelines.exit-on-error", - } - - if withProcessors(version) { - args = append(args, "-processors.path", v1alpha.ConduitProcessorsPath) + "-processors.path", v1alpha.ConduitProcessorsPath, } return corev1.Container{ diff --git a/controllers/conduit_containers_test.go b/controllers/conduit_containers_test.go index 4e31b35..b4042f7 100644 --- a/controllers/conduit_containers_test.go +++ b/controllers/conduit_containers_test.go @@ -193,87 +193,7 @@ func Test_ConduitInitContainers(t *testing.T) { func Test_ConduitRuntimeContainer(t *testing.T) { want := corev1.Container{ Name: "conduit-server", - Image: "my-image:v0.8.0", - ImagePullPolicy: corev1.PullAlways, - Args: []string{ - "/app/conduit", - "-pipelines.path", "/conduit.pipelines/pipeline.yaml", - "-connectors.path", "/conduit.storage/connectors", - "-db.type", "sqlite", - "-db.sqlite.path", "/conduit.storage/db", - "-pipelines.exit-on-error", - }, - Ports: []corev1.ContainerPort{ - { - Name: "http", - ContainerPort: 8080, - Protocol: corev1.ProtocolTCP, - }, - { - Name: "grpc", - ContainerPort: 8084, - Protocol: corev1.ProtocolTCP, - }, - }, - ReadinessProbe: &corev1.Probe{ - ProbeHandler: corev1.ProbeHandler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/healthz", - Scheme: "HTTP", - Port: intstr.FromString("http"), - }, - }, - TimeoutSeconds: 1, - PeriodSeconds: 10, - SuccessThreshold: 1, - FailureThreshold: 3, - }, - VolumeMounts: []corev1.VolumeMount{ - { - Name: "conduit-storage", - MountPath: "/conduit.storage", - }, - { - Name: "conduit-pipelines", - MountPath: "/conduit.pipelines", - ReadOnly: true, - }, - }, - Env: []corev1.EnvVar{ - { - Name: "var-1", - Value: "val-1", - }, - { - Name: "var-2", - Value: "val-2", - }, - }, - } - - got := ctrls.ConduitRuntimeContainer( - "my-image", - "v0.8.0", - []corev1.EnvVar{ - { - Name: "var-1", - Value: "val-1", - }, - { - Name: "var-2", - Value: "val-2", - }, - }, - ) - if diff := cmp.Diff(want, got); diff != "" { - t.Fatalf("container mismatch (-want +got): %v", diff) - } -} - -func Test_ConduitRuntimeContainerProcessors(t *testing.T) { - want := corev1.Container{ - Name: "conduit-server", - Image: "my-image:v0.9.1", + Image: "my-image:v0.11.1", ImagePullPolicy: corev1.PullAlways, Args: []string{ "/app/conduit", @@ -335,7 +255,7 @@ func Test_ConduitRuntimeContainerProcessors(t *testing.T) { got := ctrls.ConduitRuntimeContainer( "my-image", - "v0.9.1", + "v0.11.1", []corev1.EnvVar{ { Name: "var-1", diff --git a/controllers/conduit_ver.go b/controllers/conduit_ver.go deleted file mode 100644 index 62425de..0000000 --- a/controllers/conduit_ver.go +++ /dev/null @@ -1,29 +0,0 @@ -package controllers - -import ( - "fmt" - "strings" - - "github.com/Masterminds/semver/v3" - v1alpha "github.com/conduitio/conduit-operator/api/v1alpha" -) - -func init() { - // validate constraint - constraint := fmt.Sprint(">= ", v1alpha.ConduitWithProcessorsVersion) - if _, err := semver.NewConstraint(constraint); err != nil { - panic(fmt.Errorf("failed to create version constraint: %w", err)) - } -} - -// WithProcessors returns true when Conduit supports the new processors sdk. -// Returns false when Conduit does not offer support or when the version cannot be parsed. -func withProcessors(ver string) bool { - sanitized, _ := strings.CutPrefix(ver, "v") - v, err := semver.NewVersion(sanitized) - if err != nil { - return false - } - c, _ := semver.NewConstraint(fmt.Sprint(">= ", v1alpha.ConduitWithProcessorsVersion)) - return c.Check(v) -}