Skip to content

Commit

Permalink
Convert provision config pipelines into model yaml (#1278)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyuboxa authored Nov 17, 2023
1 parent df048d9 commit abaaf49
Show file tree
Hide file tree
Showing 2 changed files with 313 additions and 0 deletions.
93 changes: 93 additions & 0 deletions pkg/provisioning/config/yaml/v2/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"github.com/conduitio/conduit/pkg/provisioning/config"
)

func FromConfig(pipelines []config.Pipeline) Configuration {
c := Configuration{
Version: LatestVersion,
Pipelines: make([]Pipeline, len(pipelines)),
}

for i, p := range pipelines {
c.Pipelines[i] = fromPipelineConfig(p)
}

return c
}

func fromPipelineConfig(p config.Pipeline) Pipeline {
return Pipeline{
ID: p.ID,
Status: p.Status,
Name: p.Name,
Description: p.Description,
Connectors: fromConnectorsConfig(p.Connectors),
Processors: fromProcessorsConfig(p.Processors),
DLQ: fromDLQConfig(p.DLQ),
}
}

func fromConnectorsConfig(cc []config.Connector) []Connector {
if len(cc) == 0 {
return []Connector{}
}

connectors := make([]Connector, len(cc))

for i, c := range cc {
connectors[i] = Connector{
ID: c.ID,
Type: c.Type,
Plugin: c.Plugin,
Name: c.Name,
Settings: c.Settings,
Processors: fromProcessorsConfig(c.Processors),
}
}

return connectors
}

func fromProcessorsConfig(procs []config.Processor) []Processor {
if len(procs) == 0 {
return []Processor{}
}

processors := make([]Processor, len(procs))

for i, proc := range procs {
processors[i] = Processor{
ID: proc.ID,
Type: proc.Type,
Settings: proc.Settings,
Workers: proc.Workers,
}
}

return processors
}

func fromDLQConfig(dlq config.DLQ) DLQ {
return DLQ{
Plugin: dlq.Plugin,
Settings: dlq.Settings,
WindowSize: dlq.WindowSize,
WindowNackThreshold: dlq.WindowNackThreshold,
}
}
220 changes: 220 additions & 0 deletions pkg/provisioning/config/yaml/v2/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"testing"

"github.com/conduitio/conduit/pkg/provisioning/config"
"github.com/google/go-cmp/cmp"
)

func TestConfiguration_FromConfig(t *testing.T) {
pipelines := testPipelineConfigs()
expected := expectedModelConfiguration()

c := FromConfig(pipelines)

if diff := cmp.Diff(c, expected); diff != "" {
t.Logf("mismatch (-want +got): %s", diff)
t.Fail()
}
}

func testPipelineConfigs() []config.Pipeline {
intPtr := func(i int) *int { return &i }

return []config.Pipeline{
{
ID: "pipeline1",
Name: "pipeline1",
Status: "running",
Description: "desc1",
Connectors: []config.Connector{
{
ID: "con1",
Type: "source",
Plugin: "builtin:s3",
Name: "s3-source",
Settings: map[string]string{
"aws.region": "us-east-1",
"aws.bucket": "my-bucket",
},
Processors: []config.Processor{
{
ID: "proc1",
Type: "js",
Settings: map[string]string{
"additionalProp1": "string",
"additionalProp2": "string",
},
Workers: 1,
},
},
},
{
ID: "con2",
Type: "destination",
Plugin: "builtin:log",
Name: "log-destination",
},
},
Processors: []config.Processor{
{
ID: "pipeline1proc1",
Type: "js",
Settings: map[string]string{
"additionalProp1": "string",
"additionalProp2": "string",
},
Workers: 1,
},
},
DLQ: config.DLQ{
Plugin: "builtin:log",
Settings: map[string]string{
"level": "error",
"message": "record delivery failed",
},
WindowSize: intPtr(4),
WindowNackThreshold: intPtr(2),
},
},
{
ID: "pipeline2",
Name: "pipeline2",
Status: "stopped",
Description: "desc2",
Connectors: []config.Connector{
{
ID: "con2",
Type: "destination",
Plugin: "builtin:file",
Name: "file-dest",
Settings: map[string]string{
"path": "my/path",
},
Processors: []config.Processor{
{
ID: "con2proc1",
Type: "hoistfield",
Settings: map[string]string{
"additionalProp1": "string",
"additionalProp2": "string",
},
Workers: 1,
},
},
},
},
},
}
}

func expectedModelConfiguration() Configuration {
intPtr := func(i int) *int { return &i }

return Configuration{
Version: "2.0",
Pipelines: []Pipeline{
{
ID: "pipeline1",
Status: "running",
Name: "pipeline1",
Description: "desc1",
Connectors: []Connector{
{
ID: "con1",
Type: "source",
Plugin: "builtin:s3",
Name: "s3-source",
Settings: map[string]string{
"aws.bucket": "my-bucket",
"aws.region": "us-east-1",
},
Processors: []Processor{
{
ID: "proc1",
Type: "js",
Settings: map[string]string{
"additionalProp1": "string",
"additionalProp2": "string",
},
Workers: 1,
},
},
},
{
ID: "con2",
Type: "destination",
Plugin: "builtin:log",
Name: "log-destination",
Processors: []Processor{},
},
},
Processors: []Processor{
{
ID: "pipeline1proc1",
Type: "js",
Settings: map[string]string{
"additionalProp1": "string",
"additionalProp2": "string",
},
Workers: 1,
},
},
DLQ: DLQ{
Plugin: "builtin:log",
Settings: map[string]string{
"level": "error",
"message": "record delivery failed",
},
WindowSize: intPtr(4),
WindowNackThreshold: intPtr(2),
},
},
{
ID: "pipeline2",
Status: "stopped",
Name: "pipeline2",
Description: "desc2",
Connectors: []Connector{
{
ID: "con2",
Type: "destination",
Plugin: "builtin:file",
Name: "file-dest",
Settings: map[string]string{
"path": "my/path",
},
Processors: []Processor{
{
ID: "con2proc1",
Type: "hoistfield",
Settings: map[string]string{
"additionalProp1": "string",
"additionalProp2": "string",
},
Workers: 1,
},
},
},
},
Processors: []Processor{},
DLQ: DLQ{},
},
},
}
}

0 comments on commit abaaf49

Please sign in to comment.