diff --git a/pkg/provisioning/config/yaml/v2/config.go b/pkg/provisioning/config/yaml/v2/config.go new file mode 100644 index 000000000..71a026bba --- /dev/null +++ b/pkg/provisioning/config/yaml/v2/config.go @@ -0,0 +1,93 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "github.com/conduitio/conduit/pkg/provisioning/config" +) + +func FromConfig(pipelines []config.Pipeline) Configuration { + c := Configuration{ + Version: LatestVersion, + Pipelines: make([]Pipeline, len(pipelines)), + } + + for i, p := range pipelines { + c.Pipelines[i] = fromPipelineConfig(p) + } + + return c +} + +func fromPipelineConfig(p config.Pipeline) Pipeline { + return Pipeline{ + ID: p.ID, + Status: p.Status, + Name: p.Name, + Description: p.Description, + Connectors: fromConnectorsConfig(p.Connectors), + Processors: fromProcessorsConfig(p.Processors), + DLQ: fromDLQConfig(p.DLQ), + } +} + +func fromConnectorsConfig(cc []config.Connector) []Connector { + if len(cc) == 0 { + return []Connector{} + } + + connectors := make([]Connector, len(cc)) + + for i, c := range cc { + connectors[i] = Connector{ + ID: c.ID, + Type: c.Type, + Plugin: c.Plugin, + Name: c.Name, + Settings: c.Settings, + Processors: fromProcessorsConfig(c.Processors), + } + } + + return connectors +} + +func fromProcessorsConfig(procs []config.Processor) []Processor { + if len(procs) == 0 { + return []Processor{} + } + + processors := make([]Processor, len(procs)) + + for i, proc := range procs { + processors[i] = Processor{ + ID: proc.ID, + Type: proc.Type, + Settings: proc.Settings, + Workers: proc.Workers, + } + } + + return processors +} + +func fromDLQConfig(dlq config.DLQ) DLQ { + return DLQ{ + Plugin: dlq.Plugin, + Settings: dlq.Settings, + WindowSize: dlq.WindowSize, + WindowNackThreshold: dlq.WindowNackThreshold, + } +} diff --git a/pkg/provisioning/config/yaml/v2/config_test.go b/pkg/provisioning/config/yaml/v2/config_test.go new file mode 100644 index 000000000..cf3fdaeb2 --- /dev/null +++ b/pkg/provisioning/config/yaml/v2/config_test.go @@ -0,0 +1,220 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "testing" + + "github.com/conduitio/conduit/pkg/provisioning/config" + "github.com/google/go-cmp/cmp" +) + +func TestConfiguration_FromConfig(t *testing.T) { + pipelines := testPipelineConfigs() + expected := expectedModelConfiguration() + + c := FromConfig(pipelines) + + if diff := cmp.Diff(c, expected); diff != "" { + t.Logf("mismatch (-want +got): %s", diff) + t.Fail() + } +} + +func testPipelineConfigs() []config.Pipeline { + intPtr := func(i int) *int { return &i } + + return []config.Pipeline{ + { + ID: "pipeline1", + Name: "pipeline1", + Status: "running", + Description: "desc1", + Connectors: []config.Connector{ + { + ID: "con1", + Type: "source", + Plugin: "builtin:s3", + Name: "s3-source", + Settings: map[string]string{ + "aws.region": "us-east-1", + "aws.bucket": "my-bucket", + }, + Processors: []config.Processor{ + { + ID: "proc1", + Type: "js", + Settings: map[string]string{ + "additionalProp1": "string", + "additionalProp2": "string", + }, + Workers: 1, + }, + }, + }, + { + ID: "con2", + Type: "destination", + Plugin: "builtin:log", + Name: "log-destination", + }, + }, + Processors: []config.Processor{ + { + ID: "pipeline1proc1", + Type: "js", + Settings: map[string]string{ + "additionalProp1": "string", + "additionalProp2": "string", + }, + Workers: 1, + }, + }, + DLQ: config.DLQ{ + Plugin: "builtin:log", + Settings: map[string]string{ + "level": "error", + "message": "record delivery failed", + }, + WindowSize: intPtr(4), + WindowNackThreshold: intPtr(2), + }, + }, + { + ID: "pipeline2", + Name: "pipeline2", + Status: "stopped", + Description: "desc2", + Connectors: []config.Connector{ + { + ID: "con2", + Type: "destination", + Plugin: "builtin:file", + Name: "file-dest", + Settings: map[string]string{ + "path": "my/path", + }, + Processors: []config.Processor{ + { + ID: "con2proc1", + Type: "hoistfield", + Settings: map[string]string{ + "additionalProp1": "string", + "additionalProp2": "string", + }, + Workers: 1, + }, + }, + }, + }, + }, + } +} + +func expectedModelConfiguration() Configuration { + intPtr := func(i int) *int { return &i } + + return Configuration{ + Version: "2.0", + Pipelines: []Pipeline{ + { + ID: "pipeline1", + Status: "running", + Name: "pipeline1", + Description: "desc1", + Connectors: []Connector{ + { + ID: "con1", + Type: "source", + Plugin: "builtin:s3", + Name: "s3-source", + Settings: map[string]string{ + "aws.bucket": "my-bucket", + "aws.region": "us-east-1", + }, + Processors: []Processor{ + { + ID: "proc1", + Type: "js", + Settings: map[string]string{ + "additionalProp1": "string", + "additionalProp2": "string", + }, + Workers: 1, + }, + }, + }, + { + ID: "con2", + Type: "destination", + Plugin: "builtin:log", + Name: "log-destination", + Processors: []Processor{}, + }, + }, + Processors: []Processor{ + { + ID: "pipeline1proc1", + Type: "js", + Settings: map[string]string{ + "additionalProp1": "string", + "additionalProp2": "string", + }, + Workers: 1, + }, + }, + DLQ: DLQ{ + Plugin: "builtin:log", + Settings: map[string]string{ + "level": "error", + "message": "record delivery failed", + }, + WindowSize: intPtr(4), + WindowNackThreshold: intPtr(2), + }, + }, + { + ID: "pipeline2", + Status: "stopped", + Name: "pipeline2", + Description: "desc2", + Connectors: []Connector{ + { + ID: "con2", + Type: "destination", + Plugin: "builtin:file", + Name: "file-dest", + Settings: map[string]string{ + "path": "my/path", + }, + Processors: []Processor{ + { + ID: "con2proc1", + Type: "hoistfield", + Settings: map[string]string{ + "additionalProp1": "string", + "additionalProp2": "string", + }, + Workers: 1, + }, + }, + }, + }, + Processors: []Processor{}, + DLQ: DLQ{}, + }, + }, + } +}