diff --git a/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/Capture.java b/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/Capture.java index cb32262..700c844 100644 --- a/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/Capture.java +++ b/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/Capture.java @@ -22,7 +22,7 @@ @Getter @NoArgsConstructor @Schema( - title = "Wait for change data capture event on MySQL server" + title = "Wait for change data capture event on MySQL server." ) @Plugin( examples = { @@ -30,8 +30,8 @@ code = { "snapshotMode: NEVER", "hostname: 127.0.0.1", - "port: 63306", - "username: root", + "port: \"3306\"", + "username: mysql_user", "password: mysql_passwd", "maxRecords: 100", } diff --git a/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/MysqlInterface.java b/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/MysqlInterface.java index 6d4a5d4..2b544c9 100644 --- a/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/MysqlInterface.java +++ b/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/MysqlInterface.java @@ -11,12 +11,12 @@ public interface MysqlInterface { @Schema( title = "Specifies the criteria for running a snapshot when the connector starts.", description = " Possible settings are:\n" + - "- `INITIAL`: the connector runs a snapshot only when no offsets have been recorded for the logical server name.\n" + - "- `INITIAL_ONLY`: the connector runs a snapshot only when no offsets have been recorded for the logical server name and then stops; i.e. it will not read change events from the binlog.\n" + - "- `WHEN_NEEDED`: the connector runs a snapshot upon startup whenever it deems it necessary. That is, when no offsets are available, or when a previously recorded offset specifies a binlog location or GTID that is not available in the server.\n" + - "- `NEVER`: - the connector never uses snapshots. Upon first startup with a logical server name, the connector reads from the beginning of the binlog. Configure this behavior with care. It is valid only when the binlog is guaranteed to contain the entire history of the database.\n" + - "- `SCHEMA_ONLY`: the connector runs a snapshot of the schemas and not the data. This setting is useful when you do not need the topics to contain a consistent snapshot of the data but need them to have only the changes since the connector was started.\n" + - "- `SCHEMA_ONLY_RECOVERY`: this is a recovery setting for a connector that has already been capturing changes. When you restart the connector, this setting enables recovery of a corrupted or lost database history topic. You might set it periodically to \"clean up\" a database history topic that has been growing unexpectedly. Database history topics require infinite retention." + "- `INITIAL`: The connector runs a snapshot only when no offsets have been recorded for the logical server name.\n" + + "- `INITIAL_ONLY`: The connector runs a snapshot only when no offsets have been recorded for the logical server name and then stops; i.e. it will not read change events from the binlog.\n" + + "- `WHEN_NEEDED`: The connector runs a snapshot upon startup whenever it deems it necessary. That is, when no offsets are available, or when a previously recorded offset specifies a binlog location or GTID that is not available in the server.\n" + + "- `NEVER`: The connector never uses snapshots. Upon first startup with a logical server name, the connector reads from the beginning of the binlog. Configure this behavior with care. It is valid only when the binlog is guaranteed to contain the entire history of the database.\n" + + "- `SCHEMA_ONLY`: The connector runs a snapshot of the schemas and not the data. This setting is useful when you do not need the topics to contain a consistent snapshot of the data but need them to have only the changes since the connector was started.\n" + + "- `SCHEMA_ONLY_RECOVERY`: This is a recovery setting for a connector that has already been capturing changes. When you restart the connector, this setting enables recovery of a corrupted or lost database history topic. You might set it periodically to \"clean up\" a database history topic that has been growing unexpectedly. Database history topics require infinite retention." ) @PluginProperty(dynamic = false) @NotNull @@ -24,7 +24,7 @@ public interface MysqlInterface { @Schema( title = "A numeric ID of this database client.", - description = "which must be unique across all currently-running database processes in the MySQL cluster. " + + description = "This must be unique across all currently-running database processes in the MySQL cluster. " + "This connector joins the MySQL database cluster as another server (with this unique ID) so it can read " + "the binlog. By default, a random number between 5400 and 6400 is generated, though the recommendation " + "is to explicitly set a value." @@ -38,6 +38,7 @@ public enum SnapshotMode { INITIAL_ONLY, WHEN_NEEDED, NEVER, - SCHEMA_ONLY + SCHEMA_ONLY, + SCHEMA_ONLY_RECOVERY } } diff --git a/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/Trigger.java b/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/Trigger.java index d418a4e..a0d950b 100644 --- a/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/Trigger.java +++ b/plugin-debezium-mysql/src/main/java/io/kestra/plugin/debezium/mysql/Trigger.java @@ -29,7 +29,7 @@ @Getter @NoArgsConstructor @Schema( - title = "Wait for change data capture event on MySQL server and create new execution" + title = "Wait for change data capture event on MySQL server and create new execution." ) @Plugin( examples = { @@ -37,8 +37,8 @@ code = { "snapshotMode: NEVER", "hostname: 127.0.0.1", - "port: 63306", - "username: root", + "port: \"3306\"", + "username: mysql_user", "password: mysql_passwd", "maxRecords: 100", } diff --git a/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/Capture.java b/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/Capture.java index 00c7861..8e29304 100644 --- a/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/Capture.java +++ b/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/Capture.java @@ -19,15 +19,15 @@ @Getter @NoArgsConstructor @Schema( - title = "Wait for change data capture event on PostgresSQL server" + title = "Wait for change data capture event on PostgreSQL server." ) @Plugin( examples = { @Example( code = { "hostname: 127.0.0.1", - "port: 5432", - "username: posgres", + "port: \"5432\"", + "username: psql_user", "password: psql_passwd", "maxRecords: 100", "database: my_database", diff --git a/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/PostgresInterface.java b/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/PostgresInterface.java index 06fe097..95b7632 100644 --- a/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/PostgresInterface.java +++ b/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/PostgresInterface.java @@ -27,7 +27,7 @@ public interface PostgresInterface { PluginName getPluginName(); @Schema( - title = "The name of the PostgreSQL publication created for streaming changes when using `pgoutput`.", + title = "The name of the PostgreSQL publication created for streaming changes when using `PGOUTPUT`.", description = "This publication is created at start-up if it does not already exist and it includes all tables. " + "Debezium then applies its own include/exclude list filtering, if configured, to limit the publication to " + "change events for the specific tables of interest. The connector user must have superuser permissions to " + @@ -55,7 +55,7 @@ public interface PostgresInterface { "- `DISABLE` uses an unencrypted connection.\n" + "- `REQUIRE` uses a secure (encrypted) connection, and fails if one cannot be established.\n" + "- `VERIFY_CA` behaves like require but also verifies the server TLS certificate against the configured Certificate Authority (CA) certificates, or fails if no valid matching CA certificates are found.\n" + - "- `VERIFY_FULL` behaves like verify-ca but also verifies that the server certificate matches the host to which the connector is trying to connect. \n\n" + + "- `VERIFY_FULL` behaves like verify-ca but also verifies that the server certificate matches the host to which the connector is trying to connect.\n\n" + "See the [PostgreSQL documentation](https://www.postgresql.org/docs/current/static/libpq-connect.html) for more information." ) @PluginProperty(dynamic = false) @@ -63,21 +63,21 @@ public interface PostgresInterface { @Schema( title = "The root certificate(s) against which the server is validated.", - description = "Must be a PEM encoded certificate" + description = "Must be a PEM encoded certificate." ) @PluginProperty(dynamic = true) String getSslRootCert(); @Schema( title = "The SSL certificate for the client.", - description = "Must be a PEM encoded certificate" + description = "Must be a PEM encoded certificate." ) @PluginProperty(dynamic = true) String getSslCert(); @Schema( title = "The SSL private key of the client.", - description = "Must be a PEM encoded key" + description = "Must be a PEM encoded key." ) @PluginProperty(dynamic = true) String getSslKey(); diff --git a/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/Trigger.java b/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/Trigger.java index b783685..9aa9d96 100644 --- a/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/Trigger.java +++ b/plugin-debezium-postgres/src/main/java/io/kestra/plugin/debezium/postgres/Trigger.java @@ -29,14 +29,14 @@ @Getter @NoArgsConstructor @Schema( - title = "Wait for change data capture event on PostgresSQL server and create new execution" + title = "Wait for change data capture event on PostgreSQL server and create new execution." ) @Plugin( examples = { @Example( code = { "hostname: 127.0.0.1", - "port: 5432", + "port: \"5432\"", "username: posgres", "password: psql_passwd", "maxRecords: 100", diff --git a/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/Capture.java b/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/Capture.java index ba88eb8..251b804 100644 --- a/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/Capture.java +++ b/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/Capture.java @@ -19,15 +19,16 @@ @Getter @NoArgsConstructor @Schema( - title = "Wait for change data capture event on Microsoft SQL server" + title = "Wait for change data capture event on Microsoft SQL Server." ) @Plugin( examples = { @Example( code = { + "snapshotMode: INITIAL", "hostname: 127.0.0.1", - "port: 1433", - "username: sa", + "port: \"1433\"", + "username: sqlserver_user", "password: sqlserver_passwd", "maxRecords: 100", } diff --git a/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/SqlServerInterface.java b/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/SqlServerInterface.java index b524c1f..2cecc5e 100644 --- a/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/SqlServerInterface.java +++ b/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/SqlServerInterface.java @@ -14,7 +14,7 @@ public interface SqlServerInterface { @Schema( - title = "The name of the PostgreSQL database from which to stream the changes." + title = "The name of the Microsoft SQL Server database from which to stream the changes." ) @PluginProperty(dynamic = true) @NotNull diff --git a/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/Trigger.java b/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/Trigger.java index 4f60745..b10479a 100644 --- a/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/Trigger.java +++ b/plugin-debezium-sqlserver/src/main/java/io/kestra/plugin/debezium/sqlserver/Trigger.java @@ -29,15 +29,16 @@ @Getter @NoArgsConstructor @Schema( - title = "Wait for change data capture event on Microsoft SQL server and create new execution" + title = "Wait for change data capture event on Microsoft SQL Server and create new execution." ) @Plugin( examples = { @Example( code = { + "snapshotMode: INITIAL", "hostname: 127.0.0.1", - "port: 1433", - "username: sa", + "port: \"1433\"", + "username: sqlserver_user", "password: sqlserver_passwd", "database: deb", "maxRecords: 100", diff --git a/plugin-debezium/src/main/java/io/kestra/plugin/debezium/AbstractDebeziumInterface.java b/plugin-debezium/src/main/java/io/kestra/plugin/debezium/AbstractDebeziumInterface.java index 85de406..f1df042 100644 --- a/plugin-debezium/src/main/java/io/kestra/plugin/debezium/AbstractDebeziumInterface.java +++ b/plugin-debezium/src/main/java/io/kestra/plugin/debezium/AbstractDebeziumInterface.java @@ -11,102 +11,102 @@ public interface AbstractDebeziumInterface { @Schema( - title = "The format of output", + title = "The format of the output.", description = " Possible settings are:\n" + "- `RAW`: Send raw data from debezium.\n" + - "- `INLINE`: Send a row like in the source with only data (remove after & before), all the cols will be present on each rows.\n" + - "- `WRAP`: Send a row like INLINE but wrapped on a `record` field.\n" + "- `INLINE`: Send a row like in the source with only data (remove after & before), all the columns will be present for each row.\n" + + "- `WRAP`: Send a row like INLINE but wrapped in a `record` field.\n" ) @PluginProperty(dynamic = false) @NotNull AbstractDebeziumTask.Format getFormat(); @Schema( - title = "How to handle deleted rows", + title = "Specify how to handle deleted rows.", description = " Possible settings are:\n" + - "- `ADD_FIELD`: add a deleted fields as boolean.\n" + - "- `NULL`: send a row will all values as null.\n" + - "- `DROP`: don't send row deleted." + "- `ADD_FIELD`: Add a deleted field as boolean.\n" + + "- `NULL`: Send a row with all values as null.\n" + + "- `DROP`: Don't send deleted row." ) @PluginProperty(dynamic = false) @NotNull AbstractDebeziumTask.Deleted getDeleted(); @Schema( - title = "The name of deleted fields if deleted is `ADD_FIELD`" + title = "The name of deleted field if deleted is `ADD_FIELD`." ) @PluginProperty(dynamic = false) @NotNull String getDeletedFieldName(); @Schema( - title = "How to handle key", + title = "Specify how to handle key.", description = " Possible settings are:\n" + - "- `ADD_FIELD`: add key(s) merged with cols.\n" + - "- `DROP`: drop keys." + "- `ADD_FIELD`: Add key(s) merged with columns.\n" + + "- `DROP`: Drop keys." ) @PluginProperty(dynamic = false) @NotNull AbstractDebeziumTask.Key getKey(); @Schema( - title = "How to handle metadata", + title = "Specify how to handle metadata.", description = " Possible settings are:\n" + - "- `ADD_FIELD`: add metadata in a col named `metadata`.\n" + - "- `DROP`: drop keys." + "- `ADD_FIELD`: Add metadata in a column named `metadata`.\n" + + "- `DROP`: Drop metadata." ) @PluginProperty(dynamic = false) @NotNull AbstractDebeziumTask.Metadata getMetadata(); @Schema( - title = "The name of metadata fields if metadata is `ADD_FIELD`" + title = "The name of metadata field if metadata is `ADD_FIELD`." ) @PluginProperty(dynamic = false) @NotNull String getMetadataFieldName(); @Schema( - title = "Split table on separate output `uris`", + title = "Split table on separate output `uris`.", description = " Possible settings are:\n" + - "- `TABLE`: will split all rows by tables on output with name `database.table`\n" + - "- `DATABASE`: will split all rows by database on output with name `database`.\n" + - "- `OFF`: will **NOT** split all rows resulting a single `data` output." + "- `TABLE`: This will split all rows by tables on output with name `database.table`\n" + + "- `DATABASE`: This will split all rows by databases on output with name `database`.\n" + + "- `OFF`: This will **NOT** split all rows resulting in a single `data` output." ) @PluginProperty(dynamic = false) @NotNull AbstractDebeziumTask.SplitTable getSplitTable(); @Schema( - title = "Ignore ddl statement", - description = "Ignore create table and others administration operations" + title = "Ignore DDL statement.", + description = "Ignore CREATE, ALTER, DROP and TRUNCATE operations." ) @PluginProperty(dynamic = false) @NotNull Boolean getIgnoreDdl(); @Schema( - title = "Hostname of the remote server" + title = "Hostname of the remote server." ) @PluginProperty(dynamic = true) @NotNull String getHostname(); @Schema( - title = "Port of the remote server" + title = "Port of the remote server." ) @PluginProperty(dynamic = true) @NotNull String getPort(); @Schema( - title = "Username on the remote server" + title = "Username on the remote server." ) @PluginProperty(dynamic = true) String getUsername(); @Schema( - title = "Password on the remote server" + title = "Password on the remote server." ) @PluginProperty(dynamic = true) String getPassword(); @@ -119,7 +119,7 @@ public interface AbstractDebeziumInterface { Object getIncludedDatabases(); @Schema( - title = "An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes. ", + title = "An optional, comma-separated list of regular expressions that match the names of databases for which you do not want to capture changes.", description = "The connector captures changes in any database whose name is not in the `excludedDatabases`. Do not also set the `includedDatabases` connector configuration property." ) @PluginProperty(dynamic = true) @@ -142,49 +142,49 @@ public interface AbstractDebeziumInterface { @Schema( title = "An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to exclude from change event record values.", - description = "Fully-qualified names for columns are of the form databaseName.tableName.columnName." + description = "Fully-qualified names for columns are of the form databaseName.tableName.columnName. Do not also specify the `excludedColumns` connector configuration property." ) @PluginProperty(dynamic = true) Object getIncludedColumns(); @Schema( title = "An optional, comma-separated list of regular expressions that match the fully-qualified names of columns to include in change event record values.", - description = "Fully-qualified names for columns are of the form databaseName.tableName.columnName." + description = "Fully-qualified names for columns are of the form databaseName.tableName.columnName. Do not also specify the `includedColumns` connector configuration property.\"" ) @PluginProperty(dynamic = true) Object getExcludedColumns(); @Schema( - title = "Additional configuration properties", - description = "Any additional configuration properties that is valid for the current driver" + title = "Additional configuration properties.", + description = "Any additional configuration properties that is valid for the current driver." ) @PluginProperty(dynamic = true) Map getProperties(); @Schema( - title = "The name of Debezium state file" + title = "The name of Debezium state file." ) @PluginProperty(dynamic = false) @NotNull String getStateName(); @Schema( - title = "The max number of rows to fetch before stopping", - description = "It's not an hard limit and is evaluated every second" + title = "The maximum number of rows to fetch before stopping.", + description = "It's not an hard limit and is evaluated every second." ) @PluginProperty(dynamic = false) Integer getMaxRecords(); @Schema( - title = "The max total processing duration", - description = "It's not an hard limit and is evaluated every second" + title = "The maximum total processing duration.", + description = "It's not an hard limit and is evaluated every second." ) @PluginProperty(dynamic = false) Duration getMaxDuration(); @Schema( - title = "The max duration waiting for new rows", - description = "It's not an hard limit and is evaluated every second" + title = "The maximum duration waiting for new rows.", + description = "It's not an hard limit and is evaluated every second." ) @PluginProperty(dynamic = false) Duration getMaxWait();