Skip to content

Commit

Permalink
Update guides for developing connectors (#138)
Browse files Browse the repository at this point in the history
* Update guides for developing connectors

* Update guides for developing connectors

* tabs

* Update docs/connectors/building-connectors/developing-source-connectors.mdx

Co-authored-by: Lovro Mažgon <[email protected]>

* merge

* format

---------

Co-authored-by: Lovro Mažgon <[email protected]>
  • Loading branch information
hariso and lovromazgon authored Sep 19, 2024
1 parent 474a42a commit a4584ff
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,41 @@ title: "Developing a Destination Connector"
sidebar_position: 4
---

A Destination is responsible for writing [Record](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#Record) to third party systems.
A Destination is responsible for writing
an [OpenCDC Record](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)
to third party systems.

You need to implement the functions required by Destination and provide your own implementations. Information about individual functions are listed below. The **`destination.go`** file is the main file where the functionality of your Destination Connector is implemented.
You need to implement the functions required by Destination and provide your own
implementations. Information about individual functions are listed below. The *
*`destination.go`** file is the main file where the functionality of your
Destination Connector is implemented.

## `Destination struct`

Every Destination implementation needs to include an [UnimplementedDestination](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedDestination) to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Destination implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector.
Every Destination implementation needs to include
an [UnimplementedDestination](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedDestination)
to satisfy the interface. This allows us to potentially change the interface in
the future while remaining backward compatible with existing Destination
implementations. This struct can be modified to add additional fields that can
be accessed throughout the lifecycle of the Connector.

```go
type Destination struct {
sdk.UnimplementedDestination
```go
type Destination struct {
sdk.UnimplementedDestination

config DestinationConfig
}
```
config DestinationConfig
}
```

## Destination Connector Lifecycle Functions

### `NewDestination()`

A constructor function for your Destination struct. Note that this is the same function that should be set as the value of `Connector.NewDestination`. The constructor should be used to wrap your Destination in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.
A constructor function for your Destination struct. Note that this is the same
function that should be set as the value of `Connector.NewDestination`. The
constructor should be used to wrap your Destination in the default middleware.
You can add additional middleware, but unless you have a very good reason, you
should always include the default middleware.

```go
func NewDestination() sdk.Destination {
Expand All @@ -37,102 +51,87 @@ func NewDestination() sdk.Destination {

**Additional options via `DestinationMiddlewareOption`**:

Currently, the available destination middleware options can be found [here](https://github.com/ConduitIO/conduit-connector-sdk/blob/1cbe778fabc8f903e075872560e6a91049d2e978/destination_middleware.go#L44-L50).
Currently, the available destination middleware options can be
found [here](https://github.com/ConduitIO/conduit-connector-sdk/blob/1cbe778fabc8f903e075872560e6a91049d2e978/destination_middleware.go#L44-L50).

### `Parameters()`

A map of named Parameters that describe how to configure the connector. This map is typically generated using [`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen).
A map of named Parameters that describe how to configure the connector. This map
is typically generated using [
`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen).

```go
func (d *Destination) Parameters() map[string]sdk.Parameter {
return d.config.Parameters()
}
```
```go
func (d *Destination) Parameters() config.Parameters {
return d.config.Parameters()
}
```

### `Configure()`

Validates and stores configuration data for the connector. Any complex validation logic should be implemented here.
Validates and stores configuration data for the connector. Any complex
validation logic should be implemented here.

```go
func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error {
err := sdk.Util.ParseConfig(cfg, &d.config)
if err != nil {
return fmt.Errorf("invalid config: %w", err)
}
// custom validations here
return nil
```go
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return err
}
```
// add custom validations here
return nil
}
```

### `Open()`

Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function.

```go
func (d *Destination) Open(ctx context.Context) error {
// Retrieve the directory path from the config
directoryPath := d.config.Directory

// Check if the directory exists
if _, err := os.Stat(directoryPath); os.IsNotExist(err) {
// Create the directory if it doesn't exist
err := os.MkdirAll(directoryPath, 0755)
if err != nil {
return fmt.Errorf("failed to create directory '%s': %w", directoryPath, err)
}
} else if err != nil {
// Return any error other than the directory not existing
return fmt.Errorf("error checking directory '%s': %w", directoryPath, err)
}
Prepares the connector to start writing records. If needed, the connector should
open connections in this function.

// The directory exists (or was just created), so we can proceed
return nil
```go
func (d *Destination) Open(context.Context) error {
// opens or creates a file at the given path
file, err := d.openOrCreate(d.config.Path)
if err != nil {
return err
}
```

### `Write()`

Writes len(records) from a slice of `sdk.Record` objects received from the Conduit pipeline to the destination right away without caching. It should return the number of records written from the slice and any error encountered that caused the write to stop early.
d.file = file
return nil
}
```

```go
func (d *Destination) Write(ctx context.Context, recs []sdk.Record) (int, error) {
outputDir := d.config.Directory
### `Write()`

for i, r := range recs {
fileName, ok := r.Key.(sdk.RawData)
if !ok || len(fileName) == 0 {
return i, fmt.Errorf("record key is invalid or not provided, record index: %v", i)
}
Writes `len(records)` from a slice of `opencdc.Record`s received from the
Conduit pipeline to the destination right away without caching. It should return
the number of records written from the slice and any error encountered that
caused the write to stop early.

filePath := filepath.Join(outputDir, string(fileName))
if err := d.writeToFile(filePath, r.Payload.After.Bytes()); err != nil {
return i, fmt.Errorf("failed to write record to file '%s', record index: %v, error: %w", filePath, i, err)
}
sdk.Logger(ctx).Info().Msgf("Wrote file %s to directory %s\n", string(fileName), outputDir)
```go
func (d *Destination) Write(_ context.Context, recs []opencdc.Record) (int, error) {
for i, r := range recs {
_, err := d.file.Write(append(r.Bytes(), '\n'))
if err != nil {
return i, err
}
return len(recs), nil
}
```

### `Ack()`

Ack signals to the implementation that the record with the supplied position was successfully processed.

```go
func (d *Destination) Ack(ctx context.Context, position sdk.Position) error {
sdk.Logger(ctx).Debug().Msg("Record successfully processed")
return nil
}
```
return len(recs), nil
}
```

### `Teardown()`

Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the `Open()` function should be closed here.
Teardown signals to the connector that there will be no more calls to any other
function. Any connections that were created in the `Open()` function should be
closed here.

```go
func (d *Destination) Teardown(ctx context.Context) error {
return nil
}
```
```go
func (d *Destination) Teardown(context.Context) error {
if d.file != nil {
return d.file.Close()
}
return nil
}
```

![scarf pixel conduit-site-docs-connectors](https://static.scarf.sh/a.png?x-pxid=2fa824d7-fd94-4cf9-a5c8-ea63c9860213)
Loading

0 comments on commit a4584ff

Please sign in to comment.