diff --git a/Makefile b/Makefile index 76c746b..a319125 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ CERT_MANAGER := https://github.com/cert-manager/cert-manager/releases/downl # KUSTOMIZE_VERSION ?= v4.5.7 KUSTOMIZE_VERSION ?= v5.4.3 CTRL_GEN_VERSION ?= v0.16.3 -KIND_VERSION ?= v0.22.0 +KIND_VERSION ?= v0.24.0 GOLINT_VERSION ?= v1.61.0 .EXPORT_ALL_VARIABLES: diff --git a/README.md b/README.md index 3e2fc0c..b2866ff 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ spec: 3. Deploy a sample pipeline configuration ```shell - kubectl apply -f samples/conf/conduit-generator.yaml + kubectl apply -f config/samples/conduit-generator.yaml ``` 4. Wait for instance to become ready diff --git a/config/samples/conduit-generator-image-ver.yaml b/config/samples/conduit-generator-image-ver.yaml index ea683ef..db6044e 100644 --- a/config/samples/conduit-generator-image-ver.yaml +++ b/config/samples/conduit-generator-image-ver.yaml @@ -1,24 +1,30 @@ -apiVersion: operator.conduit.io/v1 +apiVersion: operator.conduit.io/v1alpha kind: Conduit metadata: - name: conduit-generator + name: conduit-generator-image-ver spec: running: true name: generator.log description: generator pipeline image: ghcr.io/conduitio/conduit - version: v0.9.0 + version: v0.11.1 connectors: - name: source-connector type: source - plugin: builtin:generator + plugin: "builtin:generator" settings: - name: format.type value: structured - - name: format.options - value: "id:int,name:string,company:string,trial:bool" + - name: format.options.id + value: "int" + - name: format.options.name + value: "string" + - name: format.options.company + value: "string" + - name: format.options.trial + value: "bool" - name: recordCount value: "3" - name: destination-connector type: destination - plugin: builtin:log + plugin: "builtin:log" diff --git a/config/samples/conduit-generator-secrets.yaml b/config/samples/conduit-generator-secrets.yaml index 0f03eea..4efaa1d 100644 --- a/config/samples/conduit-generator-secrets.yaml +++ b/config/samples/conduit-generator-secrets.yaml @@ -1,13 +1,13 @@ apiVersion: v1 data: type: c3RydWN0dXJlZAo= - options: aWQ6aW50LG5hbWU6c3RyaW5nLGNvbXBhbnk6c3RyaW5nLHRyaWFsOmJvb2wK + password-type: c3RyaW5n kind: Secret metadata: name: conduit-generator-secrets-format type: Opaque --- -apiVersion: operator.conduit.io/v1 +apiVersion: operator.conduit.io/v1alpha kind: Conduit metadata: name: conduit-generator-secrets @@ -19,19 +19,27 @@ spec: - name: source-connector type: source plugin: conduitio/conduit-connector-generator - pluginVersion: v0.5.0 + pluginVersion: v0.7.0 settings: - name: format.type secretRef: key: type name: conduit-generator-secrets-format - - name: format.options + - name: format.options.id + value: "int" + - name: format.options.name + value: "string" + - name: format.options.company + value: "string" + - name: format.options.trial + value: "bool" + - name: format.options.password secretRef: - key: options + key: password-type name: conduit-generator-secrets-format - name: recordCount value: "3" - name: destination-connector type: destination plugin: conduitio/conduit-connector-log - pluginVersion: v0.3.0 + pluginVersion: v0.4.0 diff --git a/config/samples/conduit-generator.yaml b/config/samples/conduit-generator.yaml index 312e3c3..d59ba65 100644 --- a/config/samples/conduit-generator.yaml +++ b/config/samples/conduit-generator.yaml @@ -1,4 +1,4 @@ -apiVersion: operator.conduit.io/v1 +apiVersion: operator.conduit.io/v1alpha kind: Conduit metadata: name: conduit-generator @@ -13,8 +13,14 @@ spec: settings: - name: format.type value: structured - - name: format.options - value: "id:int,name:string,company:string,trial:bool" + - name: format.options.id + value: "int" + - name: format.options.name + value: "string" + - name: format.options.company + value: "string" + - name: format.options.trial + value: "bool" - name: recordCount value: "3" - name: destination-connector diff --git a/config/samples/conduit-with-proccessors.yaml b/config/samples/conduit-with-proccessors.yaml index 5cd25d9..5cd2c07 100644 --- a/config/samples/conduit-with-proccessors.yaml +++ b/config/samples/conduit-with-proccessors.yaml @@ -1,4 +1,4 @@ -apiVersion: operator.conduit.io/v1 +apiVersion: operator.conduit.io/v1alpha kind: Conduit metadata: name: conduit-with-procs @@ -13,17 +13,25 @@ spec: settings: - name: format.type value: structured - - name: format.options - value: "id:int,name:string,company:string,password:string,trial:bool" + - name: format.options.id + value: "int" + - name: format.options.name + value: "string" + - name: format.options.company + value: "string" + - name: format.options.password + value: "string" + - name: format.options.trial + value: "bool" - name: recordCount value: "3" processors: - name: sourceproc - type: maskfieldpayload + plugin: field.set settings: - name: field - value: password - - name: replacement + value: .Payload.After.password + - name: value value: "[redacted]" - name: destination-connector type: destination diff --git a/config/samples/invalid-plugin.yaml b/config/samples/invalid-plugin.yaml index 48c1998..365bb34 100644 --- a/config/samples/invalid-plugin.yaml +++ b/config/samples/invalid-plugin.yaml @@ -1,4 +1,4 @@ -apiVersion: operator.conduit.io/v1 +apiVersion: operator.conduit.io/v1alpha kind: Conduit metadata: name: conduit-sample @@ -9,7 +9,7 @@ spec: connectors: - name: source-connector type: source - plugin: foobar + plugin: foobar settings: - name: path secretRef: diff --git a/controllers/conduit_controller.go b/controllers/conduit_controller.go index d237c5b..0bdb069 100644 --- a/controllers/conduit_controller.go +++ b/controllers/conduit_controller.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "runtime" + "slices" "time" "golang.org/x/exp/maps" @@ -231,9 +232,9 @@ func (r *ConduitReconciler) CreateOrUpdateVolume(ctx context.Context, c *v1.Cond // Status conditions are set depending on the outcome of the operation. func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1.Conduit) error { var ( - cm = corev1.ConfigMap{} - nn = c.NamespacedName() - oneReplica = int32(1) + cm = corev1.ConfigMap{} + nn = c.NamespacedName() + replicas = r.getReplicas(c) deployment = appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ @@ -271,7 +272,7 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1. Strategy: appsv1.DeploymentStrategy{ Type: appsv1.RecreateDeploymentStrategyType, }, - Replicas: &oneReplica, + Replicas: &replicas, Selector: &metav1.LabelSelector{}, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{}, @@ -303,41 +304,33 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1. status := deployment.Status readyReplicas := fmt.Sprintf("%d/%d", status.ReadyReplicas, *spec.Replicas) - var availableCond appsv1.DeploymentCondition - for i, c := range status.Conditions { - if c.Type == appsv1.DeploymentAvailable { - availableCond = status.Conditions[i] - break - } - } + runningStatus := r.deploymentRunningStatus(&deployment) - switch status := availableCond.Status; status { + switch runningStatus { case corev1.ConditionTrue: - if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, status) { + if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, runningStatus) { r.Eventf(c, corev1.EventTypeNormal, v1.RunningReason, "Conduit deployment %q running, config %q", nn, cm.ResourceVersion) } c.Status.SetCondition( v1.ConditionConduitDeploymentRunning, - status, + runningStatus, "DeploymentReady", readyReplicas, ) case corev1.ConditionFalse: - if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, status) { + if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, runningStatus) { r.Eventf(c, corev1.EventTypeNormal, v1.StoppedReason, "Conduit deployment %q stopped, config %q", nn, cm.ResourceVersion) } c.Status.SetCondition( v1.ConditionConduitDeploymentRunning, - status, + runningStatus, "DeploymentReady", readyReplicas, ) default: - if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, status) { - r.Eventf(c, corev1.EventTypeNormal, v1.PendingReason, "Conduit deployment %q pending, config %q", nn, cm.ResourceVersion) - } + r.Eventf(c, corev1.EventTypeNormal, v1.PendingReason, "Conduit deployment %q pending, config %q", nn, cm.ResourceVersion) } return ctrlutil.SetControllerReference(c, &deployment, r.Scheme()) @@ -467,3 +460,27 @@ func (r *ConduitReconciler) SetupWithManager(mgr ctrl.Manager) error { // Owns(&corev1.PersistentVolume{}). Complete(r) } + +func (r *ConduitReconciler) getReplicas(c *v1.Conduit) int32 { + if c.Spec.Running != nil && *c.Spec.Running { + return 1 + } + return 0 +} + +func (r *ConduitReconciler) deploymentRunningStatus(d *appsv1.Deployment) corev1.ConditionStatus { + // When the deployment is scaled down, return not running (false) + if *d.Spec.Replicas == 0 { + return corev1.ConditionFalse + } + + i := slices.IndexFunc(d.Status.Conditions, func(c appsv1.DeploymentCondition) bool { + return c.Type == appsv1.DeploymentAvailable + }) + if i < 0 { + r.Logger.Info("failed to find deployment status condition, default to unknown", "deployment", d.Name) + return corev1.ConditionUnknown + } + + return d.Status.Conditions[i].Status +} diff --git a/controllers/conduit_controller_test.go b/controllers/conduit_controller_test.go index 1973241..149104c 100644 --- a/controllers/conduit_controller_test.go +++ b/controllers/conduit_controller_test.go @@ -825,6 +825,65 @@ func Test_CreateOrUpdateDeployment(t *testing.T) { } }, }, + { + name: "deployment is stopped", + conduit: sampleConduit(false), + wantStatus: func() *v1alpha.ConduitStatus { + status := defaultConditions() + status.SetCondition(v1alpha.ConditionConduitDeploymentRunning, corev1.ConditionFalse, "", "") + status.SetCondition(v1alpha.ConditionConduitReady, corev1.ConditionFalse, "", "") + + return status + }(), + setup: func(ctrl *gomock.Controller, c *v1alpha.Conduit) *controllers.ConduitReconciler { + nn := c.NamespacedName() + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: nn.Name, + Namespace: nn.Namespace, + }, + } + + client := mock.NewMockClient(ctrl) + client.EXPECT().Scheme().Return(conduitScheme) + client.EXPECT().Get(ctx, nn, &corev1.ConfigMap{}). + DoAndReturn(func(_ context.Context, _ types.NamespacedName, c *corev1.ConfigMap, _ ...kclient.CreateOption) error { + c.ResourceVersion = resourceVer + return nil + }) + client.EXPECT().Get(ctx, nn, deployment). + DoAndReturn(func(_ context.Context, n types.NamespacedName, d *appsv1.Deployment, _ ...kclient.CreateOption) error { + is.Equal(n, nn) + + d.Status = appsv1.DeploymentStatus{ + Conditions: []appsv1.DeploymentCondition{ + { + Type: appsv1.DeploymentAvailable, + Status: corev1.ConditionTrue, + }, + }, + } + + return nil + }) + + updatedDeployment := deployment.DeepCopy() + updatedDeployment.Spec.Template.Annotations = map[string]string{ + "operator.conduit.io/config-map-version": "resource-version-121", + } + client.EXPECT().Update(ctx, mock.NewDeploymentMatcher(updatedDeployment)).Return(nil) + + recorder := mock.NewMockEventRecorder(ctrl) + recorder.EXPECT().Eventf(c, corev1.EventTypeNormal, v1alpha.StoppedReason, gomock.Any(), nn) + recorder.EXPECT().Eventf(c, corev1.EventTypeNormal, v1alpha.UpdatedReason, gomock.Any(), nn) + + return &controllers.ConduitReconciler{ + Metadata: &v1alpha.ConduitInstanceMetadata{}, + Client: client, + EventRecorder: recorder, + } + }, + }, { name: "error when getting config map", conduit: sampleConduit(true),