Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add conduit run #2038

Merged
merged 8 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package root
package config

import (
"context"
"fmt"
"reflect"

"github.com/conduitio/conduit/cmd/conduit/root/run"
"github.com/conduitio/ecdysis"
)

Expand All @@ -30,15 +31,15 @@ var (
)

type ConfigCommand struct {
rootCmd *RootCommand
RunCmd *run.RunCommand
}

func (c *ConfigCommand) Config() ecdysis.Config {
return c.rootCmd.Config()
return c.RunCmd.Config()
}

func (c *ConfigCommand) Flags() []ecdysis.Flag {
return c.rootCmd.Flags()
return c.RunCmd.Flags()
}

func (c *ConfigCommand) Docs() ecdysis.Docs {
Expand Down Expand Up @@ -83,6 +84,6 @@ func printStruct(v reflect.Value, parentPath string) {
func (c *ConfigCommand) Usage() string { return "config" }

func (c ConfigCommand) Execute(_ context.Context) error {
printStruct(reflect.ValueOf(c.rootCmd.cfg), "")
printStruct(reflect.ValueOf(c.RunCmd.Cfg), "")
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package root
package config

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package root
package initialize

import (
"bytes"
Expand Down Expand Up @@ -41,12 +41,12 @@ type InitFlags struct {

type InitCommand struct {
flags InitFlags
cfg *conduit.Config
Cfg *conduit.Config
}

func (c *InitCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)
flags.SetDefault("path", filepath.Dir(c.cfg.ConduitCfgPath))
flags.SetDefault("path", filepath.Dir(c.Cfg.ConduitCfgPath))
return flags
}

Expand Down Expand Up @@ -83,7 +83,7 @@ func (c *InitCommand) createDirs() error {

func (c *InitCommand) createConfigYAML() error {
cfgYAML := internal.NewYAMLTree()
processConfigStruct(reflect.ValueOf(c.cfg).Elem(), "", cfgYAML)
processConfigStruct(reflect.ValueOf(c.Cfg).Elem(), "", cfgYAML)

// Create encoder with custom indentation
var buf bytes.Buffer
Expand Down Expand Up @@ -139,7 +139,7 @@ func processConfigStruct(v reflect.Value, parentPath string, cfgYAML *internal.Y
}
}

func (c *InitCommand) Execute(ctx context.Context) error {
func (c *InitCommand) Execute(_ context.Context) error {
err := c.createDirs()
if err != nil {
return err
Expand Down
68 changes: 14 additions & 54 deletions cmd/conduit/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/conduitio/conduit/cmd/conduit/root/config"
"github.com/conduitio/conduit/cmd/conduit/root/initialize"
"github.com/conduitio/conduit/cmd/conduit/root/pipelines"
"github.com/conduitio/conduit/cmd/conduit/root/run"
"github.com/conduitio/conduit/cmd/conduit/root/version"
"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/ecdysis"
)

Expand All @@ -32,77 +33,33 @@ var (
_ ecdysis.CommandWithExecute = (*RootCommand)(nil)
_ ecdysis.CommandWithDocs = (*RootCommand)(nil)
_ ecdysis.CommandWithSubCommands = (*RootCommand)(nil)
_ ecdysis.CommandWithConfig = (*RootCommand)(nil)
)

type RootFlags struct {
Version bool `long:"version" short:"v" usage:"show the current Conduit version"`
conduit.Config
}

type RootCommand struct {
flags RootFlags
cfg conduit.Config
}

func (c *RootCommand) Execute(_ context.Context) error {
func (c *RootCommand) Execute(ctx context.Context) error {
if c.flags.Version {
_, _ = fmt.Fprintf(os.Stdout, "%s\n", conduit.Version(true))
return nil
}

e := &conduit.Entrypoint{}
e.Serve(c.cfg)
return nil
}

func (c *RootCommand) Config() ecdysis.Config {
path := filepath.Dir(c.flags.ConduitCfgPath)

return ecdysis.Config{
EnvPrefix: "CONDUIT",
Parsed: &c.cfg,
Path: c.flags.ConduitCfgPath,
DefaultValues: conduit.DefaultConfigWithBasePath(path),
if cmd := ecdysis.CobraCmdFromContext(ctx); cmd != nil {
return cmd.Help()
}

return nil
}

func (c *RootCommand) Usage() string { return "conduit" }

func (c *RootCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)

currentPath, err := os.Getwd()
if err != nil {
panic(cerrors.Errorf("failed to get current working directory: %w", err))
}

c.cfg = conduit.DefaultConfigWithBasePath(currentPath)
flags.SetDefault("config.path", c.cfg.ConduitCfgPath)
flags.SetDefault("db.type", c.cfg.DB.Type)
flags.SetDefault("db.badger.path", c.cfg.DB.Badger.Path)
flags.SetDefault("db.postgres.connection-string", c.cfg.DB.Postgres.ConnectionString)
flags.SetDefault("db.postgres.table", c.cfg.DB.Postgres.Table)
flags.SetDefault("db.sqlite.path", c.cfg.DB.SQLite.Path)
flags.SetDefault("db.sqlite.table", c.cfg.DB.SQLite.Table)
flags.SetDefault("api.enabled", c.cfg.API.Enabled)
flags.SetDefault("http.address", c.cfg.API.HTTP.Address)
flags.SetDefault("grpc.address", c.cfg.API.GRPC.Address)
flags.SetDefault("log.level", c.cfg.Log.Level)
flags.SetDefault("log.format", c.cfg.Log.Format)
flags.SetDefault("connectors.path", c.cfg.Connectors.Path)
flags.SetDefault("processors.path", c.cfg.Processors.Path)
flags.SetDefault("pipelines.path", c.cfg.Pipelines.Path)
flags.SetDefault("pipelines.exit-on-degraded", c.cfg.Pipelines.ExitOnDegraded)
flags.SetDefault("pipelines.error-recovery.min-delay", c.cfg.Pipelines.ErrorRecovery.MinDelay)
flags.SetDefault("pipelines.error-recovery.max-delay", c.cfg.Pipelines.ErrorRecovery.MaxDelay)
flags.SetDefault("pipelines.error-recovery.backoff-factor", c.cfg.Pipelines.ErrorRecovery.BackoffFactor)
flags.SetDefault("pipelines.error-recovery.max-retries", c.cfg.Pipelines.ErrorRecovery.MaxRetries)
flags.SetDefault("pipelines.error-recovery.max-retries-window", c.cfg.Pipelines.ErrorRecovery.MaxRetriesWindow)
flags.SetDefault("schema-registry.type", c.cfg.SchemaRegistry.Type)
flags.SetDefault("schema-registry.confluent.connection-string", c.cfg.SchemaRegistry.Confluent.ConnectionString)
flags.SetDefault("preview.pipeline-arch-v2", c.cfg.Preview.PipelineArchV2)
return flags
return ecdysis.BuildFlags(&c.flags)
}

func (c *RootCommand) Docs() ecdysis.Docs {
Expand All @@ -113,10 +70,13 @@ func (c *RootCommand) Docs() ecdysis.Docs {
}

func (c *RootCommand) SubCommands() []ecdysis.Command {
runCmd := &run.RunCommand{}

return []ecdysis.Command{
&ConfigCommand{rootCmd: c},
&InitCommand{cfg: &c.cfg},
&config.ConfigCommand{RunCmd: runCmd},
&initialize.InitCommand{Cfg: &runCmd.Cfg},
&version.VersionCommand{},
&pipelines.PipelinesCommand{},
&run.RunCommand{},
}
}
27 changes: 0 additions & 27 deletions cmd/conduit/root/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,7 @@ func TestRootCommandFlags(t *testing.T) {
usage string
persistent bool
}{
{longName: "config.path", usage: "global conduit configuration file"},
{longName: "version", shortName: "v", usage: "show the current Conduit version"},
{longName: "db.type", usage: "database type; accepts badger,postgres,inmemory,sqlite"},
{longName: "db.badger.path", usage: "path to badger DB"},
{longName: "db.postgres.connection-string", usage: "postgres connection string, may be a database URL or in PostgreSQL keyword/value format"},
{longName: "db.postgres.table", usage: "postgres table in which to store data (will be created if it does not exist)"},
{longName: "db.sqlite.path", usage: "path to sqlite3 DB"},
{longName: "db.sqlite.table", usage: "sqlite3 table in which to store data (will be created if it does not exist)"},
{longName: "api.enabled", usage: "enable HTTP and gRPC API"},
{longName: "http.address", usage: "address for serving the HTTP API"},
{longName: "grpc.address", usage: "address for serving the gRPC API"},
{longName: "log.level", usage: "sets logging level; accepts debug, info, warn, error, trace"},
{longName: "log.format", usage: "sets the format of the logging; accepts json, cli"},
{longName: "connectors.path", usage: "path to standalone connectors' directory"},
{longName: "processors.path", usage: "path to standalone processors' directory"},
{longName: "pipelines.path", usage: "path to pipelines' directory"},
{longName: "pipelines.exit-on-degraded", usage: "exit Conduit if a pipeline is degraded"},
{longName: "pipelines.error-recovery.min-delay", usage: "minimum delay before restart"},
{longName: "pipelines.error-recovery.max-delay", usage: "maximum delay before restart"},
{longName: "pipelines.error-recovery.backoff-factor", usage: "backoff factor applied to the last delay"},
{longName: "pipelines.error-recovery.max-retries", usage: "maximum number of retries"},
{longName: "pipelines.error-recovery.max-retries-window", usage: "amount of time running without any errors after which a pipeline is considered healthy"},
{longName: "schema-registry.type", usage: "schema registry type; accepts builtin,confluent"},
{longName: "schema-registry.confluent.connection-string", usage: "confluent schema registry connection string"},
{longName: "preview.pipeline-arch-v2", usage: "enables experimental pipeline architecture v2 (note that the new architecture currently supports only 1 source and 1 destination per pipeline)"},
{longName: "dev.cpuprofile", usage: "write CPU profile to file"},
{longName: "dev.memprofile", usage: "write memory profile to file"},
{longName: "dev.blockprofile", usage: "write block profile to file"},
}

e := ecdysis.New()
Expand Down
103 changes: 103 additions & 0 deletions cmd/conduit/root/run/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright © 2024 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package run

import (
"context"
"os"
"path/filepath"

"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/ecdysis"
)

var (
_ ecdysis.CommandWithFlags = (*RunCommand)(nil)
_ ecdysis.CommandWithExecute = (*RunCommand)(nil)
_ ecdysis.CommandWithDocs = (*RunCommand)(nil)
_ ecdysis.CommandWithConfig = (*RunCommand)(nil)
)

type RunFlags struct {
conduit.Config
}

type RunCommand struct {
flags RunFlags
Cfg conduit.Config
}

func (c *RunCommand) Execute(_ context.Context) error {
e := &conduit.Entrypoint{}
e.Serve(c.Cfg)
return nil
}

func (c *RunCommand) Config() ecdysis.Config {
path := filepath.Dir(c.flags.ConduitCfgPath)

return ecdysis.Config{
EnvPrefix: "CONDUIT",
Parsed: &c.Cfg,
Path: c.flags.ConduitCfgPath,
DefaultValues: conduit.DefaultConfigWithBasePath(path),
}
}

func (c *RunCommand) Usage() string { return "run" }

func (c *RunCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)

currentPath, err := os.Getwd()
if err != nil {
panic(cerrors.Errorf("failed to get current working directory: %w", err))
}

c.Cfg = conduit.DefaultConfigWithBasePath(currentPath)
flags.SetDefault("config.path", c.Cfg.ConduitCfgPath)
flags.SetDefault("db.type", c.Cfg.DB.Type)
flags.SetDefault("db.badger.path", c.Cfg.DB.Badger.Path)
flags.SetDefault("db.postgres.connection-string", c.Cfg.DB.Postgres.ConnectionString)
flags.SetDefault("db.postgres.table", c.Cfg.DB.Postgres.Table)
flags.SetDefault("db.sqlite.path", c.Cfg.DB.SQLite.Path)
flags.SetDefault("db.sqlite.table", c.Cfg.DB.SQLite.Table)
flags.SetDefault("api.enabled", c.Cfg.API.Enabled)
flags.SetDefault("http.address", c.Cfg.API.HTTP.Address)
flags.SetDefault("grpc.address", c.Cfg.API.GRPC.Address)
flags.SetDefault("log.level", c.Cfg.Log.Level)
flags.SetDefault("log.format", c.Cfg.Log.Format)
flags.SetDefault("connectors.path", c.Cfg.Connectors.Path)
flags.SetDefault("processors.path", c.Cfg.Processors.Path)
flags.SetDefault("pipelines.path", c.Cfg.Pipelines.Path)
flags.SetDefault("pipelines.exit-on-degraded", c.Cfg.Pipelines.ExitOnDegraded)
flags.SetDefault("pipelines.error-recovery.min-delay", c.Cfg.Pipelines.ErrorRecovery.MinDelay)
flags.SetDefault("pipelines.error-recovery.max-delay", c.Cfg.Pipelines.ErrorRecovery.MaxDelay)
flags.SetDefault("pipelines.error-recovery.backoff-factor", c.Cfg.Pipelines.ErrorRecovery.BackoffFactor)
flags.SetDefault("pipelines.error-recovery.max-retries", c.Cfg.Pipelines.ErrorRecovery.MaxRetries)
flags.SetDefault("pipelines.error-recovery.max-retries-window", c.Cfg.Pipelines.ErrorRecovery.MaxRetriesWindow)
flags.SetDefault("schema-registry.type", c.Cfg.SchemaRegistry.Type)
flags.SetDefault("schema-registry.confluent.connection-string", c.Cfg.SchemaRegistry.Confluent.ConnectionString)
flags.SetDefault("preview.pipeline-arch-v2", c.Cfg.Preview.PipelineArchV2)
return flags
}

func (c *RunCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "Run Conduit",
Long: `Starts the Conduit server and runs the configured pipelines.`,
}
}
Loading
Loading