Skip to content

Commit

Permalink
Add functionality for stopping/starting deployment (#25)
Browse files Browse the repository at this point in the history
* Add functionality for stopping/starting deployment

Co-authored-by: Lyubo Kamenov <[email protected]>
  • Loading branch information
hariso and lyuboxa authored Oct 2, 2024
1 parent 009799d commit 3c12f12
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ CERT_MANAGER := https://github.com/cert-manager/cert-manager/releases/downl
# KUSTOMIZE_VERSION ?= v4.5.7
KUSTOMIZE_VERSION ?= v5.4.3
CTRL_GEN_VERSION ?= v0.16.3
KIND_VERSION ?= v0.22.0
KIND_VERSION ?= v0.24.0
GOLINT_VERSION ?= v1.61.0

.EXPORT_ALL_VARIABLES:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ spec:

3. Deploy a sample pipeline configuration
```shell
kubectl apply -f samples/conf/conduit-generator.yaml
kubectl apply -f config/samples/conduit-generator.yaml
```

4. Wait for instance to become ready
Expand Down
20 changes: 13 additions & 7 deletions config/samples/conduit-generator-image-ver.yaml
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
apiVersion: operator.conduit.io/v1
apiVersion: operator.conduit.io/v1alpha
kind: Conduit
metadata:
name: conduit-generator
name: conduit-generator-image-ver
spec:
running: true
name: generator.log
description: generator pipeline
image: ghcr.io/conduitio/conduit
version: v0.9.0
version: v0.11.1
connectors:
- name: source-connector
type: source
plugin: builtin:generator
plugin: "builtin:generator"
settings:
- name: format.type
value: structured
- name: format.options
value: "id:int,name:string,company:string,trial:bool"
- name: format.options.id
value: "int"
- name: format.options.name
value: "string"
- name: format.options.company
value: "string"
- name: format.options.trial
value: "bool"
- name: recordCount
value: "3"
- name: destination-connector
type: destination
plugin: builtin:log
plugin: "builtin:log"
20 changes: 14 additions & 6 deletions config/samples/conduit-generator-secrets.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
apiVersion: v1
data:
type: c3RydWN0dXJlZAo=
options: aWQ6aW50LG5hbWU6c3RyaW5nLGNvbXBhbnk6c3RyaW5nLHRyaWFsOmJvb2wK
password-type: c3RyaW5n
kind: Secret
metadata:
name: conduit-generator-secrets-format
type: Opaque
---
apiVersion: operator.conduit.io/v1
apiVersion: operator.conduit.io/v1alpha
kind: Conduit
metadata:
name: conduit-generator-secrets
Expand All @@ -19,19 +19,27 @@ spec:
- name: source-connector
type: source
plugin: conduitio/conduit-connector-generator
pluginVersion: v0.5.0
pluginVersion: v0.7.0
settings:
- name: format.type
secretRef:
key: type
name: conduit-generator-secrets-format
- name: format.options
- name: format.options.id
value: "int"
- name: format.options.name
value: "string"
- name: format.options.company
value: "string"
- name: format.options.trial
value: "bool"
- name: format.options.password
secretRef:
key: options
key: password-type
name: conduit-generator-secrets-format
- name: recordCount
value: "3"
- name: destination-connector
type: destination
plugin: conduitio/conduit-connector-log
pluginVersion: v0.3.0
pluginVersion: v0.4.0
12 changes: 9 additions & 3 deletions config/samples/conduit-generator.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: operator.conduit.io/v1
apiVersion: operator.conduit.io/v1alpha
kind: Conduit
metadata:
name: conduit-generator
Expand All @@ -13,8 +13,14 @@ spec:
settings:
- name: format.type
value: structured
- name: format.options
value: "id:int,name:string,company:string,trial:bool"
- name: format.options.id
value: "int"
- name: format.options.name
value: "string"
- name: format.options.company
value: "string"
- name: format.options.trial
value: "bool"
- name: recordCount
value: "3"
- name: destination-connector
Expand Down
20 changes: 14 additions & 6 deletions config/samples/conduit-with-proccessors.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: operator.conduit.io/v1
apiVersion: operator.conduit.io/v1alpha
kind: Conduit
metadata:
name: conduit-with-procs
Expand All @@ -13,17 +13,25 @@ spec:
settings:
- name: format.type
value: structured
- name: format.options
value: "id:int,name:string,company:string,password:string,trial:bool"
- name: format.options.id
value: "int"
- name: format.options.name
value: "string"
- name: format.options.company
value: "string"
- name: format.options.password
value: "string"
- name: format.options.trial
value: "bool"
- name: recordCount
value: "3"
processors:
- name: sourceproc
type: maskfieldpayload
plugin: field.set
settings:
- name: field
value: password
- name: replacement
value: .Payload.After.password
- name: value
value: "[redacted]"
- name: destination-connector
type: destination
Expand Down
4 changes: 2 additions & 2 deletions config/samples/invalid-plugin.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
apiVersion: operator.conduit.io/v1
apiVersion: operator.conduit.io/v1alpha
kind: Conduit
metadata:
name: conduit-sample
Expand All @@ -9,7 +9,7 @@ spec:
connectors:
- name: source-connector
type: source
plugin: foobar
plugin: foobar
settings:
- name: path
secretRef:
Expand Down
55 changes: 36 additions & 19 deletions controllers/conduit_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"runtime"
"slices"
"time"

"golang.org/x/exp/maps"
Expand Down Expand Up @@ -231,9 +232,9 @@ func (r *ConduitReconciler) CreateOrUpdateVolume(ctx context.Context, c *v1.Cond
// Status conditions are set depending on the outcome of the operation.
func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1.Conduit) error {
var (
cm = corev1.ConfigMap{}
nn = c.NamespacedName()
oneReplica = int32(1)
cm = corev1.ConfigMap{}
nn = c.NamespacedName()
replicas = r.getReplicas(c)

deployment = appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -271,7 +272,7 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1.
Strategy: appsv1.DeploymentStrategy{
Type: appsv1.RecreateDeploymentStrategyType,
},
Replicas: &oneReplica,
Replicas: &replicas,
Selector: &metav1.LabelSelector{},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{},
Expand Down Expand Up @@ -303,41 +304,33 @@ func (r *ConduitReconciler) CreateOrUpdateDeployment(ctx context.Context, c *v1.
status := deployment.Status
readyReplicas := fmt.Sprintf("%d/%d", status.ReadyReplicas, *spec.Replicas)

var availableCond appsv1.DeploymentCondition
for i, c := range status.Conditions {
if c.Type == appsv1.DeploymentAvailable {
availableCond = status.Conditions[i]
break
}
}
runningStatus := r.deploymentRunningStatus(&deployment)

switch status := availableCond.Status; status {
switch runningStatus {
case corev1.ConditionTrue:
if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, status) {
if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, runningStatus) {
r.Eventf(c, corev1.EventTypeNormal, v1.RunningReason, "Conduit deployment %q running, config %q", nn, cm.ResourceVersion)
}

c.Status.SetCondition(
v1.ConditionConduitDeploymentRunning,
status,
runningStatus,
"DeploymentReady",
readyReplicas,
)
case corev1.ConditionFalse:
if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, status) {
if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, runningStatus) {
r.Eventf(c, corev1.EventTypeNormal, v1.StoppedReason, "Conduit deployment %q stopped, config %q", nn, cm.ResourceVersion)
}

c.Status.SetCondition(
v1.ConditionConduitDeploymentRunning,
status,
runningStatus,
"DeploymentReady",
readyReplicas,
)
default:
if c.Status.ConditionChanged(v1.ConditionConduitDeploymentRunning, status) {
r.Eventf(c, corev1.EventTypeNormal, v1.PendingReason, "Conduit deployment %q pending, config %q", nn, cm.ResourceVersion)
}
r.Eventf(c, corev1.EventTypeNormal, v1.PendingReason, "Conduit deployment %q pending, config %q", nn, cm.ResourceVersion)
}

return ctrlutil.SetControllerReference(c, &deployment, r.Scheme())
Expand Down Expand Up @@ -467,3 +460,27 @@ func (r *ConduitReconciler) SetupWithManager(mgr ctrl.Manager) error {
// Owns(&corev1.PersistentVolume{}).
Complete(r)
}

func (r *ConduitReconciler) getReplicas(c *v1.Conduit) int32 {
if c.Spec.Running != nil && *c.Spec.Running {
return 1
}
return 0
}

func (r *ConduitReconciler) deploymentRunningStatus(d *appsv1.Deployment) corev1.ConditionStatus {
// When the deployment is scaled down, return not running (false)
if *d.Spec.Replicas == 0 {
return corev1.ConditionFalse
}

i := slices.IndexFunc(d.Status.Conditions, func(c appsv1.DeploymentCondition) bool {
return c.Type == appsv1.DeploymentAvailable
})
if i < 0 {
r.Logger.Info("failed to find deployment status condition, default to unknown", "deployment", d.Name)
return corev1.ConditionUnknown
}

return d.Status.Conditions[i].Status
}
59 changes: 59 additions & 0 deletions controllers/conduit_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,65 @@ func Test_CreateOrUpdateDeployment(t *testing.T) {
}
},
},
{
name: "deployment is stopped",
conduit: sampleConduit(false),
wantStatus: func() *v1alpha.ConduitStatus {
status := defaultConditions()
status.SetCondition(v1alpha.ConditionConduitDeploymentRunning, corev1.ConditionFalse, "", "")
status.SetCondition(v1alpha.ConditionConduitReady, corev1.ConditionFalse, "", "")

return status
}(),
setup: func(ctrl *gomock.Controller, c *v1alpha.Conduit) *controllers.ConduitReconciler {
nn := c.NamespacedName()
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: nn.Name,
Namespace: nn.Namespace,
},
}

client := mock.NewMockClient(ctrl)
client.EXPECT().Scheme().Return(conduitScheme)
client.EXPECT().Get(ctx, nn, &corev1.ConfigMap{}).
DoAndReturn(func(_ context.Context, _ types.NamespacedName, c *corev1.ConfigMap, _ ...kclient.CreateOption) error {
c.ResourceVersion = resourceVer
return nil
})
client.EXPECT().Get(ctx, nn, deployment).
DoAndReturn(func(_ context.Context, n types.NamespacedName, d *appsv1.Deployment, _ ...kclient.CreateOption) error {
is.Equal(n, nn)

d.Status = appsv1.DeploymentStatus{
Conditions: []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionTrue,
},
},
}

return nil
})

updatedDeployment := deployment.DeepCopy()
updatedDeployment.Spec.Template.Annotations = map[string]string{
"operator.conduit.io/config-map-version": "resource-version-121",
}
client.EXPECT().Update(ctx, mock.NewDeploymentMatcher(updatedDeployment)).Return(nil)

recorder := mock.NewMockEventRecorder(ctrl)
recorder.EXPECT().Eventf(c, corev1.EventTypeNormal, v1alpha.StoppedReason, gomock.Any(), nn)
recorder.EXPECT().Eventf(c, corev1.EventTypeNormal, v1alpha.UpdatedReason, gomock.Any(), nn)

return &controllers.ConduitReconciler{
Metadata: &v1alpha.ConduitInstanceMetadata{},
Client: client,
EventRecorder: recorder,
}
},
},
{
name: "error when getting config map",
conduit: sampleConduit(true),
Expand Down

0 comments on commit 3c12f12

Please sign in to comment.