diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 3d7013913..75ef704d0 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -75,6 +75,11 @@ type Config struct { PluginDispenserFactories map[string]builtin.DispenserFactory ProcessorBuilderRegistry *processor.BuilderRegistry + + dev struct { + cpuprofile string + memprofile string + } } func DefaultConfig() Config { diff --git a/pkg/conduit/entrypoint.go b/pkg/conduit/entrypoint.go index 488ce5ec4..15691b25a 100644 --- a/pkg/conduit/entrypoint.go +++ b/pkg/conduit/entrypoint.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "os/signal" + "strings" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/peterbourgon/ff/v3" @@ -74,7 +75,7 @@ func (e *Entrypoint) Serve(cfg Config) { // config struct. func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { // TODO extract flags from config struct rather than defining flags manually - flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError) + flags := flag.NewFlagSet("conduit", flag.ExitOnError) flags.StringVar(&cfg.DB.Type, "db.type", cfg.DB.Type, "database type; accepts badger,postgres,inmemory") flags.StringVar(&cfg.DB.Badger.Path, "db.badger.path", cfg.DB.Badger.Path, "path to badger DB") @@ -93,6 +94,25 @@ func (*Entrypoint) Flags(cfg *Config) *flag.FlagSet { flags.StringVar(&cfg.Pipelines.Path, "pipelines.path", cfg.Pipelines.Path, "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file") flags.BoolVar(&cfg.Pipelines.ExitOnError, "pipelines.exit-on-error", cfg.Pipelines.ExitOnError, "exit Conduit if a pipeline experiences an error while running") + // NB: flags with prefix dev.* are hidden from help output by default, they only show up using '-dev -help' + showDevHelp := flags.Bool("dev", false, "used together with the dev flag it shows dev flags") + flags.StringVar(&cfg.dev.cpuprofile, "dev.cpuprofile", "", "write cpu profile to file") + flags.StringVar(&cfg.dev.memprofile, "dev.memprofile", "", "write memory profile to file") + + // show user or dev flags + flags.Usage = func() { + tmpFlags := flag.NewFlagSet("conduit", flag.ExitOnError) + flags.VisitAll(func(f *flag.Flag) { + if f.Name == "dev" || strings.HasPrefix(f.Name, "dev.") != *showDevHelp { + return // hide flag from output + } + // reset value to its default, to ensure default is shown correctly + _ = f.Value.Set(f.DefValue) + tmpFlags.Var(f.Value, f.Name, f.Usage) + }) + tmpFlags.Usage() + } + return flags } diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index f2e19b68d..fde35b61b 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -22,6 +22,9 @@ import ( "context" "net" "net/http" + "os" + "runtime" + "runtime/pprof" "strings" "time" @@ -199,6 +202,33 @@ func newServices( // one of the services experiences a fatal error. func (r *Runtime) Run(ctx context.Context) (err error) { t, ctx := tomb.WithContext(ctx) + + if r.Config.dev.cpuprofile != "" { + f, err := os.Create(r.Config.dev.cpuprofile) + if err != nil { + return cerrors.Errorf("could not create CPU profile: %w", err) + } + defer f.Close() + if err := pprof.StartCPUProfile(f); err != nil { + return cerrors.Errorf("could not start CPU profile: %w", err) + } + defer pprof.StopCPUProfile() + } + if r.Config.dev.memprofile != "" { + defer func() { + f, err := os.Create(r.Config.dev.memprofile) + if err != nil { + r.logger.Err(ctx, err).Msg("could not create memory profile") + return + } + defer f.Close() + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + r.logger.Err(ctx, err).Msg("could not write memory profile") + } + }() + } + defer func() { if err != nil { // This means run failed, we kill the tomb to stop any goroutines