diff --git a/docs/connectors/building-connectors/developing-destination-connectors.mdx b/docs/connectors/building-connectors/developing-destination-connectors.mdx index 9c96e3ef..eaedd875 100644 --- a/docs/connectors/building-connectors/developing-destination-connectors.mdx +++ b/docs/connectors/building-connectors/developing-destination-connectors.mdx @@ -3,27 +3,41 @@ title: "Developing a Destination Connector" sidebar_position: 4 --- -A Destination is responsible for writing [Record](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#Record) to third party systems. +A Destination is responsible for writing +an [OpenCDC Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record) +to third party systems. -You need to implement the functions required by Destination and provide your own implementations. Information about individual functions are listed below. The **`destination.go`** file is the main file where the functionality of your Destination Connector is implemented. +You need to implement the functions required by Destination and provide your own +implementations. Information about individual functions are listed below. The * +*`destination.go`** file is the main file where the functionality of your +Destination Connector is implemented. ## `Destination struct` -Every Destination implementation needs to include an [UnimplementedDestination](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedDestination) to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Destination implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector. +Every Destination implementation needs to include +an [UnimplementedDestination](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedDestination) +to satisfy the interface. This allows us to potentially change the interface in +the future while remaining backward compatible with existing Destination +implementations. This struct can be modified to add additional fields that can +be accessed throughout the lifecycle of the Connector. - ```go - type Destination struct { - sdk.UnimplementedDestination +```go +type Destination struct { + sdk.UnimplementedDestination - config DestinationConfig - } - ``` + config DestinationConfig +} +``` ## Destination Connector Lifecycle Functions ### `NewDestination()` -A constructor function for your Destination struct. Note that this is the same function that should be set as the value of `Connector.NewDestination`. The constructor should be used to wrap your Destination in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware. +A constructor function for your Destination struct. Note that this is the same +function that should be set as the value of `Connector.NewDestination`. The +constructor should be used to wrap your Destination in the default middleware. +You can add additional middleware, but unless you have a very good reason, you +should always include the default middleware. ```go func NewDestination() sdk.Destination { @@ -37,102 +51,87 @@ func NewDestination() sdk.Destination { **Additional options via `DestinationMiddlewareOption`**: -Currently, the available destination middleware options can be found [here](https://github.com/ConduitIO/conduit-connector-sdk/blob/1cbe778fabc8f903e075872560e6a91049d2e978/destination_middleware.go#L44-L50). +Currently, the available destination middleware options can be +found [here](https://github.com/ConduitIO/conduit-connector-sdk/blob/1cbe778fabc8f903e075872560e6a91049d2e978/destination_middleware.go#L44-L50). ### `Parameters()` -A map of named Parameters that describe how to configure the connector. This map is typically generated using [`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen). +A map of named Parameters that describe how to configure the connector. This map +is typically generated using [ +`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen). - ```go - func (d *Destination) Parameters() map[string]sdk.Parameter { - return d.config.Parameters() - } - ``` +```go +func (d *Destination) Parameters() config.Parameters { + return d.config.Parameters() +} +``` ### `Configure()` -Validates and stores configuration data for the connector. Any complex validation logic should be implemented here. +Validates and stores configuration data for the connector. Any complex +validation logic should be implemented here. - ```go - func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error { - err := sdk.Util.ParseConfig(cfg, &d.config) - if err != nil { - return fmt.Errorf("invalid config: %w", err) - } - // custom validations here - return nil +```go +func (d *Destination) Configure(ctx context.Context, cfg config.Config) error { + err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters()) + if err != nil { + return err } - ``` + // add custom validations here + return nil +} +``` ### `Open()` -Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function. - - ```go - func (d *Destination) Open(ctx context.Context) error { - // Retrieve the directory path from the config - directoryPath := d.config.Directory - - // Check if the directory exists - if _, err := os.Stat(directoryPath); os.IsNotExist(err) { - // Create the directory if it doesn't exist - err := os.MkdirAll(directoryPath, 0755) - if err != nil { - return fmt.Errorf("failed to create directory '%s': %w", directoryPath, err) - } - } else if err != nil { - // Return any error other than the directory not existing - return fmt.Errorf("error checking directory '%s': %w", directoryPath, err) - } +Prepares the connector to start writing records. If needed, the connector should +open connections in this function. - // The directory exists (or was just created), so we can proceed - return nil +```go +func (d *Destination) Open(context.Context) error { + // opens or creates a file at the given path + file, err := d.openOrCreate(d.config.Path) + if err != nil { + return err } - ``` - -### `Write()` -Writes len(records) from a slice of `sdk.Record` objects received from the Conduit pipeline to the destination right away without caching. It should return the number of records written from the slice and any error encountered that caused the write to stop early. + d.file = file + return nil +} +``` - ```go - func (d *Destination) Write(ctx context.Context, recs []sdk.Record) (int, error) { - outputDir := d.config.Directory +### `Write()` - for i, r := range recs { - fileName, ok := r.Key.(sdk.RawData) - if !ok || len(fileName) == 0 { - return i, fmt.Errorf("record key is invalid or not provided, record index: %v", i) - } +Writes `len(records)` from a slice of `opencdc.Record`s received from the +Conduit pipeline to the destination right away without caching. It should return +the number of records written from the slice and any error encountered that +caused the write to stop early. - filePath := filepath.Join(outputDir, string(fileName)) - if err := d.writeToFile(filePath, r.Payload.After.Bytes()); err != nil { - return i, fmt.Errorf("failed to write record to file '%s', record index: %v, error: %w", filePath, i, err) - } - sdk.Logger(ctx).Info().Msgf("Wrote file %s to directory %s\n", string(fileName), outputDir) +```go +func (d *Destination) Write(_ context.Context, recs []opencdc.Record) (int, error) { + for i, r := range recs { + _, err := d.file.Write(append(r.Bytes(), '\n')) + if err != nil { + return i, err } - return len(recs), nil - } - ``` - -### `Ack()` - -Ack signals to the implementation that the record with the supplied position was successfully processed. - - ```go - func (d *Destination) Ack(ctx context.Context, position sdk.Position) error { - sdk.Logger(ctx).Debug().Msg("Record successfully processed") - return nil } - ``` + return len(recs), nil +} +``` ### `Teardown()` -Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the `Open()` function should be closed here. +Teardown signals to the connector that there will be no more calls to any other +function. Any connections that were created in the `Open()` function should be +closed here. - ```go - func (d *Destination) Teardown(ctx context.Context) error { - return nil - } - ``` +```go +func (d *Destination) Teardown(context.Context) error { + if d.file != nil { + return d.file.Close() + } + return nil +} +``` ![scarf pixel conduit-site-docs-connectors](https://static.scarf.sh/a.png?x-pxid=2fa824d7-fd94-4cf9-a5c8-ea63c9860213) \ No newline at end of file diff --git a/docs/connectors/building-connectors/developing-source-connectors.mdx b/docs/connectors/building-connectors/developing-source-connectors.mdx index 723d4ffb..d871178a 100644 --- a/docs/connectors/building-connectors/developing-source-connectors.mdx +++ b/docs/connectors/building-connectors/developing-source-connectors.mdx @@ -3,31 +3,42 @@ title: "Developing a Source Connector" sidebar_position: 3 --- -A Source is responsible for continuously reading data from a third party system and returning it in the form of an [SDK Record](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#Record). +A Source is responsible for continuously reading data from a third party system +and returning it in the form of +an [OpenCDC Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record). -You need to implement the functions required by the Source interface and provide your own implementations. Information about individual functions are listed below. The **`source.go`** file is the main file where the functionality of your Source Connector is implemented. +You need to implement the functions required by +the [Source](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#Source) +interface and provide your own implementations. Information about individual +functions are listed below. The **`source.go`** file is the main file where the +functionality of your source connector is implemented. ## `Source struct` -Every Source implementation needs to include an [UnimplementedSource](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedSource) to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Source implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector +Every Source implementation needs to include +an [UnimplementedSource](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedSource) +to satisfy the interface. This allows us to potentially change the interface in +the future while remaining backward compatible with existing Source +implementations. This struct can be modified to add additional fields that can +be accessed throughout the lifecycle of the Connector. - ```go - type Source struct { - sdk.UnimplementedSource +```go +type Source struct { + sdk.UnimplementedSource - config SourceConfig - lastPositionRead sdk.Position //nolint:unused // this is just an example - watcher *fsnotify.Watcher - recentlyCreated sync.Map // To keep track of recently created files - createCooldown time.Duration // Cooldown period after a create event - } - ``` + config SourceConfig + tail *tail.Tail +} +``` ## Source Connector Lifecycle Functions ### `NewSource()` -A constructor function for your Source struct. Note that this is the same function that should be set as the value of `Connector.NewSource`. The constructor should be used to wrap your Source in the default `DefaultSourceMiddleware`. +A constructor function for your Source struct. Note that this is the same +function that should be set as the value of `Connector.NewSource`. The +constructor should be used to wrap your Source in the default +`DefaultSourceMiddleware`. ```go func NewSource() sdk.Source { @@ -41,161 +52,170 @@ func NewSource() sdk.Source { **Additional options via `SourceMiddlewareOption`**: -In case you need to add additional middleware options, you can do so by passing it to the `sdk.SourceWithMiddleware` function via `sdk.DefaultSourceMiddleware(opts ...SourceMiddlewareOption)`. Currently, the available source middleware options can be found [here](https://github.com/ConduitIO/conduit-connector-sdk/blob/1cbe778fabc8f903e075872560e6a91049d2e978/source_middleware.go#L42-L46). +In case you need to add additional middleware options, you can do so by passing +it to the `sdk.SourceWithMiddleware` function via +`sdk.DefaultSourceMiddleware(opts ...SourceMiddlewareOption)`. Currently, the +available source middleware options can be +found [here](https://github.com/ConduitIO/conduit-connector-sdk/blob/1cbe778fabc8f903e075872560e6a91049d2e978/source_middleware.go#L42-L46). :::note -If you're using a source connector that's not generating structured data (i.e. produces raw data), you might want to disable schema extraction by default by overwriting the `sdk.SourceWithSchemaExtractionConfig` options: + +If you're using a source connector that's not generating structured data (i.e. +produces raw data), you might want to disable schema extraction by default by +overwriting the `sdk.SourceWithSchemaExtractionConfig` options: ```go sdk.SourceWithMiddleware( - &Source{}, - sdk.DefaultSourceMiddleware( - // disable schema extraction by default, because the source produces raw data - sdk.SourceWithSchemaExtractionConfig{ - PayloadEnabled: lang.Ptr(false), - KeyEnabled: lang.Ptr(false), - }, - )..., - ) + &Source{}, + sdk.DefaultSourceMiddleware( + // disable schema extraction by default, because the source produces raw data + sdk.SourceWithSchemaExtractionConfig{ + PayloadEnabled: lang.Ptr(false), + KeyEnabled: lang.Ptr(false), + }, + )..., + ) ``` ::: ### `Parameters()` -A map of named Parameters that describe how to configure the connector. This map is typically generates using [`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen). +A map of named Parameters that describe how to configure the connector. This map +is typically generates using [ +`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen). ```go -func (s *Source) Parameters() map[string]sdk.Parameter { +func (s *Source) Parameters() config.Parameters { return s.config.Parameters() } ``` ### `Configure()` -Validates and stores configuration data for the connector. Any complex validation logic should be implemented here. +Validates and stores configuration data for the connector. Any complex +validation logic should be implemented here. - ```go - func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { - sdk.Logger(ctx).Info().Msg("Configuring Source...") - err := sdk.Util.ParseConfig(cfg, &s.config) - if err != nil { - return fmt.Errorf("invalid config: %w", err) - } - // add custom validations here - return nil - } - ``` +```go +func (s *Source) Configure(ctx context.Context, cfg config.Config) error { + err := sdk.Util.ParseConfig(ctx, cfg, &s.config, NewSource().Parameters()) + if err != nil { + return err + } + // add custom validations here + return nil +} +``` ### `Open()` -Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function. +Prepares the connector to start producing records based on the last known +successful position. If needed, the connector should open connections in this +function. - ```go - func (s *Source) Open(ctx context.Context, pos sdk.Position) error { - configDirPath := s.config.Directory - files, err := ioutil.ReadDir(configDirPath) - if err != nil { - return fmt.Errorf("error reading directory '%s': %w", configDirPath, err) - } +Every record read by a source connector has +a [position](https://conduit.io/docs/features/opencdc-record#fields) attached. +The position given to `Open()` is the position of the record that was the last +to be successfully processed end-to-end, before the connector stopped, or `nil` +if no records were read. Hence, a position needs to contain enough information +for a source connector to resume reading records from where it exactly stopped. - for _, f := range files { - sdk.Logger(ctx).Info().Msgf(" - %s\n", f.Name()) - } +A position is a slice of bytes that can represent any data structure. In Conduit +connectors, it's common to see that a position is actually a `struct`, that's +marshalled into a JSON string. In the example below, the position is an offset +within the file being read. - w, err := fsnotify.NewWatcher() +```go +func (s *Source) Open(ctx context.Context, p opencdc.Position) error { + // parse the position + var offset int64 + if p != nil { + var err error + offset, err = strconv.ParseInt(string(p), 10, 64) if err != nil { - return fmt.Errorf("error creating fsnotify watcher: %w", err) + return fmt.Errorf("invalid position %v, expected a number", p) } - s.watcher = w + } - s.createCooldown = 2 * time.Second - return s.watcher.Add(s.config.Directory) + // seek to the position, i.e. the offset + sdk.Logger(ctx).Info(). + Int64("position", offset). + Msgf("seeking...") + + t, err := tail.TailFile( + s.config.Path, + tail.Config{ + Follow: true, + Location: &tail.SeekInfo{ + Offset: offset, + Whence: io.SeekStart, + }, + Logger: tail.DiscardingLogger, + }, + ) + if err != nil { + return fmt.Errorf("could not tail file: %w", err) } - ``` + + s.tail = t + return nil +} +``` ### `Read()` -Gathers data from the configured data source and formats it into a `sdk.Record` that is returned from the function. The returned `sdk.Record` is queued into the pipeline to be consumed by a Destination connector. - - ```go - func (s *Source) Read(ctx context.Context) (sdk.Record, error) { - for { - select { - case event, ok := <-s.watcher.Events: - if !ok { - return sdk.Record{}, fmt.Errorf("events channel was closed") - } - - if event.Op&fsnotify.Create == fsnotify.Create { - sdk.Logger(ctx).Info().Msgf("Detected new file: %s", event.Name) - - // Mark the file as recently created to avoid processing if it is modified shortly after being created - s.markAsRecentlyCreated(event.Name) - - // Read the newly created file - fileContent, err := ioutil.ReadFile(event.Name) - if err != nil { - return sdk.Record{}, err - } - - recordKey := sdk.RawData(filepath.Base(event.Name)) - recordValue := sdk.RawData(fileContent) - - // Return a Record reflecting that a new file has been created - return sdk.Util.Source.NewRecordCreate( - sdk.Position(recordKey), - map[string]string{ - MetadataFilePath: event.Name, - }, - recordKey, - recordValue, - ), nil - } - - // If the event is not a Create event, continue listening without doing anything - continue - - case err, ok := <-s.watcher.Errors: - if !ok { - return sdk.Record{}, fmt.Errorf("errors channel was closed") - } - return sdk.Record{}, fmt.Errorf("error on watcher: %w", err) - - case <-ctx.Done(): - return sdk.Record{}, ctx.Err() - } +Gathers data from the configured data source and formats it into a +`opencdc.Record` that is returned from the function. The returned +`opencdc.Record` is queued into the pipeline to be consumed by a Destination +connector. + +```go +func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { + select { + case line, ok := <-s.tail.Lines: + if !ok { + return opencdc.Record{}, s.tail.Err() } + return sdk.Util.Source.NewRecordCreate( + opencdc.Position(strconv.FormatInt(line.SeekInfo.Offset, 10)), + map[string]string{ + MetadataFilePath: s.config.Path, + }, + opencdc.RawData(strconv.Itoa(line.Num)), // use line number as key + opencdc.RawData(line.Text), // use line content as payload + ), nil + case <-ctx.Done(): + return opencdc.Record{}, ctx.Err() } - ``` +} +``` ### `Ack()` -Ack signals to the implementation that the record with the supplied position was successfully processed. +`Ack` signals to the third party system that the record with the supplied +position was successfully processed. It's worth noting that while some source +connectors need to implement this functionality (e.g. in the case of messaging +brokers), others don't have to (e.g. a file source). - ```go - func (s *Source) Ack(ctx context.Context, position sdk.Position) error { - sdk.Logger(ctx).Debug().Msg("Record successfully processed") - return nil - } - ``` +```go +func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { + sdk.Logger(ctx).Trace().Msg("record successfully processed") + return nil // no ack needed +} +``` ### `Teardown()` -Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the `Open()` function should be closed here. - - ```go - func (s *Source) Teardown(ctx context.Context) error { +Teardown signals to the connector that there will be no more calls to any other +function. Any connections that were created in the `Open()` function should be +closed here. - if s.watcher != nil { - err := s.watcher.Close() - if err != nil { - // Log the error or handle it as needed - sdk.Logger(ctx).Error().Msgf("Failed to close fsnotify watcher: %v", err) - return fmt.Errorf("failed to close fsnotify watcher: %w", err) - } - } - return nil +```go +func (s *Source) Teardown(context.Context) error { + if s.tail != nil { + return s.tail.Stop() } - ``` + return nil +} +``` ![scarf pixel conduit-site-docs-connectors](https://static.scarf.sh/a.png?x-pxid=2fa824d7-fd94-4cf9-a5c8-ea63c9860213)