Skip to content

Commit

Permalink
Add support for v0.9.0 (#9)
Browse files Browse the repository at this point in the history
* Add support for v0.9.0

- Create processors directory when version is >= 0.9.0
- When version cannot be detected (i.e. not semver), processors path will not be added.
- Allow for different conduit image to be passed.

* fix typo
  • Loading branch information
lyuboxa authored Mar 18, 2024
1 parent 80df83f commit 5547c63
Show file tree
Hide file tree
Showing 15 changed files with 242 additions and 55 deletions.
54 changes: 29 additions & 25 deletions api/v1/conduit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,47 +38,55 @@ var conduitConditions = NewConditionSet(
)

const (
ConduitVersion = "0.8.0"
ConduitVersion = "v0.9.0"
ConduitImage = "ghcr.io/conduitio/conduit"
ConduitContainerName = "conduit-server"
ConduitPipelinePath = "/conduit.pipelines"
ConduitVolumePath = "/conduit.storage"
ConduitDBPath = "/conduit.storage/db"
ConduitConnectorsPath = "/conduit.storage/connectors"
ConduitProcessorsPath = "/conduit.storage/processors"
ConduitStorageVolumeMount = "conduit-storage"
ConduitPipelineVolumeMount = "conduit-pipelines"
ConduitInitImage = "golang:1.22"
ConduitInitContainerName = "conduit-connector-init"
ConduitInitContainerName = "conduit-init"
)

var ConduitPipelineFile = path.Join(ConduitPipelinePath, "pipeline.yaml")
var (
ConduitPipelineFile = path.Join(ConduitPipelinePath, "pipeline.yaml")
ConduitWithProcessorsVersion = "0.9.0"
)

// ConduitSpec defines the desired state of Conduit
type ConduitSpec struct {
Running bool `json:"running,omitempty"`
Name string `json:"name,omitempty"`
Version string `json:"version,omitempty"`
Description string `json:"description,omitempty"`
Connectors []*ConduitConnector `json:"connectors,omitempty"`
Processors []*ConduitProcessor `json:"processors,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
Image string `json:"image,omitempty"`
Running bool `json:"running,omitempty"`
Version string `json:"version,omitempty"`

Connectors []*ConduitConnector `json:"connectors,omitempty"`
Processors []*ConduitProcessor `json:"processors,omitempty"`
}

type ConduitConnector struct {
Name string `json:"name,omitempty"`
Plugin string `json:"plugin,omitempty"`
PluginPkg string `json:"pluginPkg,omitempty"`
PluginName string `json:"pluginName,omitempty"`
PluginVersion string `json:"pluginVersion,omitempty"`
Type string `json:"type,omitempty"`
Settings []SettingsVar `json:"settings,omitempty"`
Processors []*ConduitProcessor `json:"processors,omitempty"`
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Plugin string `json:"plugin,omitempty"`
PluginName string `json:"pluginName,omitempty"`
PluginPkg string `json:"pluginPkg,omitempty"`
PluginVersion string `json:"pluginVersion,omitempty"`

Settings []SettingsVar `json:"settings,omitempty"`
Processors []*ConduitProcessor `json:"processors,omitempty"`
}

type ConduitProcessor struct {
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Workers int `json:"workers,omitempty"`

Settings []SettingsVar `json:"settings,omitempty"`
Workers int `json:"workers,omitempty"`
}

type GlobalConfigMapRef struct {
Expand Down Expand Up @@ -132,15 +140,11 @@ type Conduit struct {

func (r *Conduit) NamespacedName() types.NamespacedName {
return types.NamespacedName{
Name: fmt.Sprintf("conduit-server-%s", r.Name),
Name: fmt.Sprint("conduit-server-", r.Name),
Namespace: r.Namespace,
}
}

func (r *Conduit) ImageName() string {
return fmt.Sprintf("%s:v%s", ConduitImage, r.Spec.Version)
}

//+kubebuilder:object:root=true

// ConduitList contains a list of Conduit
Expand Down
4 changes: 4 additions & 0 deletions api/v1/conduit_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (r *Conduit) Default() {
}
}

if r.Spec.Image == "" {
r.Spec.Image = ConduitImage
}

if r.Spec.Version == "" {
r.Spec.Version = ConduitVersion
}
Expand Down
2 changes: 2 additions & 0 deletions charts/conduit-operator/templates/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ spec:
type: array
description:
type: string
image:
type: string
name:
type: string
processors:
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/operator.conduit.io_conduits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ spec:
type: array
description:
type: string
image:
type: string
name:
type: string
processors:
Expand Down
24 changes: 24 additions & 0 deletions config/samples/conduit-generator-image-ver.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
apiVersion: operator.conduit.io/v1
kind: Conduit
metadata:
name: conduit-generator
spec:
running: true
name: generator.log
description: generator pipeline
image: ghcr.io/conduitio/conduit
version: v0.9.0
connectors:
- name: source-connector
type: source
plugin: builtin:generator
settings:
- name: format.type
value: structured
- name: format.options
value: "id:int,name:string,company:string,trial:bool"
- name: recordCount
value: "3"
- name: destination-connector
type: destination
plugin: builtin:log
2 changes: 1 addition & 1 deletion config/samples/conduit-generator-secrets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ metadata:
spec:
running: true
name: generator.standalone.log
description: generator pipline
description: generator pipeline
connectors:
- name: source-connector
type: source
Expand Down
2 changes: 1 addition & 1 deletion config/samples/conduit-generator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
spec:
running: true
name: generator.log
description: generator pipline
description: generator pipeline
connectors:
- name: source-connector
type: source
Expand Down
2 changes: 1 addition & 1 deletion config/samples/conduit-with-proccessors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
spec:
running: true
name: generator.proc.log
description: redacting generator pipline
description: redacting generator pipeline
connectors:
- name: source-connector
type: source
Expand Down
2 changes: 1 addition & 1 deletion config/samples/invalid-plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metadata:
spec:
running: true
name: file-pipeline
description: test pipline
description: test pipeline
connectors:
- name: source-connector
type: source
Expand Down
58 changes: 40 additions & 18 deletions controllers/conduit_containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,24 @@ func (c *commandBuilder) addConnectorBuild(b connectorBuild) {
func ConduitInitContainers(cc []*v1.ConduitConnector) []corev1.Container {
builder := &commandBuilder{}

containers := []corev1.Container{
{
Name: v1.ConduitInitContainerName,
Image: v1.ConduitInitImage,
ImagePullPolicy: corev1.PullIfNotPresent,
Args: []string{
"sh", "-xe", "-c",
fmt.Sprintf("mkdir -p %s %s", v1.ConduitProcessorsPath, v1.ConduitConnectorsPath),
},
VolumeMounts: []corev1.VolumeMount{
{
Name: v1.ConduitStorageVolumeMount,
MountPath: v1.ConduitVolumePath,
},
},
},
}

for _, c := range cc {
if strings.HasPrefix(c.Plugin, "builtin") {
continue
Expand All @@ -90,13 +108,9 @@ func ConduitInitContainers(cc []*v1.ConduitConnector) []corev1.Container {
})
}

if builder.empty() {
return []corev1.Container{}
}

return []corev1.Container{
{
Name: v1.ConduitInitContainerName,
if !builder.empty() {
containers = append(containers, corev1.Container{
Name: fmt.Sprint(v1.ConduitInitContainerName, "-connectors"),
Image: v1.ConduitInitImage,
ImagePullPolicy: corev1.PullIfNotPresent,
Args: []string{
Expand All @@ -109,25 +123,33 @@ func ConduitInitContainers(cc []*v1.ConduitConnector) []corev1.Container {
MountPath: v1.ConduitVolumePath,
},
},
},
})
}

return containers
}

// ConduitRuntimeContainer returns a Kubernetes container definition
// todo is the pipelineName supposed to be used?
func ConduitRuntimeContainer(image string, envVars []corev1.EnvVar) corev1.Container {
func ConduitRuntimeContainer(image, version string, envVars []corev1.EnvVar) corev1.Container {
args := []string{
"/app/conduit",
"-pipelines.path", v1.ConduitPipelineFile,
"-connectors.path", v1.ConduitConnectorsPath,
"-db.type", "badger",
"-db.badger.path", v1.ConduitDBPath,
"-pipelines.exit-on-error",
}

if withProcessors(version) {
args = append(args, "-processors.path", v1.ConduitProcessorsPath)
}

return corev1.Container{
Name: v1.ConduitContainerName,
Image: image,
Image: fmt.Sprint(image, ":", version),
ImagePullPolicy: corev1.PullAlways,
Args: []string{
"/app/conduit",
"-pipelines.path", v1.ConduitPipelineFile,
"-connectors.path", v1.ConduitConnectorsPath,
"-db.type", "badger",
"-db.badger.path", v1.ConduitDBPath,
"-pipelines.exit-on-error",
},
Args: args,
Ports: []corev1.ContainerPort{
{
Name: "http",
Expand Down
Loading

0 comments on commit 5547c63

Please sign in to comment.