Skip to content

Commit

Permalink
Exit Conduit if pipeline config file fails to be provisioned (#964)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Apr 11, 2023
1 parent 588098b commit a3c4edf
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 12 deletions.
14 changes: 12 additions & 2 deletions cmd/conduit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,16 @@ func parseConfig() conduit.Config {

connectorsDir = flags.String("connectors.path", "./connectors", "path to standalone connectors directory")

pipelinesDir = flags.String("pipelines.path", "./pipelines", "path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file")
pipelinesDir = flags.String(
"pipelines.path",
"./pipelines",
"path to the directory that has the yaml pipeline configuration files, or a single pipeline configuration file",
)
pipelinesExitOnError = flags.Bool(
"pipelines.exit-on-error",
false,
"exit Conduit if a pipeline experiences an error while running",
)
)

// flags is set up to exit on error, we can safely ignore the error
Expand All @@ -94,7 +103,7 @@ func parseConfig() conduit.Config {
cfg.Log.Format = strings.ToLower(stringPtrToVal(logFormat))
cfg.Connectors.Path = stringPtrToVal(connectorsDir)
cfg.Pipelines.Path = strings.ToLower(stringPtrToVal(pipelinesDir))

cfg.Pipelines.ExitOnError = *pipelinesExitOnError
return cfg
}

Expand All @@ -116,6 +125,7 @@ func cancelOnInterrupt(ctx context.Context) context.Context {
<-signalChan // second interrupt signal
os.Exit(exitCodeInterrupt)
}()

return ctx
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/conduit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type Config struct {
}

Pipelines struct {
Path string
Path string
ExitOnError bool
}
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,16 @@ func (r *Runtime) Run(ctx context.Context) (err error) {
if err != nil {
return cerrors.Errorf("failed to init connector service: %w", err)
}

if r.Config.Pipelines.ExitOnError {
r.pipelineService.OnFailure(func(e pipeline.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Msg("Conduit will shut down due to a pipeline failure and 'exit on error' enabled")
t.Kill(cerrors.Errorf("shut down due to 'exit on error' enabled: %w", e.Error))
})
}
err = r.pipelineService.Init(ctx)
if err != nil {
return cerrors.Errorf("failed to init pipeline service: %w", err)
Expand All @@ -227,6 +237,13 @@ func (r *Runtime) Run(ctx context.Context) (err error) {
multierror.ForEach(err, func(err error) {
r.logger.Err(ctx, err).Msg("provisioning failed")
})
if r.Config.Pipelines.ExitOnError {
r.logger.Warn(ctx).
Err(err).
Msg("Conduit will shut down due to a pipeline provisioning failure and 'exit on error' enabled")
err = cerrors.Errorf("shut down due to 'exit on error' enabled: %w", err)
return err
}
}

err = r.pipelineService.Run(ctx, r.connectorService, r.processorService, r.pluginService)
Expand Down
18 changes: 10 additions & 8 deletions pkg/conduit/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ import (
"time"

"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/pkg/foundation/cchan"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/matryer/is"
)

// path where tests store their data during runs.
const delay = 500 * time.Millisecond

func TestRuntime(t *testing.T) {
is := is.New(t)

Expand All @@ -44,14 +42,18 @@ func TestRuntime(t *testing.T) {
is.True(r != nil)

// set a cancel on a trigger to kill the context after THRESHOLD duration.
ctx, cancel := context.WithCancel(context.TODO())
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(delay)
time.Sleep(500 * time.Millisecond)
cancel()
}()

// wait on Run and assert that the context was canceled and no other error
// occurred.
err = r.Run(ctx)
errC := make(chan error)
go func() {
errC <- r.Run(ctx)
}()
err, got, recvErr := cchan.ChanOut[error](errC).RecvTimeout(context.Background(), time.Second)
is.NoErr(recvErr)
is.True(got)
is.True(cerrors.Is(err, context.Canceled)) // expected error to be context.Cancelled
}
4 changes: 3 additions & 1 deletion pkg/pipeline/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func (s *Service) Run(
return err
}

// Start builds and starts a pipeline instance.
// Start builds and starts a pipeline with the given ID.
// If the pipeline is already running, Start returns ErrPipelineRunning.
func (s *Service) Start(
ctx context.Context,
connFetcher ConnectorFetcher,
Expand Down Expand Up @@ -623,6 +624,7 @@ func (s *Service) runPipeline(ctx context.Context, pl *Instance) error {
Str(log.PipelineIDField, pl.ID).
Msg("pipeline stopped")

s.notify(pl.ID, err)
// It's important to update the metrics before we handle the error from s.Store.Set() (if any),
// since the source of the truth is the actual pipeline (stored in memory).
measure.PipelinesGauge.WithValues(strings.ToLower(pl.Status.String())).Inc()
Expand Down
11 changes: 11 additions & 0 deletions pkg/pipeline/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/foundation/cchan"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database/inmemory"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand Down Expand Up @@ -203,6 +204,10 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
pl, err = ps.AddConnector(ctx, pl.ID, destination.ID)
is.NoErr(err)

events := make(chan FailureEvent, 1)
ps.OnFailure(func(e FailureEvent) {
events <- e
})
// start the pipeline now that everything is set up
err = ps.Start(
ctx,
Expand Down Expand Up @@ -233,6 +238,12 @@ func TestServiceLifecycle_PipelineError(t *testing.T) {
is.True(
strings.Contains(pl.Error, wantErr.Error()),
) // expected error message to contain "source connector error"

event, eventReceived, err := cchan.Chan[FailureEvent](events).RecvTimeout(ctx, 200*time.Millisecond)
is.NoErr(err)
is.True(eventReceived)
is.Equal(pl.ID, event.ID)
is.True(cerrors.Is(event.Error, wantErr))
}

func TestServiceLifecycle_PipelineStop(t *testing.T) {
Expand Down
30 changes: 30 additions & 0 deletions pkg/pipeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import (
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
)

// Failure notification types used by the pipeline Service.
type (
	// FailureEvent describes a single pipeline failure. One value is built
	// per failure and handed to every registered FailureHandler.
	FailureEvent struct {
		// ID is the ID of the pipeline which failed.
		ID string
		// Error is the error that was reported for that pipeline.
		Error error
	}

	// FailureHandler is a callback that receives a FailureEvent.
	FailureHandler func(FailureEvent)
)

// Service manages pipelines.
type Service struct {
logger log.CtxLogger
Expand All @@ -33,6 +41,7 @@ type Service struct {

instances map[string]*Instance
instanceNames map[string]bool
handlers []FailureHandler
}

// NewService initializes and returns a pipeline Service.
Expand Down Expand Up @@ -296,3 +305,24 @@ func (s *Service) Delete(ctx context.Context, pipelineID string) error {

return nil
}

// OnFailure adds handler to the list of callbacks that receive a
// pipeline.FailureEvent. Handlers are stored in the order in which they
// were registered. Events are only sent for errors which happen after a
// pipeline has been started.
//
// NOTE(review): this method appends to the handler list without any
// locking — presumably handlers are registered before pipelines run;
// confirm against callers.
func (s *Service) OnFailure(handler FailureHandler) {
	s.handlers = append(s.handlers, handler)
}

// notify passes a pipeline error to every registered FailureHandler,
// calling them one after another in registration order on the caller's
// goroutine. A nil err is treated as "no failure" and no handler is called.
func (s *Service) notify(pipelineID string, err error) {
	if err == nil {
		return
	}

	// Build the event once; every handler receives the same value.
	event := FailureEvent{ID: pipelineID, Error: err}

	// Snapshot the slice header so the loop iterates over exactly the
	// handlers that were registered when notify was entered.
	registered := s.handlers
	for i := range registered {
		registered[i](event)
	}
}

0 comments on commit a3c4edf

Please sign in to comment.