From 6b51ee3372ea1b4ecf4a39b9e42184231dd7f0b4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ra=C3=BAl=20Barroso?=
Date: Wed, 7 Aug 2024 17:54:48 +0100
Subject: [PATCH] feat: release 0.11.0 (#109)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* doc: update to 0.11.0
* fix: announcement bar dismissal
* document metadata fields
* add link for source
* combine metadata fields
* combine differently
* refactor building connector

  This is so that each function can be anchored and shows up in the menu on the
  right-hand side. I considered this more relevant than what was linked before:

  before: ![](https://d.pr/i/QtqZUO.png)
  after: ![](https://d.pr/i/3WKPvo.png)

* Update docs/features/opencdc-record.mdx
Co-authored-by: Maha Hajja <82542081+maha-hajja@users.noreply.github.com>
* Update docs/features/opencdc-record.mdx
Co-authored-by: Maha Hajja <82542081+maha-hajja@users.noreply.github.com>
* Update docs/features/opencdc-record.mdx
Co-authored-by: Maha Hajja <82542081+maha-hajja@users.noreply.github.com>
* Update docs/features/opencdc-record.mdx
Co-authored-by: Maha Hajja <82542081+maha-hajja@users.noreply.github.com>
* Include additional options to middleware
* update how it works docs with schema host functions (#111)
* update how it works docs with schema host functions
* Update docs/processors/standalone/how-it-works.mdx
Co-authored-by: Lovro Mažgon
* Update docs/processors/standalone/how-it-works.mdx
Co-authored-by: Lovro Mažgon
* Update docs/processors/standalone/how-it-works.mdx
Co-authored-by: Raúl Barroso
---------
Co-authored-by: Lovro Mažgon
Co-authored-by: Raúl Barroso
* Update processor specs and regenerate docs (#113)
* update processor specs and regenerate docs
* fix example name
* updates architecture
* includes schema service on architecture page
* updates architecture page
* generate processor specs (#115)
* adds new schema registry feature page
* tweaks
* updates
* more accurate
* address feedback
* add schema utility functions to building.mdx (#114)
* include github repo
* tweaks
* fix link
* regenerate processor docs with latest specs (#117)
* General info about schema support (#116)
* Add general info about schema support
* connectors and processors
* schema reg: include github repo
* fix link
* update architecture diagram
* Update docs/features/opencdc-record.mdx
Co-authored-by: Lovro Mažgon
* back to lowercase
* fix json encoding md
* Update docs/features/schema-support.mdx
Co-authored-by: Lovro Mažgon
* Update docs/features/schema-support.mdx
Co-authored-by: Lovro Mažgon
* Update docs/features/storage.mdx
Co-authored-by: Lovro Mažgon
* Update docs/features/schema-support.mdx
Co-authored-by: Lovro Mažgon
* Update docs/processors/standalone/building.mdx
Co-authored-by: Lovro Mažgon
* fix indentation and complete snippet
* Fix wording and link
* more accurate
* fix go code
* Update docs/connectors/building-connectors/developing-destination-connectors.mdx
Co-authored-by: Lovro Mažgon
* Update docs/connectors/building-connectors/developing-source-connectors.mdx
Co-authored-by: Lovro Mažgon
* Update docs/processors/standalone/building.mdx
Co-authored-by: Lovro Mažgon
* doc: configuration parameters (#119)
* fix link
* configuration parameters
* add batch size
* add schema extraction
* explain batching
* uses output format
---------
Co-authored-by: Lovro Mažgon
* Add info about supported data types (#118)
* Add info about supported data types
* add link
* move page
* Update docs/features/opencdc-record.mdx
* Update docs/features/opencdc-record.mdx
* Update docs/features/opencdc-record.mdx
---------
Co-authored-by: Lovro Mažgon
* updates
* update
* fix formatting
* update architecture page
* Add default values
* fix configuration links
* add example for schema extraction
* explain example
* capitalize
* Update docs/connectors/configuration-parameters/output-format.mdx
Co-authored-by: Maha Hajja <82542081+maha-hajja@users.noreply.github.com>
---------
Co-authored-by: Maha Hajja <82542081+maha-hajja@users.noreply.github.com>
Co-authored-by: Lovro Mažgon
Co-authored-by: Haris Osmanagić
---
 docs/connectors/_category_.json | 2 +-
 .../additional-built-in-plugins.mdx | 2 +-
 docs/connectors/behavior.mdx | 2 +-
 .../building-connectors/_category_.json | 2 +-
 .../building-connectors/conduit-sdk.mdx | 29 +--
 .../developing-destination-connectors.mdx | 57 +++--
 .../developing-source-connectors.mdx | 108 ++++++---
 .../configuration-parameters/_category_.json | 4 +
 .../configuration-parameters/batching.mdx | 124 +++++++++++
 .../configuration-parameters.mdx | 10 +
 .../output-format.mdx} | 11 +-
 .../schema-extraction.mdx | 92 ++++++++
 docs/connectors/getting-started.mdx | 2 +-
 docs/connectors/referencing.mdx | 2 +-
 docs/features/opencdc-record.mdx | 208 +++++++++++++++---
 docs/features/schema-support.mdx | 89 ++++++++
 docs/features/storage.mdx | 25 ++-
 docs/features/stream-inspector.mdx | 2 +-
 docs/features/web-ui.mdx | 2 +-
 docs/getting-started/architecture.mdx | 145 +++++++-----
 .../installing-and-running.mdx | 14 +-
 docs/introduction.mdx | 6 +-
 docs/processors/builtin/avro.decode.mdx | 107 +++------
 docs/processors/builtin/avro.encode.mdx | 112 +++------
 docs/processors/builtin/base64.decode.mdx | 48 ++++
 docs/processors/builtin/base64.encode.mdx | 48 ++++
 docs/processors/builtin/custom.javascript.mdx | 48 ++++
 docs/processors/builtin/error.mdx | 48 ++++
 docs/processors/builtin/field.convert.mdx | 117 +++++++++-
 docs/processors/builtin/field.exclude.mdx | 48 ++++
 docs/processors/builtin/field.rename.mdx | 48 ++++
 docs/processors/builtin/field.set.mdx | 48 ++++
 docs/processors/builtin/filter.mdx | 59 ++++-
 docs/processors/builtin/json.decode.mdx | 48 ++++
 docs/processors/builtin/json.encode.mdx | 48 ++++
 docs/processors/builtin/unwrap.debezium.mdx | 48 ++++
 .../builtin/unwrap.kafkaconnect.mdx | 48 ++++
 docs/processors/builtin/unwrap.opencdc.mdx | 48 ++++
 docs/processors/builtin/webhook.http.mdx | 48 ++++
 docs/processors/standalone/building.mdx | 121 ++++++++++
 docs/processors/standalone/how-it-works.mdx | 33 ++-
 docusaurus.config.ts | 8 +-
 src/processorgen/specs/avro.decode.json | 60 ++---
 src/processorgen/specs/avro.encode.json | 63 ++----
 src/processorgen/specs/base64.decode.json | 24 ++
 src/processorgen/specs/base64.encode.json | 24 ++
 src/processorgen/specs/custom.javascript.json | 24 ++
 src/processorgen/specs/error.json | 24 ++
 src/processorgen/specs/field.convert.json | 64 +++++-
 src/processorgen/specs/field.exclude.json | 24 ++
 src/processorgen/specs/field.rename.json | 24 ++
 src/processorgen/specs/field.set.json | 24 ++
 src/processorgen/specs/filter.json | 27 ++-
 src/processorgen/specs/json.decode.json | 24 ++
 src/processorgen/specs/json.encode.json | 24 ++
 src/processorgen/specs/unwrap.debezium.json | 24 ++
 .../specs/unwrap.kafkaconnect.json | 24 ++
 src/processorgen/specs/unwrap.opencdc.json | 24 ++
 src/processorgen/specs/webhook.http.json | 24 ++
 static/img/conduit/conduit-architecture.svg | 1 +
 static/img/conduit/conduit-diagram.svg | 1 -
 61 files changed, 2205 insertions(+), 442 deletions(-)
 create mode
100644 docs/connectors/configuration-parameters/_category_.json create mode 100644 docs/connectors/configuration-parameters/batching.mdx create mode 100644 docs/connectors/configuration-parameters/configuration-parameters.mdx rename docs/connectors/{output-formats.mdx => configuration-parameters/output-format.mdx} (95%) create mode 100644 docs/connectors/configuration-parameters/schema-extraction.mdx create mode 100644 docs/features/schema-support.mdx create mode 100644 static/img/conduit/conduit-architecture.svg delete mode 100644 static/img/conduit/conduit-diagram.svg diff --git a/docs/connectors/_category_.json b/docs/connectors/_category_.json index 655790dc..dd452d3f 100644 --- a/docs/connectors/_category_.json +++ b/docs/connectors/_category_.json @@ -1,4 +1,4 @@ { "label": "Connectors", - "position": 3 + "position": 2 } diff --git a/docs/connectors/additional-built-in-plugins.mdx b/docs/connectors/additional-built-in-plugins.mdx index 7ed5a2b5..0cc14f23 100644 --- a/docs/connectors/additional-built-in-plugins.mdx +++ b/docs/connectors/additional-built-in-plugins.mdx @@ -1,6 +1,6 @@ --- title: "Adding built-in Connectors" -sidebar_position: 2 +sidebar_position: 4 --- Built-in connectors offer better performance when compared to standalone ones, diff --git a/docs/connectors/behavior.mdx b/docs/connectors/behavior.mdx index f7e39d21..c4156c82 100644 --- a/docs/connectors/behavior.mdx +++ b/docs/connectors/behavior.mdx @@ -1,6 +1,6 @@ --- title: "Connector Behavior" -sidebar_position: 4 +sidebar_position: 5 --- This document provides insights on how Conduit communicates with a connector. diff --git a/docs/connectors/building-connectors/_category_.json b/docs/connectors/building-connectors/_category_.json index 3beab358..8802d1ff 100644 --- a/docs/connectors/building-connectors/_category_.json +++ b/docs/connectors/building-connectors/_category_.json @@ -1,5 +1,5 @@ { "label": "Building Connectors", - "position": 5, + "position": 4, "collapsed": true } diff --git a/docs/connectors/building-connectors/conduit-sdk.mdx b/docs/connectors/building-connectors/conduit-sdk.mdx index 5b493f46..6948fb7b 100644 --- a/docs/connectors/building-connectors/conduit-sdk.mdx +++ b/docs/connectors/building-connectors/conduit-sdk.mdx @@ -6,35 +6,14 @@ sidebar_position: 0 Conduit connectors can be built in any programming language that supports gRPC. To make it easier to write connectors we provide a [Connector SDK](https://github.com/ConduitIO/conduit-connector-sdk) written in -Go. Using the SDK is the recommended way of writing a Conduit connector. - -## Supported data types - -There are no limitations when it comes to data types a source connector can read -from a source. However, if a standalone source connector uses `record.StructuredData` -in its key or any part of the payload, then there are certain limitations in the -data types it can send to Conduit. - -The following data types are supported: - -* `bool` -* `int`, `int32`, `int64`, `uint`, `uint32`, `uint64` -* `float32`, `float64` -* `string` -* `[]byte` (stored as a string, base64-encoded) -* `map[string]interface{}` (a map of strings to any of the values that are supported) -* `[]interface{}` (a slice of any value that is supported) - -A notable limitation is timestamps, i.e. `time.Time` values are not supported. - -One way to support other values is to encode source data to a `[]byte` (e.g. using -a JSON encoding) and then store the value as `record.RawData`. - +Go. Using the SDK is the recommended way of writing a Conduit connector. 
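For orientation, this is roughly how an SDK-based connector is wired together in its entry point. This is a sketch following the connector template's layout, not verbatim from this page; `Specification`, `NewSource` and `NewDestination` stand for constructors the connector defines elsewhere:

```go
package main

import (
	sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
	// sdk.Serve starts the plugin and handles the gRPC communication with
	// Conduit; the connector only has to supply its constructors.
	sdk.Serve(sdk.Connector{
		NewSpecification: Specification,  // describes the connector and its parameters
		NewSource:        NewSource,      // see "Developing Source Connectors"
		NewDestination:   NewDestination, // see "Developing Destination Connectors"
	})
}
```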
If you
+are implementing a connector in a different language, you should refer to
+the [Connector Protocol](https://github.com/ConduitIO/conduit-connector-protocol).

## Conduit connector template

The easiest way to start implementing your own Conduit connector is by using the
-[Conduit connector template](https://github.com/ConduitIO/conduit-connector-template).
+[Conduit connector template](/docs/connectors/building-connectors/conduit-connector-template).
It contains the basic project structure as well as some additional utilities
like GitHub actions and a Makefile.

diff --git a/docs/connectors/building-connectors/developing-destination-connectors.mdx b/docs/connectors/building-connectors/developing-destination-connectors.mdx
index c70543fd..9c96e3ef 100644
--- a/docs/connectors/building-connectors/developing-destination-connectors.mdx
+++ b/docs/connectors/building-connectors/developing-destination-connectors.mdx
@@ -5,13 +5,11 @@ sidebar_position: 4

A Destination is responsible for writing [Record](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#Record) to third party systems.

-You need to implement the functions required by Destination and provide your own implementations. Information about individual functions are listed below.
+You need to implement the functions required by Destination and provide your own implementations. Information about individual functions is listed below. The **`destination.go`** file is the main file where the functionality of your Destination Connector is implemented.

-## `destination.go`
+## `Destination struct`

-This file provides the main functionality of your Destination Connector.
-
-- **`Destination`** Struct: Every Destination implementation needs to include an [UnimplementedDestination](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedDestination) to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Destination implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector.
+Every Destination implementation needs to include an [UnimplementedDestination](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedDestination) to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Destination implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector.

```go
type Destination struct {
@@ -21,16 +19,29 @@ This file provides the main functionality of your Destination Connector.
}
```

-- **`NewDestination()`**: A constructor function for your Destination struct. Note that this is the same function that should be set as the value of `Connector.NewDestination`. The constructor should be used to wrap your Destination in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.
+## Destination Connector Lifecycle Functions

-  ```go
-  func NewDestination() sdk.Destination {
-    // Create Destination and wrap it in the default middleware.
-    return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
-  }
-  ```
+### `NewDestination()`
+
+A constructor function for your Destination struct.
Note that this is the same function that should be set as the value of `Connector.NewDestination`. The constructor should be used to wrap your Destination in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware. + +```go +func NewDestination() sdk.Destination { + // Create Destination and wrap it in the default middleware. + return sdk.DestinationWithMiddleware( + &Destination{}, + sdk.DefaultDestinationMiddleware()..., + ) +} +``` + +**Additional options via `DestinationMiddlewareOption`**: -- **`Parameters()`**: A map of named Parameters that describe how to configure the connector. This map is typically generated using [`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen). +Currently, the available destination middleware options can be found [here](https://github.com/ConduitIO/conduit-connector-sdk/blob/1cbe778fabc8f903e075872560e6a91049d2e978/destination_middleware.go#L44-L50). + +### `Parameters()` + +A map of named Parameters that describe how to configure the connector. This map is typically generated using [`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen). ```go func (d *Destination) Parameters() map[string]sdk.Parameter { @@ -38,7 +49,9 @@ This file provides the main functionality of your Destination Connector. } ``` -- **`Configure()`**: Validates and stores configuration data for the connector. Any complex validation logic should be implemented here. +### `Configure()` + +Validates and stores configuration data for the connector. Any complex validation logic should be implemented here. ```go func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error { @@ -51,7 +64,9 @@ This file provides the main functionality of your Destination Connector. } ``` -- **`Open()`**: Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function. +### `Open()` + +Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function. ```go func (d *Destination) Open(ctx context.Context) error { @@ -75,7 +90,9 @@ This file provides the main functionality of your Destination Connector. } ``` -- **`Write()`**: Writes len(records) from a slice of `sdk.Record` objects received from the Conduit pipeline to the destination right away without caching. It should return the number of records written from the slice and any error encountered that caused the write to stop early. +### `Write()` + +Writes len(records) from a slice of `sdk.Record` objects received from the Conduit pipeline to the destination right away without caching. It should return the number of records written from the slice and any error encountered that caused the write to stop early. ```go func (d *Destination) Write(ctx context.Context, recs []sdk.Record) (int, error) { @@ -97,7 +114,9 @@ This file provides the main functionality of your Destination Connector. } ``` -- **`Ack()`**: Ack signals to the implementation that the record with the supplied position was successfully processed. +### `Ack()` + +Ack signals to the implementation that the record with the supplied position was successfully processed. ```go func (d *Destination) Ack(ctx context.Context, position sdk.Position) error { @@ -106,7 +125,9 @@ This file provides the main functionality of your Destination Connector. 
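	// Ack signals that the record at the supplied position was successfully
	// processed; a destination can use this hook to clean up any
	// per-position bookkeeping it keeps.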
}
```

-- **`Teardown()`**: Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the `Open()` function should be closed here.
+### `Teardown()`
+
+Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the `Open()` function should be closed here.

```go
func (d *Destination) Teardown(ctx context.Context) error {

diff --git a/docs/connectors/building-connectors/developing-source-connectors.mdx b/docs/connectors/building-connectors/developing-source-connectors.mdx
index f69e4d78..868f19f6 100644
--- a/docs/connectors/building-connectors/developing-source-connectors.mdx
+++ b/docs/connectors/building-connectors/developing-source-connectors.mdx
@@ -5,15 +5,11 @@ sidebar_position: 3

A Source is responsible for continuously reading data from a third party system and returning it in the form of an [SDK Record](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#Record).

-You need to implement the functions required by the Source interface and provide your own implementations. Information about individual functions are listed below.
+You need to implement the functions required by the Source interface and provide your own implementations. Information about individual functions is listed below. The **`source.go`** file is the main file where the functionality of your Source Connector is implemented.

-## `source.go`
-
-This file provides the main functionality of your Source Connector.
-
-### Source Connector Lifecycle Functions
-
-- **`Source`** Struct: Every Source implementation needs to include an [UnimplementedSource](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedSource) to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Source implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector
+## `Source struct`
+
+Every Source implementation needs to include an [UnimplementedSource](https://pkg.go.dev/github.com/conduitio/conduit-connector-sdk#UnimplementedSource) to satisfy the interface. This allows us to potentially change the interface in the future while remaining backward compatible with existing Source implementations. This struct can be modified to add additional fields that can be accessed throughout the lifecycle of the Connector.

```go
type Source struct {
@@ -27,38 +23,72 @@ This file provides the main functionality of your Source Connector.
}
```

-- **`NewSource()`**: A constructor function for your Source struct. Note that this is the same function that should be set as the value of `Connector.NewSource`. The constructor should be used to wrap your Source in the default middleware. You can add additional middleware, but unless you have a very good reason, you should always include the default middleware.
+## Source Connector Lifecycle Functions

-  ```go
-  func NewSource() sdk.Source {
-    // Create Source and wrap it in the default middleware.
-    return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...)
-  }
-  ```
+### `NewSource()`
+
+A constructor function for your Source struct.
Note that this is the same function that should be set as the value of `Connector.NewSource`. The constructor should be used to wrap your Source in the default middleware (`DefaultSourceMiddleware`).

-  ```go
-  func (s *Source) Parameters() map[string]sdk.Parameter {
-    return s.config.Parameters()
-  }
-  ```
+```go
+func NewSource() sdk.Source {
+	// Create Source and wrap it in the default middleware.
+	return sdk.SourceWithMiddleware(
+		&Source{},
+		sdk.DefaultSourceMiddleware()...,
+	)
+}
+```

-- **`Configure()`**: Validates and stores configuration data for the connector. Any complex validation logic should be implemented here.
+**Additional options via `SourceMiddlewareOption`**:

-  ```go
-  func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
-    sdk.Logger(ctx).Info().Msg("Configuring Source...")
-    err := sdk.Util.ParseConfig(cfg, &s.config)
-    if err != nil {
-      return fmt.Errorf("invalid config: %w", err)
-    }
-    // add custom validations here
-    return nil
-  }
-  ```
+In case you need to add additional middleware options, you can do so by passing them to the `sdk.SourceWithMiddleware` function via `sdk.DefaultSourceMiddleware(opts ...SourceMiddlewareOption)`. Currently, the available source middleware options can be found [here](https://github.com/ConduitIO/conduit-connector-sdk/blob/1cbe778fabc8f903e075872560e6a91049d2e978/source_middleware.go#L42-L46).

-- **`Open()`**: Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function.
+:::note
+If you're using a source connector that's not generating structured data (i.e. produces raw data), you might want to disable schema extraction by default by overriding the `sdk.SourceWithSchemaExtractionConfig` options:
+
+```go
+sdk.SourceWithMiddleware(
+	&Source{},
+	sdk.DefaultSourceMiddleware(
+		// disable schema extraction by default, because the source produces raw data
+		sdk.SourceWithSchemaExtractionConfig{
+			PayloadEnabled: lang.Ptr(false),
+			KeyEnabled:     lang.Ptr(false),
+		},
+	)...,
+)
+```
+:::
+
+### `Parameters()`
+
+A map of named Parameters that describe how to configure the connector. This map is typically generated using [`paramgen`](https://github.com/ConduitIO/conduit-commons/tree/main/paramgen).
+
+```go
+func (s *Source) Parameters() map[string]sdk.Parameter {
+	return s.config.Parameters()
+}
+```
+
+### `Configure()`
+
+Validates and stores configuration data for the connector. Any complex validation logic should be implemented here.
+
+```go
+func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
+	sdk.Logger(ctx).Info().Msg("Configuring Source...")
+	err := sdk.Util.ParseConfig(cfg, &s.config)
+	if err != nil {
+		return fmt.Errorf("invalid config: %w", err)
+	}
+	// add custom validations here
+	return nil
+}
+```
+
+### `Open()`
+
+Prepares the connector to start producing records based on the last known successful position. If needed, the connector should open connections in this function.

```go
func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
@@ -83,7 +113,9 @@ This file provides the main functionality of your Source Connector.
}
```

-- **`Read()`**: Gathers data from the configured data source and formats it into a `sdk.Record` that is returned from the function. The returned `sdk.Record` is queued into the pipeline to be consumed by a Destination connector.
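Since `Open()` receives the last known `sdk.Position` as an opaque byte slice whose format the connector owns, a common pattern is to serialize the position as JSON and parse it when resuming. The following is a sketch under that assumption; the `Position` struct and its field are illustrative, not part of the SDK:

```go
import (
	"encoding/json"
	"fmt"

	sdk "github.com/conduitio/conduit-connector-sdk"
)

// Position is the connector's own bookkeeping for where reading stopped.
// Its layout is entirely up to the connector; JSON is just a convenient encoding.
type Position struct {
	LastID int64 `json:"last_id"`
}

// parsePosition restores the connector's position from the raw bytes handed
// to Open(). A nil position means there is nothing to resume from.
func parsePosition(pos sdk.Position) (Position, error) {
	var p Position
	if pos == nil {
		return p, nil // start from the beginning
	}
	if err := json.Unmarshal(pos, &p); err != nil {
		return p, fmt.Errorf("invalid position: %w", err)
	}
	return p, nil
}
```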
+### `Read()`
+
+Gathers data from the configured data source and formats it into a `sdk.Record` that is returned from the function. The returned `sdk.Record` is queued into the pipeline to be consumed by a Destination connector.

```go
func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
@@ -136,7 +168,9 @@ This file provides the main functionality of your Source Connector.
}
```

-- **`Ack()`**: Ack signals to the implementation that the record with the supplied position was successfully processed.
+### `Ack()`
+
+Ack signals to the implementation that the record with the supplied position was successfully processed.

```go
func (s *Source) Ack(ctx context.Context, position sdk.Position) error {
@@ -145,7 +179,9 @@ This file provides the main functionality of your Source Connector.
}
```

-- **`Teardown()`**: Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the `Open()` function should be closed here.
+### `Teardown()`
+
+Teardown signals to the connector that there will be no more calls to any other function. Any connections that were created in the `Open()` function should be closed here.

```go
func (s *Source) Teardown(ctx context.Context) error {

diff --git a/docs/connectors/configuration-parameters/_category_.json b/docs/connectors/configuration-parameters/_category_.json
new file mode 100644
index 00000000..74e4493a
--- /dev/null
+++ b/docs/connectors/configuration-parameters/_category_.json
@@ -0,0 +1,4 @@
+{
+  "label": "Configuration parameters",
+  "position": 3
+}
diff --git a/docs/connectors/configuration-parameters/batching.mdx b/docs/connectors/configuration-parameters/batching.mdx
new file mode 100644
index 00000000..00a42e1b
--- /dev/null
+++ b/docs/connectors/configuration-parameters/batching.mdx
@@ -0,0 +1,124 @@
+---
+title: "Batching"
+sidebar_position: 0
+---
+
+Destination connectors can be configured to process records in batches. This is especially useful when the destination
+resource can receive multiple records in a single round-trip. By default, Conduit processes records one by one.
+
+Enabling batch processing can improve the performance of the connector, as it reduces the number of round-trips to the
+destination resource. However, it can also increase the memory usage of the connector, as it needs to store multiple records
+in memory before flushing the batch. It can also increase the latency of the connector, as it needs to wait for the batch
+to be full.
+
+## Configuration parameters
+
+There are two connector configuration parameters that control batching:
+
+* `sdk.batch.size`: used to configure the number of records to be sent in a single batch. Default value is `1`.
+* `sdk.batch.delay`: used to configure the maximum time to wait for a batch to be full before sending it to the destination resource. Default value is `0`.
+
+## Examples
+
+### Example 1: Batch size
+
+The following pipeline is configured to process batches of 100 records when writing to the destination resource. Note that
+the source connector is generating records at a rate of 10 records per second, meaning that records will be flushed
+approximately every 10 seconds.
+
+```yaml
+version: 2.2
+
+pipelines:
+  - id: pipeline1
+    status: running
+    name: pipeline1
+    description: 'A pipeline batching 100 records at a time.'
+    connectors:
+      - id: source1
+        type: source
+        plugin: builtin:generator
+        name: source1
+        settings:
+          rate: 10
+          operations: "create"
+          format.type: "structured"
+          format.options.name: "string"
+          format.options.company: "string"
+      - id: destination1
+        type: destination
+        plugin: "builtin:file"
+        name: destination1
+        settings:
+          sdk.batch.size: 100
+          path: /tmp/file-destination.txt
+```
+
+### Example 2: Batch delay
+
+The following pipeline is configured to collect records for 5 seconds before flushing the batch to the destination resource.
+Note that the source connector is generating records at a rate of 10 records per second, meaning that a batch will
+contain approximately 50 records.
+
+```yaml
+version: 2.2
+
+pipelines:
+  - id: pipeline1
+    status: running
+    name: pipeline1
+    description: 'A pipeline flushing a batch of records every 5 seconds.'
+    connectors:
+      - id: source1
+        type: source
+        plugin: builtin:generator
+        name: source1
+        settings:
+          rate: 10
+          operations: "create"
+          format.type: "structured"
+          format.options.name: "string"
+          format.options.company: "string"
+      - id: destination1
+        type: destination
+        plugin: "builtin:file"
+        name: destination1
+        settings:
+          sdk.batch.delay: "5s"
+          path: /tmp/file-destination.txt
+```
+
+### Example 3: Batch size and delay
+
+The following pipeline is configured to collect batches of 100 records for up to 5 seconds before flushing them to the
+destination resource. This means that records will be flushed after at most 5 seconds, or sooner if the batch collects
+100 records.
+
+```yaml
+version: 2.2
+
+pipelines:
+  - id: pipeline1
+    status: running
+    name: pipeline1
+    description: 'A pipeline batching up to 100 records, flushed at least every 5 seconds.'
+    connectors:
+      - id: source1
+        type: source
+        plugin: builtin:generator
+        name: source1
+        settings:
+          rate: 10
+          operations: "create"
+          format.type: "structured"
+          format.options.name: "string"
+          format.options.company: "string"
+      - id: destination1
+        type: destination
+        plugin: "builtin:file"
+        name: destination1
+        settings:
+          sdk.batch.size: 100
+          sdk.batch.delay: "5s"
+          path: /tmp/file-destination.txt
+```
diff --git a/docs/connectors/configuration-parameters/configuration-parameters.mdx b/docs/connectors/configuration-parameters/configuration-parameters.mdx
new file mode 100644
index 00000000..7131119f
--- /dev/null
+++ b/docs/connectors/configuration-parameters/configuration-parameters.mdx
@@ -0,0 +1,10 @@
+---
+title: 'Configuration Parameters'
+sidebar_position: 0
+---
+
+import DocCardList from '@theme/DocCardList';
+
+When configuring a connector, you can use the following parameters:
+
+<DocCardList />
\ No newline at end of file
diff --git a/docs/connectors/output-formats.mdx b/docs/connectors/configuration-parameters/output-format.mdx
similarity index 95%
rename from docs/connectors/output-formats.mdx
rename to docs/connectors/configuration-parameters/output-format.mdx
index 02a7023a..67092e78 100644
--- a/docs/connectors/output-formats.mdx
+++ b/docs/connectors/configuration-parameters/output-format.mdx
@@ -1,13 +1,12 @@
---
-title: "Output Formats"
-sidebar_position: 5
+title: "Output Format"
+sidebar_position: 1
---

One of the challenges to be solved when integrating Conduit with other systems,
such as Kafka Connect and Debezium, is the data format. This is present in
situations where raw data needs to be written, for example when writing messages to
-Kafka. By default, Conduit uses
-the [OpenCDC format](/docs/features/opencdc-record).
Conduit -also makes it possible to change the output format so that the data can be consumed by other systems. +Kafka. By default, Conduit uses the [OpenCDC format](/docs/features/opencdc-record). Conduit also makes it possible to +change the output format so that the data can be consumed by other systems. :::note @@ -80,7 +79,6 @@ that the file destination outputs the records in the Debezium format. Additional be `employee` by setting the format options to `debezium.schema.name=employee`. ```yaml ---- version: 2.2 pipelines: @@ -144,7 +142,6 @@ a `name` field and a `company` field. In this pipeline, we use the `template` re `id` field and then prefix it with `employee ID`. ```yaml ---- version: 2.2 pipelines: diff --git a/docs/connectors/configuration-parameters/schema-extraction.mdx b/docs/connectors/configuration-parameters/schema-extraction.mdx new file mode 100644 index 00000000..7a92c69e --- /dev/null +++ b/docs/connectors/configuration-parameters/schema-extraction.mdx @@ -0,0 +1,92 @@ +--- +title: "Schema Extraction" +sidebar_position: 2 +--- + +Source and destination connectors can be configured to automatically extract the +schema from the key and payload of a record. This is especially useful when the +data is structured and the schema is known in advance. By default, Conduit +extracts the schema from the key and the payload of a record and encodes them +using the extracted schema. + +## Configuration parameters + +These are the configuration parameters that control schema extraction on a +source connector (Note that `sdk.schema.extract.payload.enabled` +and `sdk.schema.extract.key.enabled` are also available on destination +connectors): + +* `sdk.schema.extract.type`: The type of schema extraction to perform. Supported + value is `avro`. +* `sdk.schema.extract.payload.enabled`: A boolean value that indicates whether + the payload should be extracted. +* `sdk.schema.extract.payload.subject`: The subject of the payload schema. +* `sdk.schema.extract.key.enabled`: A boolean value that indicates whether the + key should be extracted. +* `sdk.schema.extract.key.subject`: The subject of the key schema. + +## Example + +The below pipeline will generate a single record and write it to a file. Notice +that it's configured so that the generator source does not extract the schema or +encode the data. + +```yaml +version: "2.2" +pipelines: + - id: generator-to-file + status: running + name: generator-to-file + description: Generates a single record, no schema generated, writes to file + connectors: + - id: file-src + type: source + plugin: builtin:generator + name: file-src + settings: + recordCount: "1" + collections.users.format.type: structured + collections.users.format.options.id: int + collections.users.format.options.name: string + + sdk.schema.extract.payload.enabled: false + sdk.schema.extract.key.enabled: false + + - id: file-dest + type: destination + plugin: builtin:file + name: file-dest + settings: + path: /tmp/file-destination.txt +``` + +When the pipeline is run, `/tmp/file-destination.txt` will contain output similar to this: + +```json +{ + "position": "MQ==", + "operation": "create", + "metadata": { + "conduit.source.connector.id": "generator-to-file:file-src", + "opencdc.collection": "users", + "opencdc.createdAt": "1723046776830339829" + }, + "key": "c2F1cm9wc2lkYW4=", + "payload": { + "before": null, + "after": { + "id": 7819649577989235000, + "name": "Iambe" + } + } +} +``` + +Notice that the written record doesn't contain any schema information in its +metadata. 
However, if you leave the schema extraction enabled, then you'll see
+something like the following in the record's metadata:
+
+```
+"opencdc.payload.schema.subject": "generator-to-file:file-src:users.payload",
+"opencdc.payload.schema.version": "1"
+```
diff --git a/docs/connectors/getting-started.mdx b/docs/connectors/getting-started.mdx
index cdf58893..81812aaa 100644
--- a/docs/connectors/getting-started.mdx
+++ b/docs/connectors/getting-started.mdx
@@ -4,7 +4,7 @@ sidebar_position: 0
sidebar_label: "Getting Started"
---

-A connector knows how to read/write records from/to a data source/destination
+A Conduit Connector knows how to read/write records from/to a data source/destination
(e.g. a database).

When thinking about connectors for Conduit, our goals were to:
diff --git a/docs/connectors/referencing.mdx b/docs/connectors/referencing.mdx
index a07bdc22..fa116510 100644
--- a/docs/connectors/referencing.mdx
+++ b/docs/connectors/referencing.mdx
@@ -1,6 +1,6 @@
---
title: "Referencing Connectors"
-sidebar_position: 3
+sidebar_position: 2
---

The name, used to reference a connector plugin in API requests or a pipeline
diff --git a/docs/features/opencdc-record.mdx b/docs/features/opencdc-record.mdx
index 7c62d0cc..5d9d6a37 100644
--- a/docs/features/opencdc-record.mdx
+++ b/docs/features/opencdc-record.mdx
@@ -3,32 +3,56 @@ title: 'OpenCDC record'
sidebar_position: 4
---

-An OpenCDC record in Conduit aims to standardize the format of data records exchanged between different connectors within a data processing pipeline. The primary objective is to ensure compatibility between various combinations of source and destination connectors.
+An OpenCDC record in Conduit aims to standardize the format of data records
+exchanged between different connectors within a data processing pipeline. The
+primary objective is to ensure compatibility between various combinations of
+source and destination connectors.

## Benefits

-1. **Support for Operations**: The format should support representing records for `create`, `update`, `delete`, and `snapshot` operations.
-2. **Standard Metadata Fields**: Definition of standard metadata fields to provide essential information about each record. These can vary depending on the record. See [Metadata Fields](#metadata-fields) for more information.
-3. **Integration with Data Tools**: We believe that being strict about the record format Conduit consumes and produces will make it easier to integrate with other data processing tools.
+1. **Support for Operations**: The format should support representing records
+   for `create`, `update`, `delete`, and `snapshot` operations.
+2. **Standard Metadata Fields**: Definition of standard metadata fields to
+   provide essential information about each record. These can vary depending on
+   the record. See [Metadata Fields](#metadata-fields) for more information.
+3. **Integration with Data Tools**: We believe that being strict about the
+   record format Conduit consumes and produces will make it easier to integrate
+   with other data processing tools.

## Fields

-* `.Position` uniquely represents the position of record. This is used to track the position of a record in a source connector, enabling Conduit to resume a stopped pipeline.
-* `.Operation` defines what triggered the creation of a record. There are four possibilities: `create`, `update`, `delete` or `snapshot`. The first three operations are encountered during normal CDC operation, while `snapshot` is meant to represent records during an initial load. Depending on the operation, the record will contain either the payload before the change, after the change, both or none (see fields `.Payload.Before` and `.Payload.After`).
+* `.Position` uniquely represents the position of a record. This is used to track
+  the position of a record in a source connector, enabling Conduit to resume a
+  stopped pipeline.
+* `.Operation` defines what triggered the creation of a record. There are four
+  possibilities: `create`, `update`, `delete` or `snapshot`. The first three
+  operations are encountered during normal CDC operation, while `snapshot` is
+  meant to represent records during an initial load. Depending on the operation,
+  the record will contain either the payload before the change, after the
+  change, both or none (see fields `.Payload.Before` and `.Payload.After`).
* `.Key` represents a value that should identify the entity (e.g. database row).
* `.Metadata` contains additional information regarding the record.
-* `.Payload.Before` holds the payload before the operation ocurred. These could be present in operations such as `update` and `delete`.
-* `.Payload.After` holds the payload after the operation ocurred. These could be present in operations such as `create`, `snapshot` or `update`.
+* `.Payload.Before` holds the payload before the operation occurred. These could
+  be present in operations such as `update` and `delete`.
+* `.Payload.After` holds the payload after the operation occurred. These could be
+  present in operations such as `create`, `snapshot` or `update`.

:::note
-We're indicating `.Position`, and not `.position` as defined in its [`Record` message](https://buf.build/conduitio/conduit-commons/docs/main:opencdc.v1#opencdc.v1.Record), to show its [Go template](https://pkg.go.dev/text/template) notation as used by the [Go representation of an OpenCDC record](https://github.com/ConduitIO/conduit-commons/blob/main/opencdc/record.go#L32). This field is public and must start with an uppercase letter.
+We're indicating `.Position`, and not `.position` as defined in
+its [`Record` message](https://buf.build/conduitio/conduit-commons/docs/main:opencdc.v1#opencdc.v1.Record),
+to show its [Go template](https://pkg.go.dev/text/template) notation as used by
+the [Go representation of an OpenCDC record](https://github.com/ConduitIO/conduit-commons/blob/main/opencdc/record.go#L32).
+This field is public and must start with an uppercase letter.
:::

## Representation

-Conduit relies on [Protocol Buffers (protobuf)](https://protobuf.dev/) when it comes to defining an OpenCDC record to benefit from the several advantages that it provides. Its definition can be found in the [Buf Schema Registry](https://buf.build/conduitio/conduit-commons/docs/main:opencdc.v1).
+Conduit uses [Protocol Buffers (protobuf)](https://protobuf.dev/) to define an
+OpenCDC record. Its definition can be found in
+the [Buf Schema Registry](https://buf.build/conduitio/conduit-commons/docs/main:opencdc.v1).

-When processing records in Conduit, you can always expect a similar structure to the following:
+When processing records in Conduit, you can always expect a similar structure to
+the following:

```json
{
@@ -56,9 +80,80 @@ When processing records in Conduit, you can always expect a similar structure to
```

:::note
-`.Position`, `.Key`, and `.Payload.Before` are represented as `Base64` encoded in the example above because these will be a byte slice when represented as JSON.
+`.Position`, `.Key`, and `.Payload.Before` are represented as `Base64` encoded
+in the example above because these will be a byte slice when represented as
+JSON.
:::

+## Data types
+
+There are no limitations when it comes to data types a source connector can read
+from a source. However, certain type limitations might apply depending on how the data is
+moved internally in Conduit, from source connector to Conduit and from Conduit
+to destination connectors.
+
+The key and the payload data can be represented in two ways: raw and structured.
+
+## Raw data
+
+Raw data in Conduit is an array of bytes that can represent the source data in
+its original form (such as a file) or encoded in a certain way (for example, a
+JSON representation of a database row or an Avro encoded structure). It's
+represented by the `opencdc.RawData` Go type. For example, when writing a
+connector or a processor in Go, you can construct raw data like this:
+
+```go
+opencdc.RawData([]byte{1, 3, 5})
+```
+
+## Structured data
+
+Structured data in Conduit is a map in which the keys are field names and values
+are field values. An example of that is the below record's `.Payload.After`
+field:
+
+```json
+{
+  // other record fields
+  "payload": {
+    "before": "eWVsbG93",
+    "after": {
+      "bool_field": true,
+      "float_field": 1.2,
+      "int_field": 1,
+      "string_field": "orange"
+    }
+  }
+}
+```
+
+When writing a connector or a processor in Go, it's represented by
+the `opencdc.StructuredData` type.
+
+The supported data types for values in `opencdc.StructuredData` depend on the following:
+
+- connector or processor type (built-in or standalone)
+- [schema support](/docs/features/schema-support) (enabled or disabled).
+
+In built-in connectors, the field values can be of any Go type, given that
+there's no (de)serialization involved.
+
+In standalone connectors with schema middleware enabled (which is the default),
+any type that is supported by the [Apache Avro™ format](https://avro.apache.org)
+is also supported by Conduit.
+
+If the schema middleware is disabled in a connector, then the supported types
+are limited to what Protobuf allows as
+a [value](https://protobuf.dev/reference/protobuf/google.protobuf/#value). That
+translates to the following Go types:
+
+* `bool`
+* `int`, `int32`, `int64`, `uint`, `uint32`, `uint64`
+* `float32`, `float64`
+* `string`
+* `[]byte` (stored as a string, base64-encoded)
+* `map[string]interface{}` (a map of strings to any of the values that are supported)
+* `[]interface{}` (a slice of any value that is supported)
+
+A notable limitation is timestamps, i.e. `time.Time` values are not supported.
+
## Metadata fields

As part of an OpenCDC record, there will be a set of fields provided that will
@@ -70,12 +165,29 @@ expand its functionality.

Notice that all these fields use a dot notation
syntax to indicate what they refer to, preventing accidental clashes. Here are
the ones you can find:

+### `opencdc.version`
+
+Contains the version of the OpenCDC format (e.g., "v1"). This field exists to
+ensure the OpenCDC format version can be easily identified in case the record
+gets marshaled into a different untyped format (e.g. JSON).
+
+```json
+{
+  // other record fields
+  "metadata": {
+    "opencdc.version": "v1",
+    // rest of metadata
+  },
+  // other record fields
+}
+```
+
### `opencdc.createdAt`

Contains the time when the record was created in the 3rd party system. The
expected format is a Unix timestamp in nanoseconds.
-```json5
+```json
{
  // other record fields
  "metadata": {
    "opencdc.createdAt": "1663616434169110000",
    // rest of metadata
  },
  // other record fields
}
```

@@ -91,7 +203,7 @@ Contains the time when the record was read from the 3rd party system. The
expected format is a Unix timestamp in nanoseconds.

-```json5
+```json
{
  // other record fields
  "metadata": {
    "opencdc.readAt": "1663616434169110000",
    // rest of metadata
  },
  // other record fields
}
```

@@ -102,49 +214,77 @@ expected format is a Unix timestamp in nanoseconds.
}
```

-### `opencdc.version`
+### `opencdc.collection`

-Contains the version of the OpenCDC format (e.g., "v1"). This field exists to
-ensure the OpenCDC format version can be easily identified in case the record
-gets marshaled into a different untyped format (e.g. JSON).
+Contains the name of the collection from which the record originated and/or
+where it should be written to.

-```json5
+:::note
+It's up to the connector to populate this field. In other words, not all records
+may have this field.
+:::
+
+```json
{
  // other record fields
  "metadata": {
-    "opencdc.version": "v1",
+    "opencdc.collection": "employees",
    // rest of metadata
  },
  // other record fields
}
```

-### `opencdc.collection`
+### `opencdc.key.schema.*`

-Contains the name of the collection from which the record originated and/or
-where it should be written to.
+#### `opencdc.key.schema.subject`, `opencdc.key.schema.version`
+
+Contains the subject and version of the schema for the record's `.Key` field.

:::note
-It's up to the connector to populate this field. In other words, not all records
-may have this field.
+This field will only be populated when using structured data.
:::

-```json5
+```json
{
  // other record fields
  "metadata": {
-    "opencdc.collection": "employees",
+    "opencdc.key.schema.subject": "employees.key.v1",
+    "opencdc.key.schema.version": "1",
    // rest of metadata
  },
  // other record fields
}
```
+
+### `opencdc.payload.schema.*`
+
+#### `opencdc.payload.schema.subject`, `opencdc.payload.schema.version`
+
+Contains the subject and version of the schema for the record's `.Payload.After` and `.Payload.Before` fields.
+
+:::note
+This field will only be populated when using structured data.
+:::
+
+```json
+{
+  // other record fields
+  "metadata": {
+    "opencdc.payload.schema.subject": "connector-id:collection.payload",
+    "opencdc.payload.schema.version": "1",
+    // rest of metadata
+  },
+  // other record fields
+}
+```
+
### `conduit.source.plugin.name`

The name of the source plugin that created the record.

-```json5
+```json
{
  // other record fields
  "metadata": {
@@ -160,7 +300,7 @@ The name of the source plugin that created the record.

The version of the source plugin that created the record.

-```json5
+```json
{
  // other record fields
  "metadata": {
@@ -175,7 +315,7 @@ The version of the source plugin that created the record.

`conduit.source.connector.id` is the ID of the source connector that received the record.

-```json5
+```json
{
  // other record fields
  "metadata": {
@@ -189,7 +329,7 @@

The name of the destination plugin that has written the record.

-```json5
+```json
{
  // other record fields
  "metadata": {
@@ -205,7 +345,7 @@ The name of the destination plugin that has written the record.

The version of the destination plugin that has written the record.

-```json5
+```json
{
  // other record fields
  "metadata": {
@@ -217,7 +357,7 @@ The version of the destination plugin that has written the record.
}
```

### `conduit.dlq.nack.error`
-Contains the error that caused a record to be nacked and pushed to the [dead-letter queue (DLQ)](/dead-letter-queue).
+Contains the error that caused a record to be nacked and pushed to the [dead-letter queue (DLQ)](/docs/features/dead-letter-queue).

### `conduit.dlq.nack.node.id`

The ID of the internal node that nacked the record.
@@ -233,7 +373,7 @@ Taking the same [previous record example](#representation), you'll notice there
is a metadata key named `file.path`, which would indicate this field was added
by a `file` plugin.

-```json5
+```json
{
  // other record fields
  "metadata": {
diff --git a/docs/features/schema-support.mdx b/docs/features/schema-support.mdx
new file mode 100644
index 00000000..b44bc065
--- /dev/null
+++ b/docs/features/schema-support.mdx
@@ -0,0 +1,89 @@
+---
+title: 'Schema Support'
+sidebar_position: 7
+keywords: [ 'schema', 'avro', 'confluent' ]
+---
+
+## Overview
+
+Conduit can manage the structure and format of data as it moves through the
+pipeline. This makes it possible to take advantage of the benefits that the
+associated type information provides, such as:
+
+- **Data Integrity**: Ensuring that data adheres to the expected structure, reducing
+  the risk of errors and inconsistencies.
+- **Type Safety**: Retaining type information throughout the data pipeline, allowing
+  for safe and accurate data processing.
+- **Future-Proofing**: Preparing the system to handle evolving data structures,
+  making it easier to adapt to changes without significant disruptions.
+
+Additionally, this solves the problem of transferring type information in
+standalone connectors. Namely, Conduit and standalone connectors communicate via
+Protobuf messages that have a limited set of types.
+
+Every [record](/docs/features/opencdc-record) that contains structured
+data can be associated with a schema.
+The [Apache Avro™](https://avro.apache.org/) format is supported, and support for
+more schema types is planned.
+
+Since both a record's key and payload can be structured, schemas can be
+associated with either. Schemas are not part of a record for performance
+reasons. Instead, a record's metadata contains information about
+the [key schema](/docs/features/opencdc-record#opencdckeyschema)'s subject and
+version as well as
+the [payload schema](/docs/features/opencdc-record#opencdcpayloadschema)'s
+subject and version. The schemas themselves are managed
+by [the schema registry](#schema-registry).
+
+Conduit's [Connector SDK](https://github.com/ConduitIO/conduit-connector-sdk) and [Processor SDK](https://github.com/ConduitIO/conduit-processor-sdk) make it possible to:
+
+- automatically extract a schema from a record's key or payload (note that the
+  data has to be structured)
+- automatically encode or decode a record's key or payload (i.e. connectors and
+  processors can work with structured data that contains correct types without
+  being involved in fetching the schemas and encoding/decoding the data
+  themselves)
+- work directly with the schema registry (for example, in cases where automatic
+  schema extraction isn't enough and a schema needs to be built manually using
+  the information from a source)
+
+More information about how to work with schemas can be found in the relevant
+pages
+for [source connectors](/docs/connectors/building-connectors/developing-source-connectors), [destination connectors](/docs/connectors/building-connectors/developing-destination-connectors)
+and [processors](/docs/processors/standalone/building#schemas).
+
+## Schema Registry
+
+Conduit uses a schema registry to store the schemas of records.
+You can either configure Conduit to use the built-in schema registry (default) or to
+connect to an external standalone service exposing a REST API that's compatible
+with [Confluent's Schema registry](https://docs.confluent.io/current/schema-registry/develop/api.html).
+
+To use an external schema registry, you can use the following configuration
+options in your [Conduit configuration file](/docs/features/configuration).
+
+```yaml
+schema-registry:
+  type: confluent
+  confluent:
+    connection-string: http://localhost:8081
+```
+
+Or when running Conduit from the command line you can use the following flags:
+
+```shell
+$ ./conduit
+  -schema-registry.type=confluent
+  -schema-registry.confluent.connection-string=http://localhost:8081
+```
+
+When using the **built-in schema registry**, schemas will be stored in the
+same [**persistence layer**](/docs/getting-started/architecture#persistence-layer) as the rest of
+Conduit's data. By default, it uses BadgerDB, but it can be configured to use
+PostgreSQL, SQLite, or in-memory storage. More information can be found on the
+[storage](/docs/features/storage) page.
+
+
+:::note
+You can check the GitHub repository for the schema registry [here](https://github.com/conduitIO/conduit-schema-registry).
+:::
diff --git a/docs/features/storage.mdx b/docs/features/storage.mdx
index d95b353c..6ccb4095 100644
--- a/docs/features/storage.mdx
+++ b/docs/features/storage.mdx
@@ -1,6 +1,6 @@
---
title: 'Storage'
-sidebar_position: 7
+sidebar_position: 8
---

Conduit's own data (information about pipelines, connectors, etc.) can be stored
@@ -14,15 +14,26 @@ for development purposes.

The database type used can be configured with the `db.type` parameter (through
any of the [configuration](/docs/features/configuration) options in Conduit).
-For example, the CLI flag to use a PostgresSQL database with Conduit is as
+
+For example, to configure a PostgreSQL database with Conduit, the CLI flag to use is as
follows: `-db.type=postgres`.

Changing database parameters (e.g. the PostgreSQL connection string) is done
-through parameters of the following form: `db.<db-type>.<parameter-name>`. For
-example, the CLI flag to use a PostgreSQL instance listening on `localhost:5432`
-would be: `-db.postgres.connection-string=postgres://localhost:5432/conduitdb`.
+through parameters of the following form: `db.<db-type>.<parameter-name>`.
+ +For example, if we wanted to use a PostgreSQL instance listening on `localhost:5432` using command line flags: -The full example in our case would be: ```shell -./conduit -db.type=postgres -db.postgres.connection-string="postgresql://localhost:5432/conduitdb" +$ ./conduit + -db.type=postgres + -db.postgres.connection-string="postgresql://localhost:5432/conduitdb" +``` + +This can also be done using configuration options in your [Conduit configuration file](/docs/features/configuration): + +```yaml +db: + type: postgres + postgres: + connection-string: postgresql://localhost:5432/conduitdb ``` diff --git a/docs/features/stream-inspector.mdx b/docs/features/stream-inspector.mdx index 849c8f65..efb2dc94 100644 --- a/docs/features/stream-inspector.mdx +++ b/docs/features/stream-inspector.mdx @@ -1,6 +1,6 @@ --- title: 'Stream Inspector' -sidebar_position: 8 +sidebar_position: 9 tag: ['Stream Inspector', 'Connector'] keywords: ['stream', 'inspector'] --- diff --git a/docs/features/web-ui.mdx b/docs/features/web-ui.mdx index c2f6c587..5dbdc8f5 100644 --- a/docs/features/web-ui.mdx +++ b/docs/features/web-ui.mdx @@ -1,6 +1,6 @@ --- title: 'Web UI' -sidebar_position: 9 +sidebar_position: 10 --- Conduit comes with a web UI that allows you to build a data pipeline using a diff --git a/docs/getting-started/architecture.mdx b/docs/getting-started/architecture.mdx index ce759631..70f243ce 100644 --- a/docs/getting-started/architecture.mdx +++ b/docs/getting-started/architecture.mdx @@ -5,59 +5,102 @@ sidebar_position: 2 Here is an overview of the internal Conduit Architecture. -![Conduit Architecture](/img/conduit/conduit-diagram.svg) + +{/* Diagram source can be found at: https://lucid.app/lucidspark/414c75e1-bd3d-46f4-8baf-1b51f264ccea/edit?viewport_loc=-4169%2C-1359%2C6150%2C4089%2C0_0&invitationId=inv_9bb11c46-a33a-4119-ade6-a006a77a5f3b */} + +![Conduit Architecture](/img/conduit/conduit-architecture.svg) Conduit is split into the following layers: -* **API layer** - exposes the public APIs used to communicate with Conduit. It exposes 2 types of APIs: - * **gRPC** - this is the main API provided by Conduit. The gRPC API definition can be found in - [api.proto](https://github.com/ConduitIO/conduit/blob/main/proto/api/v1/api.proto), it can be used to generate code for the client. - * **HTTP** - the HTTP API is generated using [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway) and - forwards the requests to the gRPC API. Conduit exposes an openapi definition that describes the HTTP API, which is - also exposed through Swagger UI on `http://localhost:8080/openapi/`. -* **Orchestration layer** - the orchestration layer is responsible for coordinating the flow of operations between the - core services. It also takes care of transactions, making sure that changes made to specific entities are not visible - to the outside until the whole operation succeeded. There are 3 orchestrators, each responsible for actions related - to one of the 3 main entities - pipelines, connectors and processors. -* **Core** - we regard the core to be the combination of the entity management layer and the pipeline engine. It - provides functionality to the orchestrator layer and does not concern itself with where requests come from and how - single operations are combined into more complex flows. - * **Entity management** - this layer is concerned with the creation, editing, deletion and storage of the main - entities. You can think about this as a simple CRUD layer. 
It can be split up further using the main entities: - * **Pipeline** - this is the central entity managed by the Pipeline Service that ties together all other components. - A pipeline contains the configuration that defines how pipeline nodes should be connected together in a running - pipeline. It has references to at least one source and one destination connector and zero or multiple processors, - a pipeline that does not meet the criteria is regarded as incomplete and can't be started. A pipeline can be - either running, stopped or degraded (stopped because of an error). The pipeline can only be edited if it's not in - a running state. - * **Connector** - a connector takes care of receiving or forwarding records to connector plugins, depending on its - type (source or destination). It is also responsible for tracking the connector state as records flow through it. - The Connector Service manages the creation of connectors and permanently stores them in the Connector Store. A - connector can be configured to reference a number of processors, which will be executed only on records that are - received from or forwarded to that specific connector. - * **Connector Plugin** - interfaces with Conduit on one side, and with the standalone connector plugin on the - other and facilitates the communication between them. A standalone connector plugin is a separate process that - implements the interface defined in - [conduit-connector-protocol](https://github.com/ConduitIO/conduit-connector-protocol) and provides the read/write - functionality for a specific resource (e.g. a database). - * **Processor** - processors are stateless components that operate on a single record and can execute arbitrary - actions before forwarding the record to the next node in the pipeline. A processor can also choose to drop a - record without forwarding it. They can be attached either to a connector or to a pipeline, based on that they are - either processing only records that flow from/to a connector or all records that flow through a pipeline. - * **Processor Plugin** - interfaces with Conduit on one side, and with the standalone processor plugin on the - other and facilitates the communication between them. A standalone processor plugin is a WASM binary that - implements the interface defined in - [conduit-processor-sdk](https://github.com/ConduitIO/conduit-processor-sdk) and provides the logic for processing - a record (e.g. transforming its content). - * **Pipeline Engine** - the pipeline engine consists of nodes that can be connected together with Go channels to form - a data pipeline. - * **Node** - a node is a lightweight component that runs in its own goroutine and runs as long as the incoming channel - is open. As soon as the previous node stops forwarding records and closes its out channel, the current node also - stops running and closes its out channel. This continues down the pipeline until all records are drained and the - pipeline gracefully stops. In case a node experiences an error all other nodes will be notified and stop running - as soon as possible without draining the pipeline. -* **Persistence** - this layer is used directly by the orchestration layer and indirectly by the core layer (through - stores) to persist data. It provides the functionality of creating transactions and storing, retrieving and deleting - arbitrary data like configurations or state. +## API layer + +Exposes the public APIs used to communicate with Conduit. 
It exposes two types of APIs:
+
+### gRPC
+
+This is the main API provided by Conduit. The gRPC API definition can be found in [api.proto](https://github.com/ConduitIO/conduit/blob/main/proto/api/v1/api.proto) and can be used to generate client code.
+
+### HTTP
+
+The HTTP API is generated using [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway) and forwards the requests to the gRPC API. Conduit exposes an OpenAPI definition that describes the HTTP API, which is also exposed through Swagger UI on `http://localhost:8080/openapi/`.
+
+## Orchestration layer
+
+The orchestration layer is responsible for coordinating the flow of operations between the core services. It also takes care of transactions, making sure that changes made to specific entities are not visible to the outside until the whole operation has succeeded. There are five orchestrators, each responsible for actions related to one of the main entities:
+
+- Pipeline Orchestrator.
+- Connector Orchestrator.
+- Processor Orchestrator.
+- Connector Plugin Orchestrator.
+- Processor Plugin Orchestrator.
+
+## Core layer
+
+We regard the core to be the combination of the entity management layer and the pipeline engine. It provides functionality to the orchestration layer and does not concern itself with where requests come from or how single operations are combined into more complex flows.
+
+### Entity management
+
+This layer is concerned with the creation, editing, deletion and storage of the main
+entities. You can think about this as a simple CRUD layer. It can be split up further using the main entities:
+
+#### Pipeline
+
+This is the central entity managed by the Pipeline Service that ties together all other components.
+A pipeline contains the configuration that defines how pipeline nodes should be connected together in a running
+pipeline. It has references to at least one source and one destination connector, and zero or more processors;
+a pipeline that does not meet these criteria is regarded as incomplete and can't be started. A pipeline can be
+either running, stopped, or degraded (stopped because of an error). A pipeline can only be edited if it's not in
+a running state.
+
+#### Connector
+
+A connector takes care of receiving or forwarding records to connector plugins, depending on its
+type (source or destination). It is also responsible for tracking the connector state as records flow through it.
+The Connector Service manages the creation of connectors and permanently stores them in the Connector Store. A
+connector can be configured to reference a number of processors, which will be executed only on records that are
+received from or forwarded to that specific connector.
+
+#### Connector Plugin
+
+A connector plugin interfaces with Conduit on one side, and with the standalone connector plugin on the
+other, and facilitates the communication between them. A standalone connector plugin is a separate process that
+implements the interface defined in
+[conduit-connector-protocol](https://github.com/ConduitIO/conduit-connector-protocol) and provides the read/write
+functionality for a specific resource (e.g. a database).
+
+#### Processor
+
+Processors are stateless components that operate on a single record and can execute arbitrary
+actions before forwarding the record to the next node in the pipeline. A processor can also choose to drop a
+record without forwarding it. They can be attached either to a connector or to a pipeline; depending on that,
+they process either only the records that flow from/to a specific connector, or all records that flow through
+the pipeline.
+
+#### Processor Plugin
+
+A processor plugin interfaces with Conduit on one side, and with the standalone processor plugin on the
+other, and facilitates the communication between them. A standalone processor plugin is a WASM binary that
+implements the interface defined in
+[conduit-processor-sdk](https://github.com/ConduitIO/conduit-processor-sdk) and provides the logic for processing
+a record (e.g. transforming its content).
+
+### Pipeline Engine
+
+The pipeline engine consists of nodes that can be connected together with Go channels to form a data pipeline.
+
+#### Node
+
+A node is a lightweight component that runs in its own goroutine for as long as its incoming channel
+is open. As soon as the previous node stops forwarding records and closes its out channel, the current node also
+stops running and closes its out channel. This continues down the pipeline until all records are drained and the
+pipeline gracefully stops. If a node experiences an error, all other nodes are notified and stop running
+as soon as possible without draining the pipeline.
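+
+To make the channel mechanics concrete, here is a minimal, hedged sketch of this
+node pattern (illustrative only, not Conduit's actual implementation; the
+`Record` type and `process` function are stand-ins):
+
+```go
+package main
+
+import "fmt"
+
+// Record is a stand-in for Conduit's record type in this sketch.
+type Record struct{ Payload string }
+
+// node consumes records from `in` until the upstream node closes it, then
+// closes its own out channel so the stop signal propagates downstream.
+func node(in <-chan Record, process func(Record) Record) <-chan Record {
+	out := make(chan Record)
+	go func() {
+		defer close(out) // tells the next node to stop once we're done
+		for rec := range in { // ends when `in` is closed and drained
+			out <- process(rec)
+		}
+	}()
+	return out
+}
+
+func main() {
+	src := make(chan Record)
+	sink := node(src, func(r Record) Record { r.Payload += "!"; return r })
+	go func() { src <- Record{Payload: "hello"}; close(src) }()
+	for r := range sink {
+		fmt.Println(r.Payload) // prints "hello!", then the pipeline drains
+	}
+}
+```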
+
+## Persistence layer
+
+This layer is used directly by the [Orchestration layer](#orchestration-layer), and indirectly by the [Core layer](#core-layer) and the [Schema registry service](#schema-registry-service) (through stores), to persist data. It provides the functionality of creating transactions and storing, retrieving and deleting arbitrary data like configurations or state.
+
+More information on [storage](/docs/features/storage).
+
+## Connector utility services
+
+### Schema registry service
+
+The schema registry service is responsible for managing the schemas of the records that flow through a pipeline. It provides functionality to infer a schema from a record. Schemas are stored in the schema store and can be referenced by connectors and processors. By default, Conduit provides a built-in schema registry, but this service can also be run separately from Conduit.
+
+More information on [Schema Registry](/docs/features/schema-support#schema-registry).
 
 ![scarf pixel conduit-site-docs-introduction](https://static.scarf.sh/a.png?x-pxid=01346572-0d57-4df3-8399-1425db913a0a)
\ No newline at end of file
diff --git a/docs/getting-started/installing-and-running.mdx b/docs/getting-started/installing-and-running.mdx
index 08cf6ad5..1624302d 100644
--- a/docs/getting-started/installing-and-running.mdx
+++ b/docs/getting-started/installing-and-running.mdx
@@ -31,12 +31,12 @@ conduit
 
 First, download the [latest Conduit release](https://github.com/ConduitIO/conduit/releases/latest) for your platform.
 
-For an ARM based back Mac for example we'd grab the `conduit_0.10.0_Darwin_arm64.tar.gz` archive.
+For an ARM-based Mac, for example, we'd grab the `conduit_0.11.0_Darwin_arm64.tar.gz` archive.
 
 ### unzip the archive
 
 ```shell
-tar xzvf conduit_0.10.0_Darwin_arm64.tar.gz
+tar xzvf conduit_0.11.0_Darwin_arm64.tar.gz
 ```
 
 ### Run
@@ -78,12 +78,12 @@ If you're using Docker, check out [Watchtower](https://containrrr.dev/watchtower
 
 
 
-Before you can build Conduit from source, you need to have Go 1.21 or higher installed. The Go website has [comprehensive instructions](https://go.dev/doc/install) on how to get the language installed on your machine. Once installed, you can follow the rest of the instructions.
+Before you can build Conduit from source, you need to have Go 1.22 or higher installed. The Go website has [comprehensive instructions](https://go.dev/doc/install) on how to get the language installed on your machine. Once installed, you can follow the rest of the instructions. 1. Start by downloading the source code from the latest stable release on the Conduit [Releases Page](https://github.com/ConduitIO/conduit/releases/latest). Alternatively, you can run this command to automatically download the latest stable source to your current directory: ```shell -$ TAG=v0.10.0 curl -o conduit.tar.gz -L https://github.com/ConduitIO/conduit/archive/refs/tags/$TAG.tar.gz +$ TAG=v0.11.0; curl -o conduit.tar.gz -L https://github.com/ConduitIO/conduit/archive/refs/tags/$TAG.tar.gz ``` A file called `conduit.tgz` will be in your current directory. The next step is to expand the source: @@ -95,7 +95,7 @@ $ tar zxvf conduit.tgz Then change directories to the appropriate folder. Keep in mind that the folder name might be different between releases since it's tied to the latest git sha for the commit. ```shell -$ cd conduit-0.10.0 +$ cd conduit-0.11.0 ``` Now build the project: @@ -104,7 +104,7 @@ Now build the project: $ make ``` -If you want to build conduit without the UI, instead of running `make`, you can run the following command: +If you want to build Conduit without the UI, instead of running `make`, you can run the following command: ```shell $ make build-server @@ -129,7 +129,7 @@ You should now be able to interact with the Conduit UI and HTTP API on port 8080 `:::::::: ::::::::‘ `:::: ::::‘ `:::::....:::::‘ - `::::::::::‘ Conduit v0.10.0 linux/amd64 + `::::::::::‘ Conduit v0.11.0 linux/amd64 ‘‘‘‘ 2024-02-20T21:37:45+00:00 INF All 0 tables opened in 0s component=badger.DB 2024-02-20T21:37:45+00:00 INF Discard stats nextEmptySlot: 0 component=badger.DB diff --git a/docs/introduction.mdx b/docs/introduction.mdx index 2e262b0f..dbd09d72 100644 --- a/docs/introduction.mdx +++ b/docs/introduction.mdx @@ -73,7 +73,7 @@ Now that we have Conduit installed let's start it up to see what happens. `:::::::: ::::::::‘ `:::: ::::‘ `:::::....:::::‘ - `::::::::::‘ Conduit v0.10.0 darwin/arm64 + `::::::::::‘ Conduit v0.11.0 darwin/arm64 ‘‘‘‘ 2024-02-21T14:41:26+00:00 INF All 0 tables opened in 0s component=badger.DB 2024-02-21T14:41:26+00:00 INF Discard stats nextEmptySlot: 0 component=badger.DB @@ -115,7 +115,7 @@ While you can provision pipelines via Conduit's UI, the recommended way to do so For this example we'll create a pipeline that will move data from one file to another. -First we'll create the `pipelines` directory in the same directory as our conduit binary. +First we'll create the `pipelines` directory in the same directory as our Conduit binary. 
```shell mkdir pipelines ``` @@ -160,7 +160,7 @@ Conduit should start and we should see references to our new pipeline in the out `:::::::: ::::::::‘ `:::: ::::‘ `:::::....:::::‘ - `::::::::::‘ Conduit v0.10.0 darwin/arm64 + `::::::::::‘ Conduit v0.11.0 darwin/arm64 ‘‘‘‘ 2024-02-21T16:50:35+00:00 INF All 0 tables opened in 0s component=badger.DB 2024-02-21T16:50:35+00:00 INF Discard stats nextEmptySlot: 0 component=badger.DB diff --git a/docs/processors/builtin/avro.decode.mdx b/docs/processors/builtin/avro.decode.mdx index c8c029ee..2af83125 100644 --- a/docs/processors/builtin/avro.decode.mdx +++ b/docs/processors/builtin/avro.decode.mdx @@ -41,40 +41,27 @@ pipelines: - id: example plugin: "avro.decode" settings: - # The password to use with basic authentication. This option is - # required if auth.basic.username contains a value. If both - # auth.basic.username and auth.basic.password are empty basic - # authentication is disabled. - # Type: string - auth.basic.password: "" - # The username to use with basic authentication. This option is - # required if auth.basic.password contains a value. If both - # auth.basic.username and auth.basic.password are empty basic - # authentication is disabled. - # Type: string - auth.basic.username: "" # The field that will be decoded. # For more information about the format, see [Referencing # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: ".Payload.After" - # The path to a file containing PEM encoded CA certificates. If this - # option is empty, Conduit falls back to using the host's root CA set. - # Type: string - tls.ca.cert: "" - # The path to a file containing a PEM encoded certificate. This option - # is required if tls.client.key contains a value. If both - # tls.client.cert and tls.client.key are empty TLS is disabled. - # Type: string - tls.client.cert: "" - # The path to a file containing a PEM encoded private key. This option - # is required if tls.client.cert contains a value. If both - # tls.client.cert and tls.client.key are empty TLS is disabled. - # Type: string - tls.client.key: "" - # URL of the schema registry (e.g. http://localhost:8085) - # Type: string - url: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -85,26 +72,6 @@ pipelines: Default Description - - `auth.basic.password` - string - - - The password to use with basic authentication. This option is required if -auth.basic.username contains a value. If both auth.basic.username and auth.basic.password -are empty basic authentication is disabled. - - - - `auth.basic.username` - string - - - The username to use with basic authentication. This option is required if -auth.basic.password contains a value. If both auth.basic.username and auth.basic.password -are empty basic authentication is disabled. - - `field` string @@ -116,40 +83,35 @@ For more information about the format, see [Referencing fields](https://conduit. 
- `tls.ca.cert` - string - + `sdk.schema.decode.key.enabled` + bool + `true` - The path to a file containing PEM encoded CA certificates. If this option is empty, -Conduit falls back to using the host's root CA set. + Whether to decode the record key using its corresponding schema from the schema registry. - `tls.client.cert` - string - + `sdk.schema.decode.payload.enabled` + bool + `true` - The path to a file containing a PEM encoded certificate. This option is required -if tls.client.key contains a value. If both tls.client.cert and tls.client.key are empty -TLS is disabled. + Whether to decode the record payload using its corresponding schema from the schema registry. - `tls.client.key` - string - + `sdk.schema.encode.key.enabled` + bool + `true` - The path to a file containing a PEM encoded private key. This option is required -if tls.client.cert contains a value. If both tls.client.cert and tls.client.key are empty -TLS is disabled. + Whether to encode the record key using its corresponding schema from the schema registry. - `url` - string - + `sdk.schema.encode.payload.enabled` + bool + `true` - URL of the schema registry (e.g. http://localhost:8085) + Whether to encode the record payload using its corresponding schema from the schema registry. @@ -192,7 +154,6 @@ pipelines: plugin: "avro.decode" settings: field: ".Key" - url: "http://127.0.0.1:54322" ``` @@ -205,10 +166,6 @@ pipelines: `field` `.Key` - - `url` - `http://127.0.0.1:54322` - diff --git a/docs/processors/builtin/avro.encode.mdx b/docs/processors/builtin/avro.encode.mdx index f9cd451f..fa8f6eb2 100644 --- a/docs/processors/builtin/avro.encode.mdx +++ b/docs/processors/builtin/avro.encode.mdx @@ -54,18 +54,6 @@ pipelines: - id: example plugin: "avro.encode" settings: - # The password to use with basic authentication. This option is - # required if auth.basic.username contains a value. If both - # auth.basic.username and auth.basic.password are empty basic - # authentication is disabled. - # Type: string - auth.basic.password: "" - # The username to use with basic authentication. This option is - # required if auth.basic.password contains a value. If both - # auth.basic.username and auth.basic.password are empty basic - # authentication is disabled. - # Type: string - auth.basic.username: "" # The field that will be encoded. # For more information about the format, see [Referencing # fields](https://conduit.io/docs/processors/referencing-fields). @@ -95,23 +83,22 @@ pipelines: # main processor description. # Type: string schema.strategy: "" - # The path to a file containing PEM encoded CA certificates. If this - # option is empty, Conduit falls back to using the host's root CA set. - # Type: string - tls.ca.cert: "" - # The path to a file containing a PEM encoded certificate. This option - # is required if tls.client.key contains a value. If both - # tls.client.cert and tls.client.key are empty TLS is disabled. - # Type: string - tls.client.cert: "" - # The path to a file containing a PEM encoded private key. This option - # is required if tls.client.cert contains a value. If both - # tls.client.cert and tls.client.key are empty TLS is disabled. - # Type: string - tls.client.key: "" - # URL of the schema registry (e.g. http://localhost:8085) - # Type: string - url: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. 
+ # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -122,26 +109,6 @@ pipelines: Default Description - - `auth.basic.password` - string - - - The password to use with basic authentication. This option is required if -auth.basic.username contains a value. If both auth.basic.username and auth.basic.password -are empty basic authentication is disabled. - - - - `auth.basic.username` - string - - - The username to use with basic authentication. This option is required if -auth.basic.password contains a value. If both auth.basic.username and auth.basic.password -are empty basic authentication is disabled. - - `field` string @@ -193,40 +160,35 @@ For more information about the behavior of each strategy read the main processor - `tls.ca.cert` - string - + `sdk.schema.decode.key.enabled` + bool + `true` - The path to a file containing PEM encoded CA certificates. If this option is empty, -Conduit falls back to using the host's root CA set. + Whether to decode the record key using its corresponding schema from the schema registry. - `tls.client.cert` - string - + `sdk.schema.decode.payload.enabled` + bool + `true` - The path to a file containing a PEM encoded certificate. This option is required -if tls.client.key contains a value. If both tls.client.cert and tls.client.key are empty -TLS is disabled. + Whether to decode the record payload using its corresponding schema from the schema registry. - `tls.client.key` - string - + `sdk.schema.encode.key.enabled` + bool + `true` - The path to a file containing a PEM encoded private key. This option is required -if tls.client.cert contains a value. If both tls.client.cert and tls.client.key are empty -TLS is disabled. + Whether to encode the record key using its corresponding schema from the schema registry. - `url` - string - + `sdk.schema.encode.payload.enabled` + bool + `true` - URL of the schema registry (e.g. http://localhost:8085) + Whether to encode the record payload using its corresponding schema from the schema registry. @@ -260,7 +222,6 @@ pipelines: field: ".Payload.After" schema.autoRegister.subject: "example-autoRegister" schema.strategy: "autoRegister" - url: "http://127.0.0.1:54322" ``` @@ -281,10 +242,6 @@ pipelines: `schema.strategy` `autoRegister` - - `url` - `http://127.0.0.1:54322` - @@ -351,7 +308,6 @@ pipelines: schema.preRegistered.subject: "example-preRegistered" schema.preRegistered.version: "1" schema.strategy: "preRegistered" - url: "http://127.0.0.1:54322" ``` @@ -376,10 +332,6 @@ pipelines: `schema.strategy` `preRegistered` - - `url` - `http://127.0.0.1:54322` - diff --git a/docs/processors/builtin/base64.decode.mdx b/docs/processors/builtin/base64.decode.mdx index bcbee6f0..8b53284d 100644 --- a/docs/processors/builtin/base64.decode.mdx +++ b/docs/processors/builtin/base64.decode.mdx @@ -41,6 +41,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. 
+ # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -62,6 +78,38 @@ base64 decode the `.Position` field. For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/base64.encode.mdx b/docs/processors/builtin/base64.encode.mdx index 9de5edef..b8626189 100644 --- a/docs/processors/builtin/base64.encode.mdx +++ b/docs/processors/builtin/base64.encode.mdx @@ -43,6 +43,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -64,6 +80,38 @@ base64 encode the `.Position` field. For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/custom.javascript.mdx b/docs/processors/builtin/custom.javascript.mdx index 5e07d23e..8ec3e5e0 100644 --- a/docs/processors/builtin/custom.javascript.mdx +++ b/docs/processors/builtin/custom.javascript.mdx @@ -56,6 +56,22 @@ pipelines: # The path to a .js file containing the processor code. # Type: string script.path: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. 
+ # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -88,6 +104,38 @@ then the `process()` function should return `null`. The path to a .js file containing the processor code. + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/error.mdx b/docs/processors/builtin/error.mdx index e8f0370a..8e10c3a6 100644 --- a/docs/processors/builtin/error.mdx +++ b/docs/processors/builtin/error.mdx @@ -44,6 +44,22 @@ pipelines: # being processed. # Type: string message: "error processor triggered" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "false" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "false" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "false" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "false" ``` @@ -64,6 +80,38 @@ executed on each [`Record`](https://pkg.go.dev/github.com/conduitio/conduit-comm being processed. + + `sdk.schema.decode.key.enabled` + bool + `false` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `false` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `false` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `false` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/field.convert.mdx b/docs/processors/builtin/field.convert.mdx index 4647f1b2..e823473f 100644 --- a/docs/processors/builtin/field.convert.mdx +++ b/docs/processors/builtin/field.convert.mdx @@ -46,8 +46,24 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. 
+ # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" # Type is the target field type after conversion, available options - # are: string, int, float, bool. + # are: `string`, `int`, `float`, `bool`, `time`. # Type: string type: "" ``` @@ -72,12 +88,44 @@ Note that you can only convert fields in structured data under `.Key` and For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + `type` string - Type is the target field type after conversion, available options are: string, int, float, bool. + Type is the target field type after conversion, available options are: `string`, `int`, `float`, `bool`, `time`. @@ -86,6 +134,71 @@ For more information about the format, see [Referencing fields](https://conduit. ## Examples +### Convert `int` to `time` + +This example takes an `int` in field `.Payload.After.createdAt` and parses it as a unix timestamp into a `time.Time` value. + +#### Configuration parameters + + + +```yaml +version: 2.2 +pipelines: + - id: example + status: running + connectors: + # define source and destination ... + processors: + - id: example + plugin: "field.convert" + settings: + field: ".Payload.After.createdAt" + type: "time" +``` + + + + + + + + + + + + + + + +
+<table>
+  <tr>
+    <th>Name</th>
+    <th>Value</th>
+  </tr>
+  <tr>
+    <td>`field`</td>
+    <td>`.Payload.After.createdAt`</td>
+  </tr>
+  <tr>
+    <td>`type`</td>
+    <td>`time`</td>
+  </tr>
+</table>
+
+
+ +#### Record difference + +```mdx-code-block + + + +``` + +--- + ### Convert `float` to `string` This example takes the `float` in field `.Key.id` and changes its data type to `string`. diff --git a/docs/processors/builtin/field.exclude.mdx b/docs/processors/builtin/field.exclude.mdx index c64b002e..e6ad4ca5 100644 --- a/docs/processors/builtin/field.exclude.mdx +++ b/docs/processors/builtin/field.exclude.mdx @@ -46,6 +46,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string fields: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -66,6 +82,38 @@ pipelines: For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/field.rename.mdx b/docs/processors/builtin/field.rename.mdx index 3a55b96a..fedbf4e4 100644 --- a/docs/processors/builtin/field.rename.mdx +++ b/docs/processors/builtin/field.rename.mdx @@ -47,6 +47,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string mapping: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -70,6 +86,38 @@ For example: `.Metadata.key:id,.Payload.After.foo:bar`. For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. 
+ + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/field.set.mdx b/docs/processors/builtin/field.set.mdx index 80520cab..0bec6388 100644 --- a/docs/processors/builtin/field.set.mdx +++ b/docs/processors/builtin/field.set.mdx @@ -45,6 +45,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" # Value is a Go template expression which will be evaluated and stored # in `field` (e.g. `{{ .Payload.After }}`). # Type: string @@ -70,6 +86,38 @@ to set the `.Position` field. For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + `value` string diff --git a/docs/processors/builtin/filter.mdx b/docs/processors/builtin/filter.mdx index 45ae1501..dbc27088 100644 --- a/docs/processors/builtin/filter.mdx +++ b/docs/processors/builtin/filter.mdx @@ -38,11 +38,66 @@ pipelines: processors: - id: example plugin: "filter" - + settings: + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "false" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "false" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "false" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "false" ``` - No configuration parameters. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+<table>
+  <tr>
+    <th>Name</th>
+    <th>Type</th>
+    <th>Default</th>
+    <th>Description</th>
+  </tr>
+  <tr>
+    <td>`sdk.schema.decode.key.enabled`</td>
+    <td>bool</td>
+    <td>`false`</td>
+    <td>Whether to decode the record key using its corresponding schema from the schema registry.</td>
+  </tr>
+  <tr>
+    <td>`sdk.schema.decode.payload.enabled`</td>
+    <td>bool</td>
+    <td>`false`</td>
+    <td>Whether to decode the record payload using its corresponding schema from the schema registry.</td>
+  </tr>
+  <tr>
+    <td>`sdk.schema.encode.key.enabled`</td>
+    <td>bool</td>
+    <td>`false`</td>
+    <td>Whether to encode the record key using its corresponding schema from the schema registry.</td>
+  </tr>
+  <tr>
+    <td>`sdk.schema.encode.payload.enabled`</td>
+    <td>bool</td>
+    <td>`false`</td>
+    <td>Whether to encode the record payload using its corresponding schema from the schema registry.</td>
+  </tr>
+</table>
diff --git a/docs/processors/builtin/json.decode.mdx b/docs/processors/builtin/json.decode.mdx index f2b09b4b..6f08696c 100644 --- a/docs/processors/builtin/json.decode.mdx +++ b/docs/processors/builtin/json.decode.mdx @@ -45,6 +45,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -66,6 +82,38 @@ pipelines: For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/json.encode.mdx b/docs/processors/builtin/json.encode.mdx index 923674dd..eb991b34 100644 --- a/docs/processors/builtin/json.encode.mdx +++ b/docs/processors/builtin/json.encode.mdx @@ -44,6 +44,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -65,6 +81,38 @@ pipelines: For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. 
+ + diff --git a/docs/processors/builtin/unwrap.debezium.mdx b/docs/processors/builtin/unwrap.debezium.mdx index 0c879011..c775ecd3 100644 --- a/docs/processors/builtin/unwrap.debezium.mdx +++ b/docs/processors/builtin/unwrap.debezium.mdx @@ -45,6 +45,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: ".Payload.After" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -65,6 +81,38 @@ pipelines: For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/unwrap.kafkaconnect.mdx b/docs/processors/builtin/unwrap.kafkaconnect.mdx index 499fd5eb..784b437b 100644 --- a/docs/processors/builtin/unwrap.kafkaconnect.mdx +++ b/docs/processors/builtin/unwrap.kafkaconnect.mdx @@ -45,6 +45,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: ".Payload.After" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -65,6 +81,38 @@ pipelines: For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. 
+ + diff --git a/docs/processors/builtin/unwrap.opencdc.mdx b/docs/processors/builtin/unwrap.opencdc.mdx index 9ea4e3f1..b9ef6a08 100644 --- a/docs/processors/builtin/unwrap.opencdc.mdx +++ b/docs/processors/builtin/unwrap.opencdc.mdx @@ -43,6 +43,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string field: ".Payload.After" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -63,6 +79,38 @@ pipelines: For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.payload.enabled` + bool + `true` + + Whether to encode the record payload using its corresponding schema from the schema registry. + + diff --git a/docs/processors/builtin/webhook.http.mdx b/docs/processors/builtin/webhook.http.mdx index dad7818c..d9f3bd8e 100644 --- a/docs/processors/builtin/webhook.http.mdx +++ b/docs/processors/builtin/webhook.http.mdx @@ -90,6 +90,22 @@ pipelines: # fields](https://conduit.io/docs/processors/referencing-fields). # Type: string response.status: "" + # Whether to decode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.decode.key.enabled: "true" + # Whether to decode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.decode.payload.enabled: "true" + # Whether to encode the record key using its corresponding schema from + # the schema registry. + # Type: bool + sdk.schema.encode.key.enabled: "true" + # Whether to encode the record payload using its corresponding schema + # from the schema registry. + # Type: bool + sdk.schema.encode.payload.enabled: "true" ``` @@ -201,6 +217,38 @@ is set, then the response status will NOT be saved. For more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields). + + `sdk.schema.decode.key.enabled` + bool + `true` + + Whether to decode the record key using its corresponding schema from the schema registry. + + + + `sdk.schema.decode.payload.enabled` + bool + `true` + + Whether to decode the record payload using its corresponding schema from the schema registry. + + + + `sdk.schema.encode.key.enabled` + bool + `true` + + Whether to encode the record key using its corresponding schema from the schema registry. 
+
+
+
+      `sdk.schema.encode.payload.enabled`
+      bool
+      `true`
+
+      Whether to encode the record payload using its corresponding schema from the schema registry.
+
+
diff --git a/docs/processors/standalone/building.mdx b/docs/processors/standalone/building.mdx
index d0b0246f..7fbb915b 100644
--- a/docs/processors/standalone/building.mdx
+++ b/docs/processors/standalone/building.mdx
@@ -289,6 +289,127 @@ func main() {
 Check [Compiling the processor](#compiling-the-processor) for what to do next,
 and how to compile the processor.
 
+## Schemas
+
+Processors have access to the schemas in the schema registry in use. By default, if the pipeline uses a schema
+registry and the processor gets a record with the schema info in the `Metadata`, then a middleware is enabled
+for the processor. The middleware decodes records using their corresponding schema from the schema registry
+before they are passed to the processor, and encodes them again after the processing is done. To change this
+default behavior, adjust the following processor configuration parameters:
+- `sdk.schema.decode.key.enabled`: Whether to decode the record key using its corresponding schema from the schema registry.
+- `sdk.schema.decode.payload.enabled`: Whether to decode the record payload using its corresponding schema from the schema registry.
+- `sdk.schema.encode.key.enabled`: Whether to encode the record key using its corresponding schema from the schema registry.
+- `sdk.schema.encode.payload.enabled`: Whether to encode the record payload using its corresponding schema from the schema registry.
+
+**Example** of a pipeline configuration file with these parameters:
+```yaml
+version: 2.2
+pipelines:
+  - id: test
+    status: running
+    connectors:
+      - id: employees-source
+        type: source
+        plugin: standalone:generator
+        settings:
+          rate: 1
+          collections.str.format.type: structured
+          collections.str.format.options.id: int
+          collections.str.format.options.name: string
+          collections.str.format.options.admin: bool
+          collections.str.operations: create
+      - id: logger-dest
+        type: destination
+        plugin: standalone:log
+    processors:
+      - id: access-schema
+        plugin: standalone:processor-simple
+        settings:
+          sdk.schema.decode.key.enabled: false # disable the default behavior
+          sdk.schema.encode.key.enabled: false # disable the default behavior
+```
+
+Processors can access the schema registry and the schemas using two utility functions:
+1. `schema.Create`: You can use this utility function to create a new schema and add it to the schema registry.
+This function can be called in any of the main processor methods.
+
+**Example**:
+```go
+func (p *exampleProcessor) Open(ctx context.Context) error {
+	// Add a new schema to the schema registry before starting to process the records.
+	schemaBytes := []byte(`{
+      "name": "record",
+      "type": "record",
+      "fields": [
+        {
+          "name": "admin",
+          "type": "boolean"
+        },
+        {
+          "name": "id",
+          "type": "int"
+        },
+        {
+          "name": "name",
+          "type": "string"
+        }
+      ]
+    }`)
+	_, err := schema.Create(ctx, schema.TypeAvro, "subject1", schemaBytes)
+	return err
+}
+```
+
+2. `schema.Get`: You can use this utility function to get a schema from the schema registry using
+   its `subject` and `version`. This function can be called in any of the main processor methods.
+
+   **Example**:
+
+   ```go
+   func (p *exampleProcessor) Process(ctx context.Context, records []opencdc.Record) []sdk.ProcessedRecord {
+       out := make([]sdk.ProcessedRecord, 0, len(records))
+       for _, record := range records {
+           // get the schema subject name from the metadata
+           subject, err := record.Metadata.GetPayloadSchemaSubject()
+           if err != nil {
+               return append(out, sdk.ErrorRecord{Error: err})
+           }
+           // get the schema version from the metadata
+           version, err := record.Metadata.GetPayloadSchemaVersion()
+           if err != nil {
+               return append(out, sdk.ErrorRecord{Error: err})
+           }
+           // get the schema using the subject and the version
+           sch, err := schema.Get(ctx, subject, version)
+           if err != nil {
+               return append(out, sdk.ErrorRecord{Error: err})
+           }
+           // print the schema
+           fmt.Println(string(sch.Bytes))
+           // pass the record through unchanged
+           out = append(out, sdk.SingleRecord(record))
+       }
+       return out
+   }
+   ```
+
+   Example output (the printed schema):
+
+```json
+{
+  "name": "record",
+  "type": "record",
+  "fields": [
+    {
+      "name": "admin",
+      "type": "boolean"
+    },
+    {
+      "name": "id",
+      "type": "int"
+    },
+    {
+      "name": "name",
+      "type": "string"
+    }
+  ]
+}
+```
+
 ## Logging
 
 You can get a `zerolog.Logger` instance from the context using the [`sdk.Logger`](https://pkg.go.dev/github.com/conduitio/conduit-processor-sdk#Logger)
diff --git a/docs/processors/standalone/how-it-works.mdx b/docs/processors/standalone/how-it-works.mdx
index 6a4b05c4..e4057b07 100644
--- a/docs/processors/standalone/how-it-works.mdx
+++ b/docs/processors/standalone/how-it-works.mdx
@@ -183,6 +183,28 @@ The memory address sent to `command_response` should contain a
 - [`Error`](https://buf.build/conduitio/conduit-processor-sdk/docs/main:processor.v1#processor.v1.Error)
   should be sent in response to any command request that failed to be processed.
 
+### `create_schema`
+
+The processor should call this function to create a new schema in the schema registry.
+The memory address sent to `create_schema` should contain a [`CreateSchemaRequest`](https://buf.build/conduitio/conduit-processor-sdk/docs/main:procutils.v1#procutils.v1.CreateSchemaRequest),
+then Conduit will populate the memory buffer with a [`CreateSchemaResponse`](https://buf.build/conduitio/conduit-processor-sdk/docs/main:procutils.v1#procutils.v1.CreateSchemaResponse).
+
+The returned integer represents the size of the response if the request was successful.
+If the allocated memory buffer is smaller than the returned response size, the processor
+should reallocate a larger buffer and call `create_schema` again.
+If the schema creation failed, an [error code](#error-codes) will be returned instead.
+
+### `get_schema`
+
+The processor should call this function to get a schema from the schema registry.
+The memory address sent to `get_schema` should contain a [`GetSchemaRequest`](https://buf.build/conduitio/conduit-processor-sdk/docs/main:procutils.v1#procutils.v1.GetSchemaRequest),
+then Conduit will populate the memory buffer with a [`GetSchemaResponse`](https://buf.build/conduitio/conduit-processor-sdk/docs/main:procutils.v1#procutils.v1.GetSchemaResponse).
+
+The returned integer represents the size of the response if the request was successful.
+If the allocated memory buffer is smaller than the returned response size, the processor
+should reallocate a larger buffer and call `get_schema` again.
+If getting the schema fails, an [error code](#error-codes) will be returned instead.
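+
+As a rough illustration of the grow-and-retry protocol that both host functions
+share, here is a hedged Go sketch (not the SDK's actual code; `hostGetSchema`,
+`isErrorCode` and the buffer handling are illustrative assumptions):
+
+```go
+package sketch
+
+import "fmt"
+
+// hostGetSchema stands in for the imported `get_schema` host function.
+func hostGetSchema(ptr *byte, size uint32) uint32 { return 0 }
+
+// isErrorCode reports whether a return value falls into the reserved
+// error-code range described below.
+func isErrorCode(v uint32) bool { return v >= 4_294_967_195 }
+
+// getSchemaRaw calls the host function, growing the buffer and retrying
+// whenever the response turns out to be larger than the buffer.
+func getSchemaRaw() ([]byte, error) {
+	buf := make([]byte, 1024)
+	for {
+		size := hostGetSchema(&buf[0], uint32(len(buf)))
+		if isErrorCode(size) {
+			return nil, fmt.Errorf("get_schema failed with code %d", size)
+		}
+		if int(size) <= len(buf) {
+			// The response fits; buf[:size] holds a GetSchemaResponse.
+			return buf[:size], nil
+		}
+		buf = make([]byte, size) // buffer too small: grow and retry
+	}
+}
+```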
+
 ## Error codes
 
 The last 100 numbers at the end of an `uin32` (between 4,294,967,195 and
@@ -198,11 +220,20 @@ running.
 - `4,294,967,293` - unknown command request. Internal error in Conduit, where
   the command request failed to be marshaled into a proto message.
 - `4,294,967,292` - unknown command response. Internal error in Conduit, where
-  the command response failed to be unmarshaled from the proto message into a
+  the command response failed to be unmarshalled from the proto message into a
   struct.
 - `4,294,967,291` - memory out of range. Internal error in Conduit, where it
   tried to write into the allocated memory, but failed to do so, as the size
   was insufficient.
+- `4,294,967,290` - internal error in Conduit. This could be an error while
+  marshalling/unmarshalling the buffer, among other causes; the specific
+  error is logged by Conduit.
+- `4,294,967,289` - subject not found. Conduit could not find a schema
+  with the requested subject in the schema registry.
+- `4,294,967,288` - version not found. Conduit could not find a schema
+  with the requested version in the schema registry.
+- `4,294,967,287` - invalid schema. The schema provided to Conduit
+  is invalid.
 
 ## Logging
 
diff --git a/docusaurus.config.ts b/docusaurus.config.ts
index bd7a0b67..ddd1ce11 100644
--- a/docusaurus.config.ts
+++ b/docusaurus.config.ts
@@ -164,8 +164,8 @@ const config: Config = {
       copyright: `Copyright © ${new Date().getFullYear()} Meroxa, Inc.`,
     },
     announcementBar: {
-      id: 'announcement-bar-3', // increment on change
-      content: `Conduit 0.10.0 is here! See what's new.`,
+      id: 'announcement-bar-4', // increment on change
+      content: `Conduit 0.11.0 is here! See what's new.`,
       isCloseable: true,
     },
     colorMode: {
@@ -214,6 +214,10 @@ const config: Config = {
       {
         from: '/docs/introduction/architecture',
         to: '/docs/getting-started/architecture'
+      },
+      {
+        from: '/docs/connectors/output-formats',
+        to: '/docs/connectors/configuration-parameters/output-format'
       }
     ],
     createRedirects(existingPath) {
diff --git a/src/processorgen/specs/avro.decode.json b/src/processorgen/specs/avro.decode.json
index 5c0c8883..808375cc 100644
--- a/src/processorgen/specs/avro.decode.json
+++ b/src/processorgen/specs/avro.decode.json
@@ -6,52 +6,35 @@
     "version": "v0.1.0",
     "author": "Meroxa, Inc.",
     "parameters": {
-      "auth.basic.password": {
-        "default": "",
-        "description": "The password to use with basic authentication. This option is required if\nauth.basic.username contains a value. If both auth.basic.username and auth.basic.password\nare empty basic authentication is disabled.",
-        "type": "string",
-        "validations": []
-      },
-      "auth.basic.username": {
-        "default": "",
-        "description": "The username to use with basic authentication. This option is required if\nauth.basic.password contains a value. If both auth.basic.username and auth.basic.password\nare empty basic authentication is disabled.",
-        "type": "string",
-        "validations": []
-      },
       "field": {
         "default": ".Payload.After",
         "description": "The field that will be decoded.\n\nFor more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).",
         "type": "string",
         "validations": []
       },
-      "tls.ca.cert": {
-        "default": "",
-        "description": "The path to a file containing PEM encoded CA certificates.
If this option is empty,\nConduit falls back to using the host's root CA set.", - "type": "string", - "validations": [] + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null }, - "tls.client.cert": { - "default": "", - "description": "The path to a file containing a PEM encoded certificate. This option is required\nif tls.client.key contains a value. If both tls.client.cert and tls.client.key are empty\nTLS is disabled.", - "type": "string", - "validations": [] + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null }, - "tls.client.key": { - "default": "", - "description": "The path to a file containing a PEM encoded private key. This option is required\nif tls.client.cert contains a value. If both tls.client.cert and tls.client.key are empty\nTLS is disabled.", - "type": "string", - "validations": [] + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null }, - "url": { - "default": "", - "description": "URL of the schema registry (e.g. http://localhost:8085)", - "type": "string", - "validations": [ - { - "type": "required", - "value": "" - } - ] + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, @@ -60,8 +43,7 @@ "summary": "Decode a record field in Avro format", "description": "This example shows the usage of the `avro.decode` processor.\nThe processor decodes the record's`.Key` field using the schema that is\ndownloaded from the schema registry and needs to exist under the subject`example-decode`.\nIn this example we use the following schema:\n\n```json\n{\n \"type\":\"record\",\n \"name\":\"record\",\n \"fields\":[\n {\"name\":\"myString\",\"type\":\"string\"},\n {\"name\":\"myInt\",\"type\":\"int\"}\n ]\n}\n```", "config": { - "field": ".Key", - "url": "http://127.0.0.1:54322" + "field": ".Key" }, "have": { "position": "dGVzdC1wb3NpdGlvbg==", diff --git a/src/processorgen/specs/avro.encode.json b/src/processorgen/specs/avro.encode.json index 3f909141..17d00f6c 100644 --- a/src/processorgen/specs/avro.encode.json +++ b/src/processorgen/specs/avro.encode.json @@ -6,18 +6,6 @@ "version": "v0.1.0", "author": "Meroxa, Inc.", "parameters": { - "auth.basic.password": { - "default": "", - "description": "The password to use with basic authentication. This option is required if\nauth.basic.username contains a value. If both auth.basic.username and auth.basic.password\nare empty basic authentication is disabled.", - "type": "string", - "validations": [] - }, - "auth.basic.username": { - "default": "", - "description": "The username to use with basic authentication. This option is required if\nauth.basic.password contains a value. 
If both auth.basic.username and auth.basic.password\nare empty basic authentication is disabled.", - "type": "string", - "validations": [] - }, "field": { "default": ".Payload.After", "description": "The field that will be encoded.\n\nFor more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).", @@ -62,34 +50,29 @@ } ] }, - "tls.ca.cert": { - "default": "", - "description": "The path to a file containing PEM encoded CA certificates. If this option is empty,\nConduit falls back to using the host's root CA set.", - "type": "string", - "validations": [] + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null }, - "tls.client.cert": { - "default": "", - "description": "The path to a file containing a PEM encoded certificate. This option is required\nif tls.client.key contains a value. If both tls.client.cert and tls.client.key are empty\nTLS is disabled.", - "type": "string", - "validations": [] + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null }, - "tls.client.key": { - "default": "", - "description": "The path to a file containing a PEM encoded private key. This option is required\nif tls.client.cert contains a value. If both tls.client.cert and tls.client.key are empty\nTLS is disabled.", - "type": "string", - "validations": [] + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null }, - "url": { - "default": "", - "description": "URL of the schema registry (e.g. 
http://localhost:8085)", - "type": "string", - "validations": [ - { - "type": "required", - "value": "" - } - ] + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, @@ -100,8 +83,7 @@ "config": { "field": ".Payload.After", "schema.autoRegister.subject": "example-autoRegister", - "schema.strategy": "autoRegister", - "url": "http://127.0.0.1:54322" + "schema.strategy": "autoRegister" }, "have": { "position": "dGVzdC1wb3NpdGlvbg==", @@ -147,8 +129,7 @@ "field": ".Key", "schema.preRegistered.subject": "example-preRegistered", "schema.preRegistered.version": "1", - "schema.strategy": "preRegistered", - "url": "http://127.0.0.1:54322" + "schema.strategy": "preRegistered" }, "have": { "position": "dGVzdC1wb3NpdGlvbg==", diff --git a/src/processorgen/specs/base64.decode.json b/src/processorgen/specs/base64.decode.json index eeaee6e5..1d3db270 100644 --- a/src/processorgen/specs/base64.decode.json +++ b/src/processorgen/specs/base64.decode.json @@ -20,6 +20,30 @@ "value": ".Position" } ] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/base64.encode.json b/src/processorgen/specs/base64.encode.json index e08a4726..ccc590f5 100644 --- a/src/processorgen/specs/base64.encode.json +++ b/src/processorgen/specs/base64.encode.json @@ -20,6 +20,30 @@ "value": ".Position" } ] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/custom.javascript.json b/src/processorgen/specs/custom.javascript.json index 099a141e..dec8c6ef 100644 --- a/src/processorgen/specs/custom.javascript.json +++ b/src/processorgen/specs/custom.javascript.json @@ -17,6 +17,30 @@ "description": "The path to a .js file containing the processor code.", "type": "string", "validations": [] 
+ }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/error.json b/src/processorgen/specs/error.json index 17909513..9fcab1a9 100644 --- a/src/processorgen/specs/error.json +++ b/src/processorgen/specs/error.json @@ -11,6 +11,30 @@ "description": "Error message to be returned. This can be a Go [template](https://pkg.go.dev/text/template)\nexecuted on each [`Record`](https://pkg.go.dev/github.com/conduitio/conduit-commons/opencdc#Record)\nbeing processed.", "type": "string", "validations": [] + }, + "sdk.schema.decode.key.enabled": { + "default": "false", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "false", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "false", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "false", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/field.convert.json b/src/processorgen/specs/field.convert.json index c7657a46..ed2befce 100644 --- a/src/processorgen/specs/field.convert.json +++ b/src/processorgen/specs/field.convert.json @@ -21,9 +21,33 @@ } ] }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, "type": { "default": "", - "description": "Type is the target field type after conversion, available options are: string, int, float, bool.", + "description": "Type is the target field type after 
conversion, available options are: `string`, `int`, `float`, `bool`, `time`.", "type": "string", "validations": [ { @@ -32,13 +56,49 @@ }, { "type": "inclusion", - "value": "string,int,float,bool" + "value": "string,int,float,bool,time" } ] } } }, "examples": [ + { + "summary": "Convert `int` to `time`", + "description": "This example takes an `int` in field `.Payload.After.createdAt` and parses it as a unix timestamp into a `time.Time` value.", + "config": { + "field": ".Payload.After.createdAt", + "type": "time" + }, + "have": { + "position": null, + "operation": "create", + "metadata": null, + "key": { + "id": 123.345 + }, + "payload": { + "before": null, + "after": { + "createdAt": 1704198896123456789 + } + } + }, + "want": { + "position": null, + "operation": "create", + "metadata": null, + "key": { + "id": 123.345 + }, + "payload": { + "before": null, + "after": { + "createdAt": "2024-01-02T12:34:56.123456789Z" + } + } + } + }, { "summary": "Convert `float` to `string`", "description": "This example takes the `float` in field `.Key.id` and changes its data type to `string`.", diff --git a/src/processorgen/specs/field.exclude.json b/src/processorgen/specs/field.exclude.json index 1fdb6504..a604b6b6 100644 --- a/src/processorgen/specs/field.exclude.json +++ b/src/processorgen/specs/field.exclude.json @@ -16,6 +16,30 @@ "value": "" } ] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/field.rename.json b/src/processorgen/specs/field.rename.json index de70b146..c6f28cc5 100644 --- a/src/processorgen/specs/field.rename.json +++ b/src/processorgen/specs/field.rename.json @@ -16,6 +16,30 @@ "value": "" } ] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/field.set.json b/src/processorgen/specs/field.set.json index 53f3e067..eed55f2d 100644 --- a/src/processorgen/specs/field.set.json +++ 
b/src/processorgen/specs/field.set.json @@ -21,6 +21,30 @@ } ] }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, "value": { "default": "", "description": "Value is a Go template expression which will be evaluated and stored in `field` (e.g. `{{ .Payload.After }}`).", diff --git a/src/processorgen/specs/filter.json b/src/processorgen/specs/filter.json index f3db98b1..72f9e179 100644 --- a/src/processorgen/specs/filter.json +++ b/src/processorgen/specs/filter.json @@ -5,7 +5,32 @@ "description": "Acknowledges all records that get passed to the filter, so\nthe records will be filtered out if the condition provided to the processor is\nevaluated to `true`.\n\n**Important:** Make sure to add a [condition](https://conduit.io/docs/processors/conditions)\nto this processor, otherwise all records will be filtered out.", "version": "v0.1.0", "author": "Meroxa, Inc.", - "parameters": {} + "parameters": { + "sdk.schema.decode.key.enabled": { + "default": "false", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "false", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "false", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "false", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + } + } }, "examples": [ { diff --git a/src/processorgen/specs/json.decode.json b/src/processorgen/specs/json.decode.json index 995e2db4..36f1bce3 100644 --- a/src/processorgen/specs/json.decode.json +++ b/src/processorgen/specs/json.decode.json @@ -24,6 +24,30 @@ "value": ".Payload" } ] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + 
"sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/json.encode.json b/src/processorgen/specs/json.encode.json index 5ec4a94d..a2705a37 100644 --- a/src/processorgen/specs/json.encode.json +++ b/src/processorgen/specs/json.encode.json @@ -24,6 +24,30 @@ "value": ".Payload" } ] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/unwrap.debezium.json b/src/processorgen/specs/unwrap.debezium.json index 85da2cbd..331c7591 100644 --- a/src/processorgen/specs/unwrap.debezium.json +++ b/src/processorgen/specs/unwrap.debezium.json @@ -16,6 +16,30 @@ "value": "^.Payload" } ] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/unwrap.kafkaconnect.json b/src/processorgen/specs/unwrap.kafkaconnect.json index 5dbc09ef..87afb870 100644 --- a/src/processorgen/specs/unwrap.kafkaconnect.json +++ b/src/processorgen/specs/unwrap.kafkaconnect.json @@ -16,6 +16,30 @@ "value": "^.Payload" } ] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to 
encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/unwrap.opencdc.json b/src/processorgen/specs/unwrap.opencdc.json index d9c39d9f..ee63ffe2 100644 --- a/src/processorgen/specs/unwrap.opencdc.json +++ b/src/processorgen/specs/unwrap.opencdc.json @@ -11,6 +11,30 @@ "description": "Field is a reference to the field that contains the OpenCDC record.\n\nFor more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).", "type": "string", "validations": [] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/src/processorgen/specs/webhook.http.json b/src/processorgen/specs/webhook.http.json index 0bcf6702..6a167a23 100644 --- a/src/processorgen/specs/webhook.http.json +++ b/src/processorgen/specs/webhook.http.json @@ -86,6 +86,30 @@ "description": "Specifies in which field should the response status be saved. If no value\nis set, then the response status will NOT be saved.\n\nFor more information about the format, see [Referencing fields](https://conduit.io/docs/processors/referencing-fields).", "type": "string", "validations": [] + }, + "sdk.schema.decode.key.enabled": { + "default": "true", + "description": "Whether to decode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.decode.payload.enabled": { + "default": "true", + "description": "Whether to decode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.key.enabled": { + "default": "true", + "description": "Whether to encode the record key using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null + }, + "sdk.schema.encode.payload.enabled": { + "default": "true", + "description": "Whether to encode the record payload using its corresponding schema from the schema registry.", + "type": "bool", + "validations": null } } }, diff --git a/static/img/conduit/conduit-architecture.svg b/static/img/conduit/conduit-architecture.svg new file mode 100644 index 00000000..2c6087f6 --- /dev/null +++ b/static/img/conduit/conduit-architecture.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/static/img/conduit/conduit-diagram.svg b/static/img/conduit/conduit-diagram.svg deleted file mode 100644 index d2a8fdff..00000000 --- a/static/img/conduit/conduit-diagram.svg +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file