From 9bbe626c15bdc5e64663dbc20140f486e447ef33 Mon Sep 17 00:00:00 2001 From: IrisWan <150207222+WanYixian@users.noreply.github.com> Date: Fri, 24 Jan 2025 17:11:42 +0800 Subject: [PATCH] Document `CREATE CONNECTION` behavior (#213) * save * iceberg not supported in 2.2 * incorporate feedback * feedback --- sql/commands/sql-create-connection.mdx | 106 ++++++++++++++++++++----- 1 file changed, 88 insertions(+), 18 deletions(-) diff --git a/sql/commands/sql-create-connection.mdx b/sql/commands/sql-create-connection.mdx index 77ca441b..59d7d381 100644 --- a/sql/commands/sql-create-connection.mdx +++ b/sql/commands/sql-create-connection.mdx @@ -1,44 +1,114 @@ --- title: "CREATE CONNECTION" -description: "Use the `CREATE CONNECTION` command to create an AWS PrivateLink connection for a Kafka connector." +description: "Use the `CREATE CONNECTION` command to create a reusable catalog for connector parameters." --- -This is necessary in order to be able to consume messages from a Kafka service located in a different VPC from the RisingWave cluster in the cloud. +The `CREATE CONNECTION` command creates a reusable connection configuration that can be referenced when creating sources, sinks, or tables. Currently supported connection types are Kafka and schema registry. ## Syntax ```sql CREATE CONNECTION [ IF NOT EXISTS ] connection_name WITH ( - connection_parameter = 'value' + type = '', + connection_parameter = SECRET ``, + ... ); ``` -## Parameters +Connection parameters can reference secrets using the `SECRET` keyword. This allows sensitive information to be stored securely and referenced in the connection configuration. Additionally, changes to the secret are automatically applied, so there's no need to alter the connection. -All WITH options are required unless stated otherwise. + +## Parameter | Parameter or clause | Description | | :------------------ | :------------------------------------------------------------------------------------------------------------------------------- | -| _connection\_name_ | The name of the connection to be created. | -| type | The type of connection. | -| provider | The provider of the connection. | -| service.name | The service name of the endpoint service. | -| tags | Optional. The AWS tags used to check for resource leakage. This parameter should have the format: key1=value1, key2=value2, .... | +| `type` | Required. The type of connection. Supported values: `kafka`, `schema_registry`. | +| `properties.bootstrap.server` | The Kafka bootstrap server addresses. Required when `type` is `kafka`. | + + + + +The following properties are optional and can be included in the Kafka connection configuration as needed: + + **SSL/SASL authentication:** + - `properties.security.protocol` + - `properties.ssl.endpoint.identification.algorithm` + - `properties.ssl.ca.location` + - `properties.ssl.ca.pem` + - `properties.ssl.certificate.location` + - `properties.ssl.certificate.pem` + - `properties.ssl.key.location` + - `properties.ssl.key.pem` + - `properties.ssl.key.password` + - `properties.sasl.mechanism` + - `properties.sasl.username` + - `properties.sasl.password` + - `properties.sasl.kerberos.service.name` + - `properties.sasl.kerberos.keytab` + - `properties.sasl.kerberos.principal` + - `properties.sasl.kerberos.kinit.cmd` + - `properties.sasl.kerberos.min.time.before.relogin` + - `properties.sasl.oauthbearer.config` + + **PrivateLink connection:** + - `privatelink.targets` + - `privatelink.endpoint` + + **AWS authentication:** + - `aws.region` + - `endpoint` + - `aws.credentials.access_key_id` + - `aws.credentials.secret_access_key` + - `aws.credentials.session_token` + - `aws.credentials.role.arn` + - `aws.credentials.role.external_id` + - -You can either tag the VPC endpoints by specifying the `tags` parameter when using the `CREATE CONNECTION` command or by specifying the environment variable `RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS`. When specifying the tags, follow the format of `key1=value1, key2=value2, ...`. If both are specified, the tags specified in the environment variable will be appended to the ones specified by the `tags` parameter. - ## Example -The statement below creates an AWS PrivateLink connection. +To connect to a schema registry: ```sql -CREATE CONNECTION connection_name WITH ( - type = 'privatelink', - provider = 'aws', - service.name = 'com.amazonaws.xyz.us-east-1.abc-xyz-0000' +CREATE CONNECTION sr_conn WITH ( + type = 'schema_registry', + schema.registry = 'http://...', + schema.registry.username = 'admin_user', + schema.registry.password = 'schema_registry_password' +); +``` + +To create a Kafka connection that securely integrates secrets: + +```sql +CREATE CONNECTION conn_kafka WITH ( + type = 'kafka', + properties.bootstrap.server='', + properties.sasl.mechanism='PLAIN', + properties.security.protocol='SASL_PLAINTEXT', + properties.sasl.username=SECRET , + properties.sasl.password=SECRET +); +``` + +```sql +CREATE TABLE t WITH ( + connector = 'kafka', + topic = 'demo-topic', + connection = conn_kafka +) FORMAT PLAIN ENCODE AVRO (connection = sr_conn); +``` + +To create a source, table or sink from the connection, the name of connector and connection must match those specified above. Also, the attributes defined in the connection and the source/table/sink cannot overlap: + +```sql +CREATE SINK sink_kafka from data_table WITH ( + connector = 'kafka', + connection = conn_kafka, + topic = 'connection_ddl_1' +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true' ); ```