Skip to content

Commit

Permalink
feat: add conduit run (#2038)
Browse files Browse the repository at this point in the history
* wip

* use latest ecdysis

* fix merge

* fix go mod

* fix lint
  • Loading branch information
raulb authored Dec 20, 2024
1 parent 3268d78 commit 1cf0a29
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 95 deletions.
11 changes: 6 additions & 5 deletions cmd/conduit/root/config.go → cmd/conduit/root/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package root
package config

import (
"context"
"fmt"
"reflect"

"github.com/conduitio/conduit/cmd/conduit/root/run"
"github.com/conduitio/ecdysis"
)

Expand All @@ -30,15 +31,15 @@ var (
)

type ConfigCommand struct {
rootCmd *RootCommand
RunCmd *run.RunCommand
}

func (c *ConfigCommand) Config() ecdysis.Config {
return c.rootCmd.Config()
return c.RunCmd.Config()
}

func (c *ConfigCommand) Flags() []ecdysis.Flag {
return c.rootCmd.Flags()
return c.RunCmd.Flags()
}

func (c *ConfigCommand) Docs() ecdysis.Docs {
Expand Down Expand Up @@ -83,6 +84,6 @@ func printStruct(v reflect.Value, parentPath string) {
func (c *ConfigCommand) Usage() string { return "config" }

func (c ConfigCommand) Execute(_ context.Context) error {
printStruct(reflect.ValueOf(c.rootCmd.cfg), "")
printStruct(reflect.ValueOf(c.RunCmd.Cfg), "")
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package root
package config

import (
"bytes"
Expand Down
10 changes: 5 additions & 5 deletions cmd/conduit/root/init.go → cmd/conduit/root/initialize/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package root
package initialize

import (
"bytes"
Expand Down Expand Up @@ -41,12 +41,12 @@ type InitFlags struct {

type InitCommand struct {
flags InitFlags
cfg *conduit.Config
Cfg *conduit.Config
}

func (c *InitCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)
flags.SetDefault("path", filepath.Dir(c.cfg.ConduitCfgPath))
flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfgPath))
return flags
}

Expand Down Expand Up @@ -83,7 +83,7 @@ func (c *InitCommand) createDirs() error {

func (c *InitCommand) createConfigYAML() error {
cfgYAML := internal.NewYAMLTree()
processConfigStruct(reflect.ValueOf(c.cfg).Elem(), "", cfgYAML)
processConfigStruct(reflect.ValueOf(c.Cfg).Elem(), "", cfgYAML)

// Create encoder with custom indentation
var buf bytes.Buffer
Expand Down Expand Up @@ -139,7 +139,7 @@ func processConfigStruct(v reflect.Value, parentPath string, cfgYAML *internal.Y
}
}

func (c *InitCommand) Execute(ctx context.Context) error {
func (c *InitCommand) Execute(_ context.Context) error {
err := c.createDirs()
if err != nil {
return err
Expand Down
68 changes: 14 additions & 54 deletions cmd/conduit/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/conduitio/conduit/cmd/conduit/root/config"
"github.com/conduitio/conduit/cmd/conduit/root/initialize"
"github.com/conduitio/conduit/cmd/conduit/root/pipelines"
"github.com/conduitio/conduit/cmd/conduit/root/run"
"github.com/conduitio/conduit/cmd/conduit/root/version"
"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/ecdysis"
)

Expand All @@ -32,77 +33,33 @@ var (
_ ecdysis.CommandWithExecute = (*RootCommand)(nil)
_ ecdysis.CommandWithDocs = (*RootCommand)(nil)
_ ecdysis.CommandWithSubCommands = (*RootCommand)(nil)
_ ecdysis.CommandWithConfig = (*RootCommand)(nil)
)

type RootFlags struct {
Version bool `long:"version" short:"v" usage:"show the current Conduit version"`
conduit.Config
}

type RootCommand struct {
flags RootFlags
cfg conduit.Config
}

func (c *RootCommand) Execute(_ context.Context) error {
func (c *RootCommand) Execute(ctx context.Context) error {
if c.flags.Version {
_, _ = fmt.Fprintf(os.Stdout, "%s\n", conduit.Version(true))
return nil
}

e := &conduit.Entrypoint{}
e.Serve(c.cfg)
return nil
}

func (c *RootCommand) Config() ecdysis.Config {
path := filepath.Dir(c.flags.ConduitCfgPath)

return ecdysis.Config{
EnvPrefix: "CONDUIT",
Parsed: &c.cfg,
Path: c.flags.ConduitCfgPath,
DefaultValues: conduit.DefaultConfigWithBasePath(path),
if cmd := ecdysis.CobraCmdFromContext(ctx); cmd != nil {
return cmd.Help()
}

return nil
}

func (c *RootCommand) Usage() string { return "conduit" }

func (c *RootCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)

currentPath, err := os.Getwd()
if err != nil {
panic(cerrors.Errorf("failed to get current working directory: %w", err))
}

c.cfg = conduit.DefaultConfigWithBasePath(currentPath)
flags.SetDefault("config.path", c.cfg.ConduitCfgPath)
flags.SetDefault("db.type", c.cfg.DB.Type)
flags.SetDefault("db.badger.path", c.cfg.DB.Badger.Path)
flags.SetDefault("db.postgres.connection-string", c.cfg.DB.Postgres.ConnectionString)
flags.SetDefault("db.postgres.table", c.cfg.DB.Postgres.Table)
flags.SetDefault("db.sqlite.path", c.cfg.DB.SQLite.Path)
flags.SetDefault("db.sqlite.table", c.cfg.DB.SQLite.Table)
flags.SetDefault("api.enabled", c.cfg.API.Enabled)
flags.SetDefault("http.address", c.cfg.API.HTTP.Address)
flags.SetDefault("grpc.address", c.cfg.API.GRPC.Address)
flags.SetDefault("log.level", c.cfg.Log.Level)
flags.SetDefault("log.format", c.cfg.Log.Format)
flags.SetDefault("connectors.path", c.cfg.Connectors.Path)
flags.SetDefault("processors.path", c.cfg.Processors.Path)
flags.SetDefault("pipelines.path", c.cfg.Pipelines.Path)
flags.SetDefault("pipelines.exit-on-degraded", c.cfg.Pipelines.ExitOnDegraded)
flags.SetDefault("pipelines.error-recovery.min-delay", c.cfg.Pipelines.ErrorRecovery.MinDelay)
flags.SetDefault("pipelines.error-recovery.max-delay", c.cfg.Pipelines.ErrorRecovery.MaxDelay)
flags.SetDefault("pipelines.error-recovery.backoff-factor", c.cfg.Pipelines.ErrorRecovery.BackoffFactor)
flags.SetDefault("pipelines.error-recovery.max-retries", c.cfg.Pipelines.ErrorRecovery.MaxRetries)
flags.SetDefault("pipelines.error-recovery.max-retries-window", c.cfg.Pipelines.ErrorRecovery.MaxRetriesWindow)
flags.SetDefault("schema-registry.type", c.cfg.SchemaRegistry.Type)
flags.SetDefault("schema-registry.confluent.connection-string", c.cfg.SchemaRegistry.Confluent.ConnectionString)
flags.SetDefault("preview.pipeline-arch-v2", c.cfg.Preview.PipelineArchV2)
return flags
return ecdysis.BuildFlags(&c.flags)
}

func (c *RootCommand) Docs() ecdysis.Docs {
Expand All @@ -113,10 +70,13 @@ func (c *RootCommand) Docs() ecdysis.Docs {
}

func (c *RootCommand) SubCommands() []ecdysis.Command {
runCmd := &run.RunCommand{}

return []ecdysis.Command{
&ConfigCommand{rootCmd: c},
&InitCommand{cfg: &c.cfg},
&config.ConfigCommand{RunCmd: runCmd},
&initialize.InitCommand{Cfg: &runCmd.Cfg},
&version.VersionCommand{},
&pipelines.PipelinesCommand{},
&run.RunCommand{},
}
}
27 changes: 0 additions & 27 deletions cmd/conduit/root/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,7 @@ func TestRootCommandFlags(t *testing.T) {
usage string
persistent bool
}{
{longName: "config.path", usage: "global conduit configuration file"},
{longName: "version", shortName: "v", usage: "show the current Conduit version"},
{longName: "db.type", usage: "database type; accepts badger,postgres,inmemory,sqlite"},
{longName: "db.badger.path", usage: "path to badger DB"},
{longName: "db.postgres.connection-string", usage: "postgres connection string, may be a database URL or in PostgreSQL keyword/value format"},
{longName: "db.postgres.table", usage: "postgres table in which to store data (will be created if it does not exist)"},
{longName: "db.sqlite.path", usage: "path to sqlite3 DB"},
{longName: "db.sqlite.table", usage: "sqlite3 table in which to store data (will be created if it does not exist)"},
{longName: "api.enabled", usage: "enable HTTP and gRPC API"},
{longName: "http.address", usage: "address for serving the HTTP API"},
{longName: "grpc.address", usage: "address for serving the gRPC API"},
{longName: "log.level", usage: "sets logging level; accepts debug, info, warn, error, trace"},
{longName: "log.format", usage: "sets the format of the logging; accepts json, cli"},
{longName: "connectors.path", usage: "path to standalone connectors' directory"},
{longName: "processors.path", usage: "path to standalone processors' directory"},
{longName: "pipelines.path", usage: "path to pipelines' directory"},
{longName: "pipelines.exit-on-degraded", usage: "exit Conduit if a pipeline is degraded"},
{longName: "pipelines.error-recovery.min-delay", usage: "minimum delay before restart"},
{longName: "pipelines.error-recovery.max-delay", usage: "maximum delay before restart"},
{longName: "pipelines.error-recovery.backoff-factor", usage: "backoff factor applied to the last delay"},
{longName: "pipelines.error-recovery.max-retries", usage: "maximum number of retries"},
{longName: "pipelines.error-recovery.max-retries-window", usage: "amount of time running without any errors after which a pipeline is considered healthy"},
{longName: "schema-registry.type", usage: "schema registry type; accepts builtin,confluent"},
{longName: "schema-registry.confluent.connection-string", usage: "confluent schema registry connection string"},
{longName: "preview.pipeline-arch-v2", usage: "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)"},
{longName: "dev.cpuprofile", usage: "write CPU profile to file"},
{longName: "dev.memprofile", usage: "write memory profile to file"},
{longName: "dev.blockprofile", usage: "write block profile to file"},
}

e := ecdysis.New()
Expand Down
103 changes: 103 additions & 0 deletions cmd/conduit/root/run/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package run

import (
"context"
"os"
"path/filepath"

"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/ecdysis"
)

var (
_ ecdysis.CommandWithFlags = (*RunCommand)(nil)
_ ecdysis.CommandWithExecute = (*RunCommand)(nil)
_ ecdysis.CommandWithDocs = (*RunCommand)(nil)
_ ecdysis.CommandWithConfig = (*RunCommand)(nil)
)

type RunFlags struct {
conduit.Config
}

type RunCommand struct {
flags RunFlags
Cfg conduit.Config
}

func (c *RunCommand) Execute(_ context.Context) error {
e := &conduit.Entrypoint{}
e.Serve(c.Cfg)
return nil
}

func (c *RunCommand) Config() ecdysis.Config {
path := filepath.Dir(c.flags.ConduitCfgPath)

return ecdysis.Config{
EnvPrefix: "CONDUIT",
Parsed: &c.Cfg,
Path: c.flags.ConduitCfgPath,
DefaultValues: conduit.DefaultConfigWithBasePath(path),
}
}

func (c *RunCommand) Usage() string { return "run" }

func (c *RunCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)

currentPath, err := os.Getwd()
if err != nil {
panic(cerrors.Errorf("failed to get current working directory: %w", err))
}

c.Cfg = conduit.DefaultConfigWithBasePath(currentPath)
flags.SetDefault("config.path", c.Cfg.ConduitCfgPath)
flags.SetDefault("db.type", c.Cfg.DB.Type)
flags.SetDefault("db.badger.path", c.Cfg.DB.Badger.Path)
flags.SetDefault("db.postgres.connection-string", c.Cfg.DB.Postgres.ConnectionString)
flags.SetDefault("db.postgres.table", c.Cfg.DB.Postgres.Table)
flags.SetDefault("db.sqlite.path", c.Cfg.DB.SQLite.Path)
flags.SetDefault("db.sqlite.table", c.Cfg.DB.SQLite.Table)
flags.SetDefault("api.enabled", c.Cfg.API.Enabled)
flags.SetDefault("http.address", c.Cfg.API.HTTP.Address)
flags.SetDefault("grpc.address", c.Cfg.API.GRPC.Address)
flags.SetDefault("log.level", c.Cfg.Log.Level)
flags.SetDefault("log.format", c.Cfg.Log.Format)
flags.SetDefault("connectors.path", c.Cfg.Connectors.Path)
flags.SetDefault("processors.path", c.Cfg.Processors.Path)
flags.SetDefault("pipelines.path", c.Cfg.Pipelines.Path)
flags.SetDefault("pipelines.exit-on-degraded", c.Cfg.Pipelines.ExitOnDegraded)
flags.SetDefault("pipelines.error-recovery.min-delay", c.Cfg.Pipelines.ErrorRecovery.MinDelay)
flags.SetDefault("pipelines.error-recovery.max-delay", c.Cfg.Pipelines.ErrorRecovery.MaxDelay)
flags.SetDefault("pipelines.error-recovery.backoff-factor", c.Cfg.Pipelines.ErrorRecovery.BackoffFactor)
flags.SetDefault("pipelines.error-recovery.max-retries", c.Cfg.Pipelines.ErrorRecovery.MaxRetries)
flags.SetDefault("pipelines.error-recovery.max-retries-window", c.Cfg.Pipelines.ErrorRecovery.MaxRetriesWindow)
flags.SetDefault("schema-registry.type", c.Cfg.SchemaRegistry.Type)
flags.SetDefault("schema-registry.confluent.connection-string", c.Cfg.SchemaRegistry.Confluent.ConnectionString)
flags.SetDefault("preview.pipeline-arch-v2", c.Cfg.Preview.PipelineArchV2)
return flags
}

func (c *RunCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "Run Conduit",
Long: `Starts the Conduit server and runs the configured pipelines.`,
}
}
Loading

0 comments on commit 1cf0a29

Please sign in to comment.