diff --git a/api/v1/conduit_types.go b/api/v1/conduit_types.go index 7eafd5b..5d8eed6 100644 --- a/api/v1/conduit_types.go +++ b/api/v1/conduit_types.go @@ -38,47 +38,55 @@ var conduitConditions = NewConditionSet( ) const ( - ConduitVersion = "0.8.0" + ConduitVersion = "v0.9.0" ConduitImage = "ghcr.io/conduitio/conduit" ConduitContainerName = "conduit-server" ConduitPipelinePath = "/conduit.pipelines" ConduitVolumePath = "/conduit.storage" ConduitDBPath = "/conduit.storage/db" ConduitConnectorsPath = "/conduit.storage/connectors" + ConduitProcessorsPath = "/conduit.storage/processors" ConduitStorageVolumeMount = "conduit-storage" ConduitPipelineVolumeMount = "conduit-pipelines" ConduitInitImage = "golang:1.22" - ConduitInitContainerName = "conduit-connector-init" + ConduitInitContainerName = "conduit-init" ) -var ConduitPipelineFile = path.Join(ConduitPipelinePath, "pipeline.yaml") +var ( + ConduitPipelineFile = path.Join(ConduitPipelinePath, "pipeline.yaml") + ConduitWithProcessorsVersion = "0.9.0" +) // ConduitSpec defines the desired state of Conduit type ConduitSpec struct { - Running bool `json:"running,omitempty"` - Name string `json:"name,omitempty"` - Version string `json:"version,omitempty"` - Description string `json:"description,omitempty"` - Connectors []*ConduitConnector `json:"connectors,omitempty"` - Processors []*ConduitProcessor `json:"processors,omitempty"` + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` + Image string `json:"image,omitempty"` + Running bool `json:"running,omitempty"` + Version string `json:"version,omitempty"` + + Connectors []*ConduitConnector `json:"connectors,omitempty"` + Processors []*ConduitProcessor `json:"processors,omitempty"` } type ConduitConnector struct { - Name string `json:"name,omitempty"` - Plugin string `json:"plugin,omitempty"` - PluginPkg string `json:"pluginPkg,omitempty"` - PluginName string `json:"pluginName,omitempty"` - PluginVersion string `json:"pluginVersion,omitempty"` - Type string `json:"type,omitempty"` - Settings []SettingsVar `json:"settings,omitempty"` - Processors []*ConduitProcessor `json:"processors,omitempty"` + Name string `json:"name,omitempty"` + Type string `json:"type,omitempty"` + Plugin string `json:"plugin,omitempty"` + PluginName string `json:"pluginName,omitempty"` + PluginPkg string `json:"pluginPkg,omitempty"` + PluginVersion string `json:"pluginVersion,omitempty"` + + Settings []SettingsVar `json:"settings,omitempty"` + Processors []*ConduitProcessor `json:"processors,omitempty"` } type ConduitProcessor struct { - Name string `json:"name,omitempty"` - Type string `json:"type,omitempty"` + Name string `json:"name,omitempty"` + Type string `json:"type,omitempty"` + Workers int `json:"workers,omitempty"` + Settings []SettingsVar `json:"settings,omitempty"` - Workers int `json:"workers,omitempty"` } type GlobalConfigMapRef struct { @@ -132,15 +140,11 @@ type Conduit struct { func (r *Conduit) NamespacedName() types.NamespacedName { return types.NamespacedName{ - Name: fmt.Sprintf("conduit-server-%s", r.Name), + Name: fmt.Sprint("conduit-server-", r.Name), Namespace: r.Namespace, } } -func (r *Conduit) ImageName() string { - return fmt.Sprintf("%s:v%s", ConduitImage, r.Spec.Version) -} - //+kubebuilder:object:root=true // ConduitList contains a list of Conduit diff --git a/api/v1/conduit_webhook.go b/api/v1/conduit_webhook.go index 552e6f7..ac2ee16 100644 --- a/api/v1/conduit_webhook.go +++ b/api/v1/conduit_webhook.go @@ -32,6 +32,10 @@ func (r *Conduit) Default() { } } + if r.Spec.Image == "" { + r.Spec.Image = ConduitImage + } + if r.Spec.Version == "" { r.Spec.Version = ConduitVersion } diff --git a/charts/conduit-operator/templates/crd.yaml b/charts/conduit-operator/templates/crd.yaml index a748b99..fc9ba60 100644 --- a/charts/conduit-operator/templates/crd.yaml +++ b/charts/conduit-operator/templates/crd.yaml @@ -159,6 +159,8 @@ spec: type: array description: type: string + image: + type: string name: type: string processors: diff --git a/config/crd/bases/operator.conduit.io_conduits.yaml b/config/crd/bases/operator.conduit.io_conduits.yaml index 1a0eeb9..b0b51e6 100644 --- a/config/crd/bases/operator.conduit.io_conduits.yaml +++ b/config/crd/bases/operator.conduit.io_conduits.yaml @@ -148,6 +148,8 @@ spec: type: array description: type: string + image: + type: string name: type: string processors: diff --git a/config/samples/conduit-generator-image-ver.yaml b/config/samples/conduit-generator-image-ver.yaml new file mode 100644 index 0000000..ea683ef --- /dev/null +++ b/config/samples/conduit-generator-image-ver.yaml @@ -0,0 +1,24 @@ +apiVersion: operator.conduit.io/v1 +kind: Conduit +metadata: + name: conduit-generator +spec: + running: true + name: generator.log + description: generator pipeline + image: ghcr.io/conduitio/conduit + version: v0.9.0 + connectors: + - name: source-connector + type: source + plugin: builtin:generator + settings: + - name: format.type + value: structured + - name: format.options + value: "id:int,name:string,company:string,trial:bool" + - name: recordCount + value: "3" + - name: destination-connector + type: destination + plugin: builtin:log diff --git a/config/samples/conduit-generator-secrets.yaml b/config/samples/conduit-generator-secrets.yaml index cd3aae9..0f03eea 100644 --- a/config/samples/conduit-generator-secrets.yaml +++ b/config/samples/conduit-generator-secrets.yaml @@ -14,7 +14,7 @@ metadata: spec: running: true name: generator.standalone.log - description: generator pipline + description: generator pipeline connectors: - name: source-connector type: source diff --git a/config/samples/conduit-generator.yaml b/config/samples/conduit-generator.yaml index bcbe759..312e3c3 100644 --- a/config/samples/conduit-generator.yaml +++ b/config/samples/conduit-generator.yaml @@ -5,7 +5,7 @@ metadata: spec: running: true name: generator.log - description: generator pipline + description: generator pipeline connectors: - name: source-connector type: source diff --git a/config/samples/conduit-with-proccessors.yaml b/config/samples/conduit-with-proccessors.yaml index a31fb7b..5cd25d9 100644 --- a/config/samples/conduit-with-proccessors.yaml +++ b/config/samples/conduit-with-proccessors.yaml @@ -5,7 +5,7 @@ metadata: spec: running: true name: generator.proc.log - description: redacting generator pipline + description: redacting generator pipeline connectors: - name: source-connector type: source diff --git a/config/samples/invalid-plugin.yaml b/config/samples/invalid-plugin.yaml index d510a79..48c1998 100644 --- a/config/samples/invalid-plugin.yaml +++ b/config/samples/invalid-plugin.yaml @@ -5,7 +5,7 @@ metadata: spec: running: true name: file-pipeline - description: test pipline + description: test pipeline connectors: - name: source-connector type: source diff --git a/controllers/conduit_containers.go b/controllers/conduit_containers.go index 7ff078e..bb302d3 100644 --- a/controllers/conduit_containers.go +++ b/controllers/conduit_containers.go @@ -77,6 +77,24 @@ func (c *commandBuilder) addConnectorBuild(b connectorBuild) { func ConduitInitContainers(cc []*v1.ConduitConnector) []corev1.Container { builder := &commandBuilder{} + containers := []corev1.Container{ + { + Name: v1.ConduitInitContainerName, + Image: v1.ConduitInitImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", "-c", + fmt.Sprintf("mkdir -p %s %s", v1.ConduitProcessorsPath, v1.ConduitConnectorsPath), + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: v1.ConduitStorageVolumeMount, + MountPath: v1.ConduitVolumePath, + }, + }, + }, + } + for _, c := range cc { if strings.HasPrefix(c.Plugin, "builtin") { continue @@ -90,13 +108,9 @@ func ConduitInitContainers(cc []*v1.ConduitConnector) []corev1.Container { }) } - if builder.empty() { - return []corev1.Container{} - } - - return []corev1.Container{ - { - Name: v1.ConduitInitContainerName, + if !builder.empty() { + containers = append(containers, corev1.Container{ + Name: fmt.Sprint(v1.ConduitInitContainerName, "-connectors"), Image: v1.ConduitInitImage, ImagePullPolicy: corev1.PullIfNotPresent, Args: []string{ @@ -109,25 +123,33 @@ func ConduitInitContainers(cc []*v1.ConduitConnector) []corev1.Container { MountPath: v1.ConduitVolumePath, }, }, - }, + }) } + + return containers } // ConduitRuntimeContainer returns a Kubernetes container definition // todo is the pipelineName supposed to be used? -func ConduitRuntimeContainer(image string, envVars []corev1.EnvVar) corev1.Container { +func ConduitRuntimeContainer(image, version string, envVars []corev1.EnvVar) corev1.Container { + args := []string{ + "/app/conduit", + "-pipelines.path", v1.ConduitPipelineFile, + "-connectors.path", v1.ConduitConnectorsPath, + "-db.type", "badger", + "-db.badger.path", v1.ConduitDBPath, + "-pipelines.exit-on-error", + } + + if withProcessors(version) { + args = append(args, "-processors.path", v1.ConduitProcessorsPath) + } + return corev1.Container{ Name: v1.ConduitContainerName, - Image: image, + Image: fmt.Sprint(image, ":", version), ImagePullPolicy: corev1.PullAlways, - Args: []string{ - "/app/conduit", - "-pipelines.path", v1.ConduitPipelineFile, - "-connectors.path", v1.ConduitConnectorsPath, - "-db.type", "badger", - "-db.badger.path", v1.ConduitDBPath, - "-pipelines.exit-on-error", - }, + Args: args, Ports: []corev1.ContainerPort{ { Name: "http", diff --git a/controllers/conduit_containers_test.go b/controllers/conduit_containers_test.go index f9b8196..762abdf 100644 --- a/controllers/conduit_containers_test.go +++ b/controllers/conduit_containers_test.go @@ -13,9 +13,21 @@ import ( ) func Test_ConduitInitContainers(t *testing.T) { + initContainer := corev1.Container{ + Name: "conduit-init", + Image: "golang:1.22", + ImagePullPolicy: corev1.PullIfNotPresent, + Args: []string{ + "sh", "-xe", "-c", + "mkdir -p /conduit.storage/processors /conduit.storage/connectors", + }, + VolumeMounts: []corev1.VolumeMount{{Name: "conduit-storage", MountPath: "/conduit.storage"}}, + } + tests := []struct { name string connectors []*v1.ConduitConnector + imageVer string want []corev1.Container }{ { @@ -26,7 +38,7 @@ func Test_ConduitInitContainers(t *testing.T) { PluginVersion: "latest", }, }, - want: []corev1.Container{}, + want: []corev1.Container{initContainer}, }, { name: "with latest standalone connector", @@ -42,8 +54,9 @@ func Test_ConduitInitContainers(t *testing.T) { }, }, want: []corev1.Container{ + initContainer, { - Name: "conduit-connector-init", + Name: "conduit-init-connectors", Image: "golang:1.22", ImagePullPolicy: corev1.PullIfNotPresent, Args: []string{ @@ -73,8 +86,9 @@ func Test_ConduitInitContainers(t *testing.T) { }, }, want: []corev1.Container{ + initContainer, { - Name: "conduit-connector-init", + Name: "conduit-init-connectors", Image: "golang:1.22", ImagePullPolicy: corev1.PullIfNotPresent, Args: []string{ @@ -109,8 +123,9 @@ func Test_ConduitInitContainers(t *testing.T) { }, }, want: []corev1.Container{ + initContainer, { - Name: "conduit-connector-init", + Name: "conduit-init-connectors", Image: "golang:1.22", ImagePullPolicy: corev1.PullIfNotPresent, Args: []string{ @@ -145,8 +160,9 @@ func Test_ConduitInitContainers(t *testing.T) { }, }, want: []corev1.Container{ + initContainer, { - Name: "conduit-connector-init", + Name: "conduit-init-connectors", Image: "golang:1.22", ImagePullPolicy: corev1.PullIfNotPresent, Args: []string{ @@ -177,7 +193,87 @@ func Test_ConduitInitContainers(t *testing.T) { func Test_ConduitRuntimeContainer(t *testing.T) { want := corev1.Container{ Name: "conduit-server", - Image: "my-image", + Image: "my-image:v0.8.0", + ImagePullPolicy: corev1.PullAlways, + Args: []string{ + "/app/conduit", + "-pipelines.path", "/conduit.pipelines/pipeline.yaml", + "-connectors.path", "/conduit.storage/connectors", + "-db.type", "badger", + "-db.badger.path", "/conduit.storage/db", + "-pipelines.exit-on-error", + }, + Ports: []corev1.ContainerPort{ + { + Name: "http", + ContainerPort: 8080, + Protocol: corev1.ProtocolTCP, + }, + { + Name: "grpc", + ContainerPort: 8084, + Protocol: corev1.ProtocolTCP, + }, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/healthz", + Scheme: "HTTP", + Port: intstr.FromString("http"), + }, + }, + TimeoutSeconds: 1, + PeriodSeconds: 10, + SuccessThreshold: 1, + FailureThreshold: 3, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conduit-storage", + MountPath: "/conduit.storage", + }, + { + Name: "conduit-pipelines", + MountPath: "/conduit.pipelines", + ReadOnly: true, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "var-1", + Value: "val-1", + }, + { + Name: "var-2", + Value: "val-2", + }, + }, + } + + got := ctrls.ConduitRuntimeContainer( + "my-image", + "v0.8.0", + []corev1.EnvVar{ + { + Name: "var-1", + Value: "val-1", + }, + { + Name: "var-2", + Value: "val-2", + }, + }, + ) + if diff := cmp.Diff(want, got); diff != "" { + t.Fatalf("container mismatch (-want +got): %v", diff) + } +} + +func Test_ConduitRuntimeContainerProcessors(t *testing.T) { + want := corev1.Container{ + Name: "conduit-server", + Image: "my-image:v0.9.1", ImagePullPolicy: corev1.PullAlways, Args: []string{ "/app/conduit", @@ -186,6 +282,8 @@ func Test_ConduitRuntimeContainer(t *testing.T) { "-db.type", "badger", "-db.badger.path", "/conduit.storage/db", "-pipelines.exit-on-error", + "-processors.path", + "/conduit.storage/processors", }, Ports: []corev1.ContainerPort{ { @@ -237,6 +335,7 @@ func Test_ConduitRuntimeContainer(t *testing.T) { got := ctrls.ConduitRuntimeContainer( "my-image", + "v0.9.1", []corev1.EnvVar{ { Name: "var-1", diff --git a/controllers/conduit_controller.go b/controllers/conduit_controller.go index 18599f7..1309187 100644 --- a/controllers/conduit_controller.go +++ b/controllers/conduit_controller.go @@ -279,7 +279,7 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1. RestartPolicy: corev1.RestartPolicyAlways, InitContainers: ConduitInitContainers(c.Spec.Connectors), Containers: []corev1.Container{ - ConduitRuntimeContainer(c.ImageName(), EnvVars(c)), + ConduitRuntimeContainer(c.Spec.Image, c.Spec.Version, EnvVars(c)), }, Volumes: []corev1.Volume{ ConduitVolume(nn.Name), diff --git a/controllers/conduit_ver.go b/controllers/conduit_ver.go new file mode 100644 index 0000000..1dbf163 --- /dev/null +++ b/controllers/conduit_ver.go @@ -0,0 +1,29 @@ +package controllers + +import ( + "fmt" + "strings" + + "github.com/Masterminds/semver/v3" + v1 "github.com/conduitio-labs/conduit-operator/api/v1" +) + +func init() { + // validate constraint + constraint := fmt.Sprint(">= ", v1.ConduitWithProcessorsVersion) + if _, err := semver.NewConstraint(constraint); err != nil { + panic(fmt.Errorf("failed to create version constraint: %w", err)) + } +} + +// WithProcessors returns true when Conduit supports the new processors sdk. +// Returns false when Conduit does not offer support or when the version cannot be parsed. +func withProcessors(ver string) bool { + sanitized, _ := strings.CutPrefix(ver, "v") + v, err := semver.NewVersion(sanitized) + if err != nil { + return false + } + c, _ := semver.NewConstraint(fmt.Sprint(">= ", v1.ConduitWithProcessorsVersion)) + return c.Check(v) +} diff --git a/go.mod b/go.mod index a55ecc6..ca14475 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/conduitio-labs/conduit-operator go 1.22 require ( + github.com/Masterminds/semver/v3 v3.2.1 github.com/conduitio/conduit v0.8.1 github.com/go-logr/logr v1.4.1 github.com/golang/mock v1.6.0 @@ -20,7 +21,6 @@ require ( require ( github.com/Masterminds/goutils v1.1.1 // indirect - github.com/Masterminds/semver/v3 v3.2.1 // indirect github.com/Masterminds/sprig/v3 v3.2.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect diff --git a/go.sum b/go.sum index 9a541d8..2c9b02c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= +github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww= github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=