diff --git a/cmd/conduit/main.go b/cmd/conduit/main.go index 0e83b8d95..83e6eeb84 100644 --- a/cmd/conduit/main.go +++ b/cmd/conduit/main.go @@ -71,7 +71,16 @@ func parseConfig() conduit.Config { connectorsDir = flags.String("connectors.path", "./connectors", "path to standalone connectors directory") - pipelinesDir = flags.String("pipelines.path", "./pipelines", "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file") + pipelinesDir = flags.String( + "pipelines.path", + "./pipelines", + "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file", + ) + pipelinesExitOnError = flags.Bool( + "pipelines.exit-on-error", + false, + "exit Conduit if a pipeline experiences an error while running", + ) ) // flags is set up to exit on error, we can safely ignore the error @@ -94,7 +103,7 @@ func parseConfig() conduit.Config { cfg.Log.Format = strings.ToLower(stringPtrToVal(logFormat)) cfg.Connectors.Path = stringPtrToVal(connectorsDir) cfg.Pipelines.Path = strings.ToLower(stringPtrToVal(pipelinesDir)) - + cfg.Pipelines.ExitOnError = *pipelinesExitOnError return cfg } @@ -116,6 +125,7 @@ func cancelOnInterrupt(ctx context.Context) context.Context { <-signalChan // second interrupt signal os.Exit(exitCodeInterrupt) }() + return ctx } diff --git a/pkg/conduit/config.go b/pkg/conduit/config.go index 3f87c44f4..1b8e8ea35 100644 --- a/pkg/conduit/config.go +++ b/pkg/conduit/config.go @@ -58,7 +58,8 @@ type Config struct { } Pipelines struct { - Path string + Path string + ExitOnError bool } } diff --git a/pkg/conduit/runtime.go b/pkg/conduit/runtime.go index 3de6cc6c3..f90526a6e 100644 --- a/pkg/conduit/runtime.go +++ b/pkg/conduit/runtime.go @@ -217,6 +217,16 @@ func (r *Runtime) Run(ctx context.Context) (err error) { if err != nil { return cerrors.Errorf("failed to init connector service: %w", err) } + + if r.Config.Pipelines.ExitOnError { + r.pipelineService.OnFailure(func(e pipeline.FailureEvent) { + r.logger.Warn(ctx). + Err(e.Error). + Str(log.PipelineIDField, e.ID). + Msg("Conduit will shut down due to a pipeline failure and 'exit on error' enabled") + t.Kill(cerrors.Errorf("shut down due to 'exit on error' enabled: %w", e.Error)) + }) + } err = r.pipelineService.Init(ctx) if err != nil { return cerrors.Errorf("failed to init pipeline service: %w", err) @@ -227,6 +237,13 @@ func (r *Runtime) Run(ctx context.Context) (err error) { multierror.ForEach(err, func(err error) { r.logger.Err(ctx, err).Msg("provisioning failed") }) + if r.Config.Pipelines.ExitOnError { + r.logger.Warn(ctx). + Err(err). + Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit on error' enabled") + err = cerrors.Errorf("shut down due to 'exit on error' enabled: %w", err) + return err + } } err = r.pipelineService.Run(ctx, r.connectorService, r.processorService, r.pluginService) diff --git a/pkg/conduit/runtime_test.go b/pkg/conduit/runtime_test.go index 284d9ba86..57d113665 100644 --- a/pkg/conduit/runtime_test.go +++ b/pkg/conduit/runtime_test.go @@ -20,13 +20,11 @@ import ( "time" "github.com/conduitio/conduit/pkg/conduit" + "github.com/conduitio/conduit/pkg/foundation/cchan" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/matryer/is" ) -// path where tests store their data during runs. -const delay = 500 * time.Millisecond - func TestRuntime(t *testing.T) { is := is.New(t) @@ -44,14 +42,18 @@ func TestRuntime(t *testing.T) { is.True(r != nil) // set a cancel on a trigger to kill the context after THRESHOLD duration. - ctx, cancel := context.WithCancel(context.TODO()) + ctx, cancel := context.WithCancel(context.Background()) go func() { - time.Sleep(delay) + time.Sleep(500 * time.Millisecond) cancel() }() - // wait on Run and assert that the context was canceled and no other error - // occurred. - err = r.Run(ctx) + errC := make(chan error) + go func() { + errC <- r.Run(ctx) + }() + err, got, recvErr := cchan.ChanOut[error](errC).RecvTimeout(context.Background(), time.Second) + is.NoErr(recvErr) + is.True(got) is.True(cerrors.Is(err, context.Canceled)) // expected error to be context.Cancelled } diff --git a/pkg/pipeline/lifecycle.go b/pkg/pipeline/lifecycle.go index a112cb6e5..335ffd7dd 100644 --- a/pkg/pipeline/lifecycle.go +++ b/pkg/pipeline/lifecycle.go @@ -75,7 +75,8 @@ func (s *Service) Run( return err } -// Start builds and starts a pipeline instance. +// Start builds and starts a pipeline with the given ID. +// If the pipeline is already running, Start returns ErrPipelineRunning. func (s *Service) Start( ctx context.Context, connFetcher ConnectorFetcher, @@ -623,6 +624,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error { Str(log.PipelineIDField, pl.ID). Msg("pipeline stopped") + s.notify(pl.ID, err) // It's important to update the metrics before we handle the error from s.Store.Set() (if any), // since the source of the truth is the actual pipeline (stored in memory). measure.PipelinesGauge.WithValues(strings.ToLower(pl.Status.String())).Inc() diff --git a/pkg/pipeline/lifecycle_test.go b/pkg/pipeline/lifecycle_test.go index bf490bfd2..9d2da3d94 100644 --- a/pkg/pipeline/lifecycle_test.go +++ b/pkg/pipeline/lifecycle_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/conduitio/conduit/pkg/connector" + "github.com/conduitio/conduit/pkg/foundation/cchan" "github.com/conduitio/conduit/pkg/foundation/cerrors" "github.com/conduitio/conduit/pkg/foundation/database/inmemory" "github.com/conduitio/conduit/pkg/foundation/log" @@ -203,6 +204,10 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { pl, err = ps.AddConnector(ctx, pl.ID, destination.ID) is.NoErr(err) + events := make(chan FailureEvent, 1) + ps.OnFailure(func(e FailureEvent) { + events <- e + }) // start the pipeline now that everything is set up err = ps.Start( ctx, @@ -233,6 +238,12 @@ func TestServiceLifecycle_PipelineError(t *testing.T) { is.True( strings.Contains(pl.Error, wantErr.Error()), ) // expected error message to contain "source connector error" + + event, eventReceived, err := cchan.Chan[FailureEvent](events).RecvTimeout(ctx, 200*time.Millisecond) + is.NoErr(err) + is.True(eventReceived) + is.Equal(pl.ID, event.ID) + is.True(cerrors.Is(event.Error, wantErr)) } func TestServiceLifecycle_PipelineStop(t *testing.T) { diff --git a/pkg/pipeline/service.go b/pkg/pipeline/service.go index db2148911..3e7f56a18 100644 --- a/pkg/pipeline/service.go +++ b/pkg/pipeline/service.go @@ -25,6 +25,14 @@ import ( "github.com/conduitio/conduit/pkg/foundation/metrics/measure" ) +type FailureEvent struct { + // ID is the ID of the pipeline which failed. + ID string + Error error +} + +type FailureHandler func(FailureEvent) + // Service manages pipelines. type Service struct { logger log.CtxLogger @@ -33,6 +41,7 @@ type Service struct { instances map[string]*Instance instanceNames map[string]bool + handlers []FailureHandler } // NewService initializes and returns a pipeline Service. @@ -296,3 +305,24 @@ func (s *Service) Delete(ctx context.Context, pipelineID string) error { return nil } + +// OnFailure registers a handler for a pipeline.FailureEvent. +// Only errors which happen after a pipeline has been started +// are being sent. +func (s *Service) OnFailure(handler FailureHandler) { + s.handlers = append(s.handlers, handler) +} + +// notify notifies all registered FailureHandlers about an error. +func (s *Service) notify(pipelineID string, err error) { + if err == nil { + return + } + e := FailureEvent{ + ID: pipelineID, + Error: err, + } + for _, handler := range s.handlers { + handler(e) + } +}