-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reorganize file source part #95
Changes from 4 commits
10bf35c
f2ee873
7fcdbdf
abd46e4
fe9d5d0
3b36991
119d407
ea94321
9dd84e6
11018d2
da229cd
77efad9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,7 @@ The statement will create a streaming job that continuously ingests data from th | |
|
||
### Insert data into tables | ||
|
||
You can load data in batch to RisingWave by creating a table ([CREATE TABLE](/sql/commands/sql-create-table)) and then inserting data into it ([INSERT](/sql/commands/sql-insert)). For example, the statement below creates a table `website_visits` and inserts 5 rows of data. | ||
You can load data in batch to RisingWave by [creating a table](/sql/commands/sql-create-table) and then [inserting](/sql/commands/sql-insert) data into it. For example, the statement below creates a table `website_visits` and inserts 5 rows of data. | ||
WanYixian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
```sql | ||
CREATE TABLE website_visits ( | ||
|
@@ -120,6 +120,52 @@ CREATE TABLE t1( | |
INSERT INTO t1 SELECT * FROM source_iceberg_t1; | ||
``` | ||
|
||
## File source management | ||
|
||
RisingWave supports reading data from file sources including AWS S3, GCS, and Azure Blob Storage. | ||
|
||
### Batch reading from file source | ||
|
||
To read data in batch from file sources, you need to create a materialized view from the source or create a table with the appropriate connector. You can also directly query the file source. Below are examples using AWS S3. | ||
|
||
```sql | ||
-- Create a source that connects to S3 | ||
CREATE SOURCE s3_source WITH ( connector = 's3', ... ); | ||
|
||
-- Create a materialized view from the source for batch processing | ||
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source; | ||
|
||
-- Create a table using the S3 connector | ||
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3', ... ); | ||
|
||
-- Select from the source directly | ||
SELECT count(*) from s3_source; | ||
``` | ||
|
||
### Data type mapping in Parquet | ||
|
||
You can use the table function `file_scan()` to read Parquet files from sources. Below is the data type mapping that shows how RisingWave converts data types from file sources in Parquet format. | ||
WanYixian marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
| File source type | RisingWave type | | ||
| :----------- | :-------------- | | ||
| boolean | boolean | | ||
| integer | int | | ||
| long | bigint | | ||
| float | real | | ||
| double | double | | ||
| string | varchar | | ||
| date | date | | ||
| timestamptz | timestamptz | | ||
| timestamp | timestamp | | ||
| decimal | decimal | | ||
| Int8 | Int16 | | ||
| UInt8 | Int16 | | ||
| UInt16 | Int32 | | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The right column for RisingWave shall use the names The left column for Parquet shall also be consistent. Names shall be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For unsupported datatype, I think Int96 , FIXED_LEN_BYTE_ARRAY , refer to https://parquet.apache.org/docs/file-format/types/ Not sure if there is any omission, can you help to confirm? |
||
| UInt32 | Int64 | | ||
| UInt64 | Decimal | | ||
| Float16 | Float32 | | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto. |
||
|
||
|
||
## Topics in this section | ||
|
||
The information presented above provides a brief overview of the data ingestion process in RisingWave. To gain a more comprehensive understanding of this process, the following topics in this section will delve more deeply into the subject matter. Here is a brief introduction to what you can expect to find in each topic: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,11 +18,11 @@ To ingest data in formats marked with "T", you need to create tables (with conne | |
| [Kinesis](/integrations/sources/kinesis) | Latest | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | | ||
| [PostgreSQL CDC](/integrations/sources/postgresql-cdc) | 10, 11, 12, 13, 14 | [Debezium JSON](#debezium-json) (T) | | ||
| [MySQL CDC](/integrations/sources/mysql-cdc) | 5.7, 8.0 | [Debezium JSON](#debezium-json) (T) | | ||
| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | | | ||
| [CDC via Kafka](/ingestion/change-data-capture-with-risingwave) | | [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | | ||
| [Amazon S3](/integrations/sources/s3) | Latest | [JSON](#json), CSV | | ||
| [Load generator](/ingestion/generate-test-data) | Built-in | [JSON](#json) | | ||
| [Google Pub/Sub](/integrations/sources/google-pub-sub) | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | | | ||
| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | [JSON](#json) | | | ||
| [Google Pub/Sub](/integrations/sources/google-pub-sub) | | [Avro](#avro), [JSON](#json), [protobuf](#protobuf), [Debezium JSON](#debezium-json) (T), [Maxwell JSON](#maxwell-json) (T), [Canal JSON](#canal-json) (T) | | ||
| [Google Cloud Storage](/integrations/sources/google-cloud-storage) | | [JSON](#json) | | ||
|
||
<Note> | ||
When a source is created, RisingWave does not ingest data immediately. RisingWave starts to process data when a materialized view is created based on the source. | ||
|
@@ -63,6 +63,15 @@ FORMAT [ DEBEZIUM | UPSERT | PLAIN ] ENCODE AVRO ( | |
|
||
Note that for `map.handling.mode = 'jsonb'`, the value types can only be: `null`, `boolean`, `int`, `string`, or `map`/`record`/`array` with these types. | ||
|
||
### Bytes | ||
|
||
RisingWave allows you to read data streams without decoding the data by using the `BYTES` row format. However, the table or source can have exactly one field of `BYTEA` data. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to have an example here? I don't get this exactly one field part. :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
```sql | ||
FORMAT PLAIN | ||
ENCODE BYTES | ||
``` | ||
|
||
### Debezium AVRO | ||
|
||
When creating a source from streams in with Debezium AVRO, the schema of the source does not need to be defined in the `CREATE TABLE` statement as it can be inferred from the `SCHEMA REGISTRY`. This means that the schema file location must be specified. The schema file location can be an actual Web location, which is in `http://...`, `https://...`, or `S3://...` format, or a Confluent Schema Registry. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry). | ||
|
@@ -120,12 +129,12 @@ RisingWave supports the TiCDC dialect of the Canal CDC format. When creating a s | |
|
||
Syntax: | ||
|
||
```js | ||
```sql | ||
FORMAT CANAL | ||
ENCODE JSON | ||
``` | ||
|
||
### Debezium json | ||
### Debezium JSON | ||
|
||
When creating a source from streams in Debezium JSON, you can define the schema of the source within the parentheses after the source name (`schema_definition` in the syntax), and specify the data and encoding formats in the `FORMAT` and `ENCODE` sections. You can directly reference data fields in the JSON payload by their names as column names in the schema. | ||
|
||
|
@@ -135,7 +144,7 @@ Note that if you are ingesting data of type `timestamp` or `timestamptz` in Risi | |
|
||
Syntax: | ||
|
||
```js | ||
```sql | ||
FORMAT DEBEZIUM | ||
ENCODE JSON [ ( | ||
[ ignore_key = 'true | false ' ] | ||
|
@@ -148,7 +157,7 @@ When loading data from MongoDB via Kafka topics in Debezium Mongo JSON format, t | |
|
||
Syntax: | ||
|
||
```js | ||
```sql | ||
FORMAT DEBEZIUM_MONGO | ||
ENCODE JSON | ||
``` | ||
|
@@ -159,7 +168,7 @@ When creating a source from streams in Maxwell JSON, you can define the schema o | |
|
||
Syntax: | ||
|
||
```js | ||
```sql | ||
FORMAT MAXWELL | ||
ENCODE JSON | ||
``` | ||
|
@@ -172,7 +181,7 @@ You can define the schema of the source within the parentheses after the source | |
|
||
Syntax: | ||
|
||
```js | ||
```sql | ||
FORMAT UPSERT | ||
ENCODE JSON [ ( | ||
schema.registry = 'schema_registry_url [, ...]', | ||
|
@@ -181,6 +190,18 @@ ENCODE JSON [ ( | |
) ] | ||
``` | ||
|
||
### Parquet | ||
|
||
Parquet format allows you to efficiently store and retrieve large datasets by utilizing a columnar storage architecture. RisingWave supports reading Parquet files from object storage systems including Amazon S3, Google Cloud Storage (GCS), and Azure Blob Storage. | ||
|
||
|
||
Syntax: | ||
|
||
```sql | ||
FORMAT PLAIN | ||
ENCODE PARQUET | ||
``` | ||
|
||
### Protobuf | ||
|
||
For data in protobuf format, you must specify a message (fully qualified by package path) and a schema location. The schema location can be an actual Web location that is in `http://...`, `https://...`, or `S3://...` format. For Kafka data in protobuf, instead of providing a schema location, you can provide a Confluent Schema Registry that RisingWave can get the schema from. For more details about using Schema Registry for Kafka data, see [Read schema from Schema Registry](/integrations/sources/kafka#read-schemas-from-confluent-schema-registry). | ||
|
@@ -199,7 +220,7 @@ protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema. | |
|
||
Syntax: | ||
|
||
```js | ||
```sql | ||
FORMAT PLAIN | ||
ENCODE PROTOBUF ( | ||
message = 'com.example.MyMessage', | ||
|
@@ -209,14 +230,6 @@ ENCODE PROTOBUF ( | |
|
||
For more information on supported protobuf types, refer to [Supported protobuf types](/sql/data-types/supported-protobuf-types). | ||
|
||
### Bytes | ||
|
||
RisingWave allows you to read data streams without decoding the data by using the `BYTES` row format. However, the table or source can have exactly one field of `BYTEA` data. | ||
|
||
```js | ||
FORMAT PLAIN | ||
ENCODE BYTES | ||
``` | ||
|
||
## General parameters for supported formats | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@WanYixian Let's always refer to Parquet as a format. "Sinking data in Parquet format..."