Skip to content

Commit

Permalink
Document CREATE CONNECTION behavior (#213)
Browse files Browse the repository at this point in the history
* save

* iceberg not supported in 2.2

* incorporate feedback

* feedback
  • Loading branch information
WanYixian authored Jan 24, 2025
1 parent c54d78e commit 9bbe626
Showing 1 changed file with 88 additions and 18 deletions.
106 changes: 88 additions & 18 deletions sql/commands/sql-create-connection.mdx
Original file line number Diff line number Diff line change
@@ -1,44 +1,114 @@
---
title: "CREATE CONNECTION"
description: "Use the `CREATE CONNECTION` command to create an AWS PrivateLink connection for a Kafka connector."
description: "Use the `CREATE CONNECTION` command to create a reusable catalog for connector parameters."
---

This is necessary in order to be able to consume messages from a Kafka service located in a different VPC from the RisingWave cluster in the cloud.
The `CREATE CONNECTION` command creates a reusable connection configuration that can be referenced when creating sources, sinks, or tables. Currently supported connection types are Kafka and schema registry.

## Syntax

```sql
CREATE CONNECTION [ IF NOT EXISTS ] connection_name
WITH (
connection_parameter = 'value'
type = '<connector_type>',
connection_parameter = SECRET `<secret_name>`,
...
);
```

## Parameters
Connection parameters can reference secrets using the `SECRET` keyword. This allows sensitive information to be stored securely and referenced in the connection configuration. Additionally, changes to the secret are automatically applied, so there's no need to alter the connection.

All WITH options are required unless stated otherwise.

## Parameter

| Parameter or clause | Description |
| :------------------ | :------------------------------------------------------------------------------------------------------------------------------- |
| _connection\_name_ | The name of the connection to be created. |
| type | The type of connection. |
| provider | The provider of the connection. |
| service.name | The service name of the endpoint service. |
| tags | Optional. The AWS tags used to check for resource leakage. This parameter should have the format: key1=value1, key2=value2, .... |
| `type` | Required. The type of connection. Supported values: `kafka`, `schema_registry`. |
| `properties.bootstrap.server` | The Kafka bootstrap server addresses. Required when `type` is `kafka`. |


<Accordion title="Click to see all supported properties for Kafka connection.">

The following properties are optional and can be included in the Kafka connection configuration as needed:

**SSL/SASL authentication:**
- `properties.security.protocol`
- `properties.ssl.endpoint.identification.algorithm`
- `properties.ssl.ca.location`
- `properties.ssl.ca.pem`
- `properties.ssl.certificate.location`
- `properties.ssl.certificate.pem`
- `properties.ssl.key.location`
- `properties.ssl.key.pem`
- `properties.ssl.key.password`
- `properties.sasl.mechanism`
- `properties.sasl.username`
- `properties.sasl.password`
- `properties.sasl.kerberos.service.name`
- `properties.sasl.kerberos.keytab`
- `properties.sasl.kerberos.principal`
- `properties.sasl.kerberos.kinit.cmd`
- `properties.sasl.kerberos.min.time.before.relogin`
- `properties.sasl.oauthbearer.config`

**PrivateLink connection:**
- `privatelink.targets`
- `privatelink.endpoint`

**AWS authentication:**
- `aws.region`
- `endpoint`
- `aws.credentials.access_key_id`
- `aws.credentials.secret_access_key`
- `aws.credentials.session_token`
- `aws.credentials.role.arn`
- `aws.credentials.role.external_id`
</Accordion>

<Note>
You can either tag the VPC endpoints by specifying the `tags` parameter when using the `CREATE CONNECTION` command or by specifying the environment variable `RW_PRIVATELINK_ENDPOINT_DEFAULT_TAGS`. When specifying the tags, follow the format of `key1=value1, key2=value2, ...`. If both are specified, the tags specified in the environment variable will be appended to the ones specified by the `tags` parameter.
</Note>

## Example

The statement below creates an AWS PrivateLink connection.
To connect to a schema registry:

```sql
CREATE CONNECTION connection_name WITH (
type = 'privatelink',
provider = 'aws',
service.name = 'com.amazonaws.xyz.us-east-1.abc-xyz-0000'
CREATE CONNECTION sr_conn WITH (
type = 'schema_registry',
schema.registry = 'http://...',
schema.registry.username = 'admin_user',
schema.registry.password = 'schema_registry_password'
);
```

To create a Kafka connection that securely integrates secrets:

```sql
CREATE CONNECTION conn_kafka WITH (
type = 'kafka',
properties.bootstrap.server='<broker addr>',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username=SECRET <username>,
properties.sasl.password=SECRET <password>
);
```

```sql
CREATE TABLE t WITH (
connector = 'kafka',
topic = 'demo-topic',
connection = conn_kafka
) FORMAT PLAIN ENCODE AVRO (connection = sr_conn);
```

To create a source, table or sink from the connection, the name of connector and connection must match those specified above. Also, the attributes defined in the connection and the source/table/sink cannot overlap:

```sql
CREATE SINK sink_kafka from data_table WITH (
connector = 'kafka',
connection = conn_kafka,
topic = 'connection_ddl_1'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);
```

Expand Down

0 comments on commit 9bbe626

Please sign in to comment.