From 1cf0a29f7be99d35ae45de84f00b9f8b0a6b0c13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Barroso?= Date: Fri, 20 Dec 2024 16:20:37 +0100 Subject: [PATCH] feat: add conduit run (#2038) * wip * use latest ecdysis * fix merge * fix go mod * fix lint --- cmd/conduit/root/{ => config}/config.go | 11 +- cmd/conduit/root/{ => config}/config_test.go | 2 +- cmd/conduit/root/{ => initialize}/init.go | 10 +- cmd/conduit/root/root.go | 68 +++--------- cmd/conduit/root/root_test.go | 27 ----- cmd/conduit/root/run/run.go | 103 +++++++++++++++++++ cmd/conduit/root/run/run_test.go | 82 +++++++++++++++ go.mod | 2 +- go.sum | 4 +- 9 files changed, 214 insertions(+), 95 deletions(-) rename cmd/conduit/root/{ => config}/config.go (92%) rename cmd/conduit/root/{ => config}/config_test.go (99%) rename cmd/conduit/root/{ => initialize}/init.go (94%) create mode 100644 cmd/conduit/root/run/run.go create mode 100644 cmd/conduit/root/run/run_test.go diff --git a/cmd/conduit/root/config.go b/cmd/conduit/root/config/config.go similarity index 92% rename from cmd/conduit/root/config.go rename to cmd/conduit/root/config/config.go index c447e39b4..52314259e 100644 --- a/cmd/conduit/root/config.go +++ b/cmd/conduit/root/config/config.go @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package root +package config import ( "context" "fmt" "reflect" + "github.com/conduitio/conduit/cmd/conduit/root/run" "github.com/conduitio/ecdysis" ) @@ -30,15 +31,15 @@ var ( ) type ConfigCommand struct { - rootCmd *RootCommand + RunCmd *run.RunCommand } func (c *ConfigCommand) Config() ecdysis.Config { - return c.rootCmd.Config() + return c.RunCmd.Config() } func (c *ConfigCommand) Flags() []ecdysis.Flag { - return c.rootCmd.Flags() + return c.RunCmd.Flags() } func (c *ConfigCommand) Docs() ecdysis.Docs { @@ -83,6 +84,6 @@ func printStruct(v reflect.Value, parentPath string) { func (c *ConfigCommand) Usage() string { return "config" } func (c ConfigCommand) Execute(_ context.Context) error { - printStruct(reflect.ValueOf(c.rootCmd.cfg), "") + printStruct(reflect.ValueOf(c.RunCmd.Cfg), "") return nil } diff --git a/cmd/conduit/root/config_test.go b/cmd/conduit/root/config/config_test.go similarity index 99% rename from cmd/conduit/root/config_test.go rename to cmd/conduit/root/config/config_test.go index 54ca03928..bd410949b 100644 --- a/cmd/conduit/root/config_test.go +++ b/cmd/conduit/root/config/config_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package root +package config import ( "bytes" diff --git a/cmd/conduit/root/init.go b/cmd/conduit/root/initialize/init.go similarity index 94% rename from cmd/conduit/root/init.go rename to cmd/conduit/root/initialize/init.go index 697a2af34..6e0191448 100644 --- a/cmd/conduit/root/init.go +++ b/cmd/conduit/root/initialize/init.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package root +package initialize import ( "bytes" @@ -41,12 +41,12 @@ type InitFlags struct { type InitCommand struct { flags InitFlags - cfg *conduit.Config + Cfg *conduit.Config } func (c *InitCommand) Flags() []ecdysis.Flag { flags := ecdysis.BuildFlags(&c.flags) - flags.SetDefault("path", filepath.Dir(c.cfg.ConduitCfgPath)) + flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfgPath)) return flags } @@ -83,7 +83,7 @@ func (c *InitCommand) createDirs() error { func (c *InitCommand) createConfigYAML() error { cfgYAML := internal.NewYAMLTree() - processConfigStruct(reflect.ValueOf(c.cfg).Elem(), "", cfgYAML) + processConfigStruct(reflect.ValueOf(c.Cfg).Elem(), "", cfgYAML) // Create encoder with custom indentation var buf bytes.Buffer @@ -139,7 +139,7 @@ func processConfigStruct(v reflect.Value, parentPath string, cfgYAML *internal.Y } } -func (c *InitCommand) Execute(ctx context.Context) error { +func (c *InitCommand) Execute(_ context.Context) error { err := c.createDirs() if err != nil { return err diff --git a/cmd/conduit/root/root.go b/cmd/conduit/root/root.go index 653023604..d3cd49a06 100644 --- a/cmd/conduit/root/root.go +++ b/cmd/conduit/root/root.go @@ -18,12 +18,13 @@ import ( "context" "fmt" "os" - "path/filepath" + "github.com/conduitio/conduit/cmd/conduit/root/config" + "github.com/conduitio/conduit/cmd/conduit/root/initialize" "github.com/conduitio/conduit/cmd/conduit/root/pipelines" + "github.com/conduitio/conduit/cmd/conduit/root/run" "github.com/conduitio/conduit/cmd/conduit/root/version" "github.com/conduitio/conduit/pkg/conduit" - "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/ecdysis" ) @@ -32,77 +33,33 @@ var ( _ ecdysis.CommandWithExecute = (*RootCommand)(nil) _ ecdysis.CommandWithDocs = (*RootCommand)(nil) _ ecdysis.CommandWithSubCommands = (*RootCommand)(nil) - _ ecdysis.CommandWithConfig = (*RootCommand)(nil) ) type RootFlags struct { Version bool `long:"version" short:"v" usage:"show the current Conduit version"` - conduit.Config } type RootCommand struct { flags RootFlags - cfg conduit.Config } -func (c *RootCommand) Execute(_ context.Context) error { +func (c *RootCommand) Execute(ctx context.Context) error { if c.flags.Version { _, _ = fmt.Fprintf(os.Stdout, "%s\n", conduit.Version(true)) return nil } - e := &conduit.Entrypoint{} - e.Serve(c.cfg) - return nil -} - -func (c *RootCommand) Config() ecdysis.Config { - path := filepath.Dir(c.flags.ConduitCfgPath) - - return ecdysis.Config{ - EnvPrefix: "CONDUIT", - Parsed: &c.cfg, - Path: c.flags.ConduitCfgPath, - DefaultValues: conduit.DefaultConfigWithBasePath(path), + if cmd := ecdysis.CobraCmdFromContext(ctx); cmd != nil { + return cmd.Help() } + + return nil } func (c *RootCommand) Usage() string { return "conduit" } func (c *RootCommand) Flags() []ecdysis.Flag { - flags := ecdysis.BuildFlags(&c.flags) - - currentPath, err := os.Getwd() - if err != nil { - panic(cerrors.Errorf("failed to get current working directory: %w", err)) - } - - c.cfg = conduit.DefaultConfigWithBasePath(currentPath) - flags.SetDefault("config.path", c.cfg.ConduitCfgPath) - flags.SetDefault("db.type", c.cfg.DB.Type) - flags.SetDefault("db.badger.path", c.cfg.DB.Badger.Path) - flags.SetDefault("db.postgres.connection-string", c.cfg.DB.Postgres.ConnectionString) - flags.SetDefault("db.postgres.table", c.cfg.DB.Postgres.Table) - flags.SetDefault("db.sqlite.path", c.cfg.DB.SQLite.Path) - flags.SetDefault("db.sqlite.table", c.cfg.DB.SQLite.Table) - flags.SetDefault("api.enabled", c.cfg.API.Enabled) - flags.SetDefault("http.address", c.cfg.API.HTTP.Address) - flags.SetDefault("grpc.address", c.cfg.API.GRPC.Address) - flags.SetDefault("log.level", c.cfg.Log.Level) - flags.SetDefault("log.format", c.cfg.Log.Format) - flags.SetDefault("connectors.path", c.cfg.Connectors.Path) - flags.SetDefault("processors.path", c.cfg.Processors.Path) - flags.SetDefault("pipelines.path", c.cfg.Pipelines.Path) - flags.SetDefault("pipelines.exit-on-degraded", c.cfg.Pipelines.ExitOnDegraded) - flags.SetDefault("pipelines.error-recovery.min-delay", c.cfg.Pipelines.ErrorRecovery.MinDelay) - flags.SetDefault("pipelines.error-recovery.max-delay", c.cfg.Pipelines.ErrorRecovery.MaxDelay) - flags.SetDefault("pipelines.error-recovery.backoff-factor", c.cfg.Pipelines.ErrorRecovery.BackoffFactor) - flags.SetDefault("pipelines.error-recovery.max-retries", c.cfg.Pipelines.ErrorRecovery.MaxRetries) - flags.SetDefault("pipelines.error-recovery.max-retries-window", c.cfg.Pipelines.ErrorRecovery.MaxRetriesWindow) - flags.SetDefault("schema-registry.type", c.cfg.SchemaRegistry.Type) - flags.SetDefault("schema-registry.confluent.connection-string", c.cfg.SchemaRegistry.Confluent.ConnectionString) - flags.SetDefault("preview.pipeline-arch-v2", c.cfg.Preview.PipelineArchV2) - return flags + return ecdysis.BuildFlags(&c.flags) } func (c *RootCommand) Docs() ecdysis.Docs { @@ -113,10 +70,13 @@ func (c *RootCommand) Docs() ecdysis.Docs { } func (c *RootCommand) SubCommands() []ecdysis.Command { + runCmd := &run.RunCommand{} + return []ecdysis.Command{ - &ConfigCommand{rootCmd: c}, - &InitCommand{cfg: &c.cfg}, + &config.ConfigCommand{RunCmd: runCmd}, + &initialize.InitCommand{Cfg: &runCmd.Cfg}, &version.VersionCommand{}, &pipelines.PipelinesCommand{}, + &run.RunCommand{}, } } diff --git a/cmd/conduit/root/root_test.go b/cmd/conduit/root/root_test.go index fb475ec3f..7188667ea 100644 --- a/cmd/conduit/root/root_test.go +++ b/cmd/conduit/root/root_test.go @@ -31,34 +31,7 @@ func TestRootCommandFlags(t *testing.T) { usage string persistent bool }{ - {longName: "config.path", usage: "global conduit configuration file"}, {longName: "version", shortName: "v", usage: "show the current Conduit version"}, - {longName: "db.type", usage: "database type; accepts badger,postgres,inmemory,sqlite"}, - {longName: "db.badger.path", usage: "path to badger DB"}, - {longName: "db.postgres.connection-string", usage: "postgres connection string, may be a database URL or in PostgreSQL keyword/value format"}, - {longName: "db.postgres.table", usage: "postgres table in which to store data (will be created if it does not exist)"}, - {longName: "db.sqlite.path", usage: "path to sqlite3 DB"}, - {longName: "db.sqlite.table", usage: "sqlite3 table in which to store data (will be created if it does not exist)"}, - {longName: "api.enabled", usage: "enable HTTP and gRPC API"}, - {longName: "http.address", usage: "address for serving the HTTP API"}, - {longName: "grpc.address", usage: "address for serving the gRPC API"}, - {longName: "log.level", usage: "sets logging level; accepts debug, info, warn, error, trace"}, - {longName: "log.format", usage: "sets the format of the logging; accepts json, cli"}, - {longName: "connectors.path", usage: "path to standalone connectors' directory"}, - {longName: "processors.path", usage: "path to standalone processors' directory"}, - {longName: "pipelines.path", usage: "path to pipelines' directory"}, - {longName: "pipelines.exit-on-degraded", usage: "exit Conduit if a pipeline is degraded"}, - {longName: "pipelines.error-recovery.min-delay", usage: "minimum delay before restart"}, - {longName: "pipelines.error-recovery.max-delay", usage: "maximum delay before restart"}, - {longName: "pipelines.error-recovery.backoff-factor", usage: "backoff factor applied to the last delay"}, - {longName: "pipelines.error-recovery.max-retries", usage: "maximum number of retries"}, - {longName: "pipelines.error-recovery.max-retries-window", usage: "amount of time running without any errors after which a pipeline is considered healthy"}, - {longName: "schema-registry.type", usage: "schema registry type; accepts builtin,confluent"}, - {longName: "schema-registry.confluent.connection-string", usage: "confluent schema registry connection string"}, - {longName: "preview.pipeline-arch-v2", usage: "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)"}, - {longName: "dev.cpuprofile", usage: "write CPU profile to file"}, - {longName: "dev.memprofile", usage: "write memory profile to file"}, - {longName: "dev.blockprofile", usage: "write block profile to file"}, } e := ecdysis.New() diff --git a/cmd/conduit/root/run/run.go b/cmd/conduit/root/run/run.go new file mode 100644 index 000000000..4ec8ee877 --- /dev/null +++ b/cmd/conduit/root/run/run.go @@ -0,0 +1,103 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package run + +import ( + "context" + "os" + "path/filepath" + + "github.com/conduitio/conduit/pkg/conduit" + "github.com/conduitio/conduit/pkg/foundation/cerrors" + "github.com/conduitio/ecdysis" +) + +var ( + _ ecdysis.CommandWithFlags = (*RunCommand)(nil) + _ ecdysis.CommandWithExecute = (*RunCommand)(nil) + _ ecdysis.CommandWithDocs = (*RunCommand)(nil) + _ ecdysis.CommandWithConfig = (*RunCommand)(nil) +) + +type RunFlags struct { + conduit.Config +} + +type RunCommand struct { + flags RunFlags + Cfg conduit.Config +} + +func (c *RunCommand) Execute(_ context.Context) error { + e := &conduit.Entrypoint{} + e.Serve(c.Cfg) + return nil +} + +func (c *RunCommand) Config() ecdysis.Config { + path := filepath.Dir(c.flags.ConduitCfgPath) + + return ecdysis.Config{ + EnvPrefix: "CONDUIT", + Parsed: &c.Cfg, + Path: c.flags.ConduitCfgPath, + DefaultValues: conduit.DefaultConfigWithBasePath(path), + } +} + +func (c *RunCommand) Usage() string { return "run" } + +func (c *RunCommand) Flags() []ecdysis.Flag { + flags := ecdysis.BuildFlags(&c.flags) + + currentPath, err := os.Getwd() + if err != nil { + panic(cerrors.Errorf("failed to get current working directory: %w", err)) + } + + c.Cfg = conduit.DefaultConfigWithBasePath(currentPath) + flags.SetDefault("config.path", c.Cfg.ConduitCfgPath) + flags.SetDefault("db.type", c.Cfg.DB.Type) + flags.SetDefault("db.badger.path", c.Cfg.DB.Badger.Path) + flags.SetDefault("db.postgres.connection-string", c.Cfg.DB.Postgres.ConnectionString) + flags.SetDefault("db.postgres.table", c.Cfg.DB.Postgres.Table) + flags.SetDefault("db.sqlite.path", c.Cfg.DB.SQLite.Path) + flags.SetDefault("db.sqlite.table", c.Cfg.DB.SQLite.Table) + flags.SetDefault("api.enabled", c.Cfg.API.Enabled) + flags.SetDefault("http.address", c.Cfg.API.HTTP.Address) + flags.SetDefault("grpc.address", c.Cfg.API.GRPC.Address) + flags.SetDefault("log.level", c.Cfg.Log.Level) + flags.SetDefault("log.format", c.Cfg.Log.Format) + flags.SetDefault("connectors.path", c.Cfg.Connectors.Path) + flags.SetDefault("processors.path", c.Cfg.Processors.Path) + flags.SetDefault("pipelines.path", c.Cfg.Pipelines.Path) + flags.SetDefault("pipelines.exit-on-degraded", c.Cfg.Pipelines.ExitOnDegraded) + flags.SetDefault("pipelines.error-recovery.min-delay", c.Cfg.Pipelines.ErrorRecovery.MinDelay) + flags.SetDefault("pipelines.error-recovery.max-delay", c.Cfg.Pipelines.ErrorRecovery.MaxDelay) + flags.SetDefault("pipelines.error-recovery.backoff-factor", c.Cfg.Pipelines.ErrorRecovery.BackoffFactor) + flags.SetDefault("pipelines.error-recovery.max-retries", c.Cfg.Pipelines.ErrorRecovery.MaxRetries) + flags.SetDefault("pipelines.error-recovery.max-retries-window", c.Cfg.Pipelines.ErrorRecovery.MaxRetriesWindow) + flags.SetDefault("schema-registry.type", c.Cfg.SchemaRegistry.Type) + flags.SetDefault("schema-registry.confluent.connection-string", c.Cfg.SchemaRegistry.Confluent.ConnectionString) + flags.SetDefault("preview.pipeline-arch-v2", c.Cfg.Preview.PipelineArchV2) + return flags +} + +func (c *RunCommand) Docs() ecdysis.Docs { + return ecdysis.Docs{ + Short: "Run Conduit", + Long: `Starts the Conduit server and runs the configured pipelines.`, + } +} diff --git a/cmd/conduit/root/run/run_test.go b/cmd/conduit/root/run/run_test.go new file mode 100644 index 000000000..09f197617 --- /dev/null +++ b/cmd/conduit/root/run/run_test.go @@ -0,0 +1,82 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package run + +import ( + "testing" + + "github.com/conduitio/ecdysis" + "github.com/matryer/is" + "github.com/spf13/pflag" +) + +func TestRunCommandFlags(t *testing.T) { + is := is.New(t) + + expectedFlags := []struct { + longName string + shortName string + usage string + persistent bool + }{ + {longName: "config.path", usage: "global conduit configuration file"}, + {longName: "db.type", usage: "database type; accepts badger,postgres,inmemory,sqlite"}, + {longName: "db.badger.path", usage: "path to badger DB"}, + {longName: "db.postgres.connection-string", usage: "postgres connection string, may be a database URL or in PostgreSQL keyword/value format"}, + {longName: "db.postgres.table", usage: "postgres table in which to store data (will be created if it does not exist)"}, + {longName: "db.sqlite.path", usage: "path to sqlite3 DB"}, + {longName: "db.sqlite.table", usage: "sqlite3 table in which to store data (will be created if it does not exist)"}, + {longName: "api.enabled", usage: "enable HTTP and gRPC API"}, + {longName: "http.address", usage: "address for serving the HTTP API"}, + {longName: "grpc.address", usage: "address for serving the gRPC API"}, + {longName: "log.level", usage: "sets logging level; accepts debug, info, warn, error, trace"}, + {longName: "log.format", usage: "sets the format of the logging; accepts json, cli"}, + {longName: "connectors.path", usage: "path to standalone connectors' directory"}, + {longName: "processors.path", usage: "path to standalone processors' directory"}, + {longName: "pipelines.path", usage: "path to pipelines' directory"}, + {longName: "pipelines.exit-on-degraded", usage: "exit Conduit if a pipeline is degraded"}, + {longName: "pipelines.error-recovery.min-delay", usage: "minimum delay before restart"}, + {longName: "pipelines.error-recovery.max-delay", usage: "maximum delay before restart"}, + {longName: "pipelines.error-recovery.backoff-factor", usage: "backoff factor applied to the last delay"}, + {longName: "pipelines.error-recovery.max-retries", usage: "maximum number of retries"}, + {longName: "pipelines.error-recovery.max-retries-window", usage: "amount of time running without any errors after which a pipeline is considered healthy"}, + {longName: "schema-registry.type", usage: "schema registry type; accepts builtin,confluent"}, + {longName: "schema-registry.confluent.connection-string", usage: "confluent schema registry connection string"}, + {longName: "preview.pipeline-arch-v2", usage: "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)"}, + {longName: "dev.cpuprofile", usage: "write CPU profile to file"}, + {longName: "dev.memprofile", usage: "write memory profile to file"}, + {longName: "dev.blockprofile", usage: "write block profile to file"}, + } + + e := ecdysis.New() + c := e.MustBuildCobraCommand(&RunCommand{}) + + persistentFlags := c.PersistentFlags() + cmdFlags := c.Flags() + + for _, f := range expectedFlags { + var cf *pflag.Flag + + if f.persistent { + cf = persistentFlags.Lookup(f.longName) + } else { + cf = cmdFlags.Lookup(f.longName) + } + is.True(cf != nil) + is.Equal(f.longName, cf.Name) + is.Equal(f.shortName, cf.Shorthand) + is.Equal(cf.Usage, f.usage) + } +} diff --git a/go.mod b/go.mod index 91cdd2392..fb53f16aa 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/conduitio/conduit-connector-sdk v0.12.0 github.com/conduitio/conduit-processor-sdk v0.4.0 github.com/conduitio/conduit-schema-registry v0.2.2 - github.com/conduitio/ecdysis v0.0.3 + github.com/conduitio/ecdysis v0.1.0 github.com/conduitio/yaml/v3 v3.3.0 github.com/dop251/goja v0.0.0-20240806095544-3491d4a58fbe github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c diff --git a/go.sum b/go.sum index b14287add..96740c5d2 100644 --- a/go.sum +++ b/go.sum @@ -242,8 +242,8 @@ github.com/conduitio/conduit-processor-sdk v0.4.0 h1:wF1Fj31aneNixNbW5rJ0/5Q3vwW github.com/conduitio/conduit-processor-sdk v0.4.0/go.mod h1:Jj9ZBTee7nO0XeociDxe9gSvLFN1GbPWP1Aj04DPeZQ= github.com/conduitio/conduit-schema-registry v0.2.2 h1:Q0uL8egRAzJlRV7Ed5nEcqZ1yE/UQeZJad3VmhgTSFE= github.com/conduitio/conduit-schema-registry v0.2.2/go.mod h1:EmT4ylkz15LYddL6qU4wDX52n1Yp0aHvEDRIWOYYzFs= -github.com/conduitio/ecdysis v0.0.3 h1:bX+Uk73ntf3QnV3eKGWulxDswGI2lDvJxVDDugjPjbA= -github.com/conduitio/ecdysis v0.0.3/go.mod h1:k0i+Krn8g63HNFnGNnkf8YWf+Lg0KrHRRQtkE0I4MRE= +github.com/conduitio/ecdysis v0.1.0 h1:BahTI+mqGPR3WyIBytYJjpz2naC8AOetaeUg4IARQfY= +github.com/conduitio/ecdysis v0.1.0/go.mod h1:CRQ3QxTsu/hhFepToP8InsBltZMfGIncbWvUNnNJSC4= github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI= github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc= github.com/containerd/cgroups/v3 v3.0.5 h1:44na7Ud+VwyE7LIoJ8JTNQOa549a8543BmzaJHo6Bzo=