From 8258911decf164ad6c90a9249038669210d5e44b Mon Sep 17 00:00:00 2001
From: gnehil
Date: Mon, 4 Dec 2023 21:50:20 +0800
Subject: [PATCH] update spark connector doc

---
 .../docs/ecosystem/spark-doris-connector.md   | 84 +++++++++++++------
 .../docs/ecosystem/spark-doris-connector.md   | 81 ++++++++++++------
 2 files changed, 113 insertions(+), 52 deletions(-)

diff --git a/docs/en/docs/ecosystem/spark-doris-connector.md b/docs/en/docs/ecosystem/spark-doris-connector.md
index d3bbafd46a523c..8921c68d09fc87 100644
--- a/docs/en/docs/ecosystem/spark-doris-connector.md
+++ b/docs/en/docs/ecosystem/spark-doris-connector.md
@@ -37,11 +37,12 @@ Github: https://github.com/apache/doris-spark-connector

 ## Version Compatibility

-| Connector | Spark         | Doris       | Java | Scala      |
-|-----------|---------------|-------------|------|------------|
-| 1.2.0     | 3.2, 3.1, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
-| 1.1.0     | 3.2, 3.1, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
-| 1.0.1     | 3.1, 2.3      | 0.12 - 0.15 | 8    | 2.12, 2.11 |
+| Connector | Spark               | Doris       | Java | Scala      |
+|-----------|---------------------|-------------|------|------------|
+| 1.3.0     | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
+| 1.2.0     | 3.2, 3.1, 2.3       | 1.0 +       | 8    | 2.12, 2.11 |
+| 1.1.0     | 3.2, 3.1, 2.3       | 1.0 +       | 8    | 2.12, 2.11 |
+| 1.0.1     | 3.1, 2.3            | 0.12 - 0.15 | 8    | 2.12, 2.11 |

 ## Build and Install
@@ -80,8 +81,8 @@ spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT
 ```
 <dependency>
   <groupId>org.apache.doris</groupId>
-  <artifactId>spark-doris-connector-3.1_2.12</artifactId>
-  <version>1.2.0</version>
+  <artifactId>spark-doris-connector-3.4_2.12</artifactId>
+  <version>1.3.0</version>
 </dependency>
 ```
@@ -170,11 +171,14 @@ OPTIONS(
 INSERT INTO spark_doris
 VALUES ("VALUE1", "VALUE2", ...);

-#
-or
+# or
 INSERT INTO spark_doris
 SELECT *
 FROM YOUR_TABLE
+# or
+INSERT OVERWRITE spark_doris
+SELECT *
+FROM YOUR_TABLE
 ```

 #### DataFrame(batch/stream)
@@ -196,6 +200,8 @@ mockDataDF.write.format("doris")
   //other options
   //specify the fields to write
   .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
+  // Overwrite mode can be set to overwrite existing data
+  // .option("save_mode", SaveMode.Overwrite.name())
   .save()

 ## stream sink(StructuredStreaming)
@@ -260,13 +266,14 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
 | doris.deserialize.arrow.async | false | Whether to support asynchronous conversion of the Arrow format to the RowBatch needed for spark-doris-connector iteration |
 | doris.deserialize.queue.size | 64 | Size of the internal processing queue for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true |
 | doris.write.fields | -- | Specifies the fields (or the order of the fields) to write to the Doris table, fields separated by commas.<br/>By default, all fields are written in the order of the Doris table's fields. |
-| sink.batch.size | 10000 | Maximum number of lines in a single write BE |
-| sink.max-retries | 1 | Number of retries after writing BE failed |
-| sink.properties.* | -- | Import parameters for Stream Load.<br/>For example:<br/>Specify column separator: `'sink.properties.column_separator' = ','`, specify write data format: `'sink.properties.format' = 'json'` [More Multi-parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual#%E5%88%9B%E5%BB%BA%E5%AF%BC%E5%85%A5) |
+| doris.sink.batch.size | 100000 | Maximum number of rows in a single write to BE |
+| doris.sink.max-retries | 0 | Number of retries after a failed write to BE |
+| doris.sink.properties.* | -- | Import parameters for Stream Load.<br/>For example:<br/>Specify the column separator: `'doris.sink.properties.column_separator' = ','`, specify the write data format: `'doris.sink.properties.format' = 'json'` [More parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
 | doris.sink.task.partition.size | -- | The number of partitions corresponding to the writing task. After filtering and other operations, the number of partitions in the Spark RDD may be large, while each partition holds relatively few records, which increases the write frequency and wastes computing resources. The smaller this value, the lower the Doris write frequency and the lower the Doris compaction pressure. It is generally used together with doris.sink.task.use.repartition. |
 | doris.sink.task.use.repartition | false | Whether to use repartition mode to control the number of partitions written to Doris. The default value is false, and coalesce is used (note: if there is no Spark action before the write, the parallelism of the whole computation may be reduced). If set to true, repartition is used (note: you can set the final number of partitions, at the cost of a shuffle). |
 | doris.sink.batch.interval.ms | 50 | The interval between batch sinks, in ms. |
 | doris.sink.enable-2pc | false | Whether to enable two-phase commit. When enabled, transactions are committed at the end of the job, and if any task fails, all pre-committed transactions are rolled back. |
+| doris.sink.auto-redirect | false | Whether to redirect Stream Load requests. When enabled, Stream Load writes through the FE and no longer explicitly fetches BE information. |
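+
+To make the sink options above concrete, here is a minimal batch-write sketch (the table name, toy columns, and option values are illustrative assumptions, not required settings):
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder().appName("doris-sink-sketch").getOrCreate()
+import spark.implicits._
+
+// a toy DataFrame standing in for real data
+val df = Seq(("VALUE1", "VALUE2"), ("VALUE3", "VALUE4")).toDF("col1", "col2")
+
+df.write.format("doris")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  // Stream Load import parameters are passed through with the
+  // doris.sink.properties.* prefix, e.g. write in JSON format
+  .option("doris.sink.properties.format", "json")
+  // batch size and retry count, as described in the table above
+  .option("doris.sink.batch.size", "100000")
+  .option("doris.sink.max-retries", "0")
+  .save()
+```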

 ### SQL & Dataframe Configuration

 | Key                             | Default Value | Comment |
 |---------------------------------|---------------|---------|
 | doris.filter.query.in.max.count | 100           | In predicate pushdown, the maximum number of elements in an IN expression's value list. If this number is exceeded, the IN-expression filtering is processed on the Spark side. |
 | doris.ignore-type               | --            | In a temporary view, specify the field types to ignore when reading the schema.<br/>e.g. when 'doris.ignore-type'='bitmap,hll' |
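+
+For the read side, a minimal sketch of these options (given an existing SparkSession `spark`; the filter expression and placeholders are assumed examples, and `doris.filter.query` is passed to Doris as-is for source-side filtering):
+
+```scala
+// assumes a SparkSession named `spark` is already available
+val readDf = spark.read.format("doris")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  // pushed down to Doris so that only matching rows leave the source
+  .option("doris.filter.query", "age > 18")
+  .load()
+
+readDf.show()
+```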

-* Note: In Spark SQL, when writing data through insert into, if the target table of doris contains `BITMAP` or `HLL` type data, you need to set the parameter `doris.ignore-type` to the corresponding type, and set `doris.write.fields` maps the corresponding columns, the usage is as follows:
-> 1. BITMAP
+### Structured Streaming Configuration
+
+| Key | Default Value | Comment |
+| -------------------------------- | ------------- | ---------------------------------------------------------------- |
+| doris.sink.streaming.passthrough | false | Write the value of the first column directly, without any processing. |
+
+### RDD Configuration
+
+| Key | Default Value | Comment |
+|-----------------------------|---------------|---------|
+| doris.request.auth.user | -- | Doris username |
+| doris.request.auth.password | -- | Doris password |
+| doris.filter.query | -- | Filter expression for the query, which is passed through to Doris. Doris uses this expression to filter data on the source side. |
+
+:::tip
+
+1. In Spark SQL, when writing data through insert into, if the target Doris table contains `BITMAP` or `HLL` type columns, you need to set the parameter `doris.ignore-type` to the corresponding type and use `doris.write.fields` to map the corresponding columns. The usage is as follows:
+> BITMAP
 > ```sql
 > CREATE TEMPORARY VIEW spark_doris
 > USING doris
 > OPTIONS(
 > "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
 > "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
 > "user"="$YOUR_DORIS_USERNAME",
 > "password"="$YOUR_DORIS_PASSWORD",
 > "doris.ignore-type"="bitmap",
 > "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
 > );
 > ```
-> 2. HLL
+> HLL
 > ```sql
 > CREATE TEMPORARY VIEW spark_doris
 > USING doris
 > OPTIONS(
 > "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
 > "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
 > "user"="$YOUR_DORIS_USERNAME",
 > "password"="$YOUR_DORIS_PASSWORD",
 > "doris.ignore-type"="hll",
 > "doris.write.fields"="col1,col2,col3,hll_col1=hll_hash(col1)"
 > );
 > ```

-### Structured Streaming Configuration
-
-| Key | Default Value | Comment |
-| -------------------------------- | ------------- | ---------------------------------------------------------------- |
-| doris.sink.streaming.passthrough | false | Write the value of the first column directly without processing. |
-
-### RDD Configuration
+2. Since version 1.3.0, the default value of the `doris.sink.max-retries` configuration is 0, which means no retries are performed by default.
+   When this parameter is set greater than 0, batch-level failure retries are performed, and up to `doris.sink.batch.size` rows per batch are cached in Spark executor memory, so the memory allocation may need to be increased accordingly.
+
+3. Since version 1.3.0, overwrite-mode insertion is supported (only full-table overwrite is supported). The specific usage is as follows (a combined sketch also appears after this tip):
+> DataFrame
+> ```scala
+> resultDf.write.format("doris")
+>   .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+>   // your own options
+>   .option("save_mode", SaveMode.Overwrite.name())
+>   .save()
+> ```
+>
+> SQL
+> ```sql
+> INSERT OVERWRITE your_target_table
+> SELECT * FROM your_source_table
+> ```

-| Key | Default Value | Comment |
-|-----------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
-| doris.request.auth.user | -- | Doris username |
-| doris.request.auth.password | -- | Doris password |
-| doris.filter.query | -- | Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering. |
+
+:::
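+
+Tying the notes above together, a minimal sketch of a 1.3.0-style overwrite write with batch-level retries enabled (all option values are illustrative, and `resultDf` stands for any DataFrame whose schema matches the target table):
+
+```scala
+import org.apache.spark.sql.SaveMode
+
+resultDf.write.format("doris")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  // with retries enabled, each batch (up to doris.sink.batch.size rows)
+  // is cached in executor memory until it succeeds or retries run out
+  .option("doris.sink.max-retries", "3")
+  // full-table overwrite, supported since 1.3.0
+  .option("save_mode", SaveMode.Overwrite.name())
+  .save()
+```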

 ## Doris & Spark Column Type Mapping

diff --git a/docs/zh-CN/docs/ecosystem/spark-doris-connector.md b/docs/zh-CN/docs/ecosystem/spark-doris-connector.md
index 5b54a44d74a964..70a8fcac69170b 100644
--- a/docs/zh-CN/docs/ecosystem/spark-doris-connector.md
+++ b/docs/zh-CN/docs/ecosystem/spark-doris-connector.md
@@ -37,11 +37,12 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据

 ## 版本兼容

-| Connector | Spark         | Doris       | Java | Scala      |
-|-----------|---------------|-------------|------|------------|
-| 1.2.0     | 3.2, 3.1, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
-| 1.1.0     | 3.2, 3.1, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
-| 1.0.1     | 3.1, 2.3      | 0.12 - 0.15 | 8    | 2.12, 2.11 |
+| Connector | Spark               | Doris       | Java | Scala      |
+|-----------|---------------------|-------------|------|------------|
+| 1.3.0     | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 +       | 8    | 2.12, 2.11 |
+| 1.2.0     | 3.2, 3.1, 2.3       | 1.0 +       | 8    | 2.12, 2.11 |
+| 1.1.0     | 3.2, 3.1, 2.3       | 1.0 +       | 8    | 2.12, 2.11 |
+| 1.0.1     | 3.1, 2.3            | 0.12 - 0.15 | 8    | 2.12, 2.11 |

 ## 编译与安装
@@ -79,8 +80,8 @@ spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT
 ```
 <dependency>
   <groupId>org.apache.doris</groupId>
-  <artifactId>spark-doris-connector-3.2_2.12</artifactId>
-  <version>1.2.0</version>
+  <artifactId>spark-doris-connector-3.4_2.12</artifactId>
+  <version>1.3.0</version>
 </dependency>
 ```
@@ -169,11 +170,14 @@ OPTIONS(
 INSERT INTO spark_doris
 VALUES ("VALUE1", "VALUE2", ...);

-#
-or
+# or
 INSERT INTO spark_doris
 SELECT *
 FROM YOUR_TABLE
+# or
+INSERT OVERWRITE spark_doris
+SELECT *
+FROM YOUR_TABLE
 ```

 #### DataFrame(batch/stream)
@@ -195,6 +199,8 @@ mockDataDF.write.format("doris")
   //其它选项
   //指定你要写入的字段
   .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
+  // 支持设置 Overwrite 模式来覆盖数据
+  // .option("save_mode", SaveMode.Overwrite.name())
   .save()

 ## stream sink(StructuredStreaming)
@@ -264,13 +270,14 @@ kafkaSource.selectExpr("CAST(value as STRING)")
 | doris.deserialize.arrow.async | false | 是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch |
 | doris.deserialize.queue.size | 64 | 异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效 |
 | doris.write.fields | -- | 指定写入Doris表的字段或者字段顺序,多列之间使用逗号分隔。<br/>默认按照Doris表字段顺序写入全部字段。 |
-| sink.batch.size | 10000 | 单次写BE的最大行数 |
-| sink.max-retries | 1 | 写BE失败之后的重试次数 |
-| sink.properties.* | -- | Stream Load 的导入参数。<br/>例如:<br/>指定列分隔符:`'sink.properties.column_separator' = ','`、指定写入数据格式:`'sink.properties.format' = 'json'` [更多参数详情](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual#%E5%88%9B%E5%BB%BA%E5%AF%BC%E5%85%A5) |
+| doris.sink.batch.size | 100000 | 单次写BE的最大行数 |
+| doris.sink.max-retries | 0 | 写BE失败之后的重试次数 |
+| doris.sink.properties.* | -- | Stream Load 的导入参数。<br/>例如:<br/>指定列分隔符:`'doris.sink.properties.column_separator' = ','`、指定写入数据格式:`'doris.sink.properties.format' = 'json'` [更多参数详情](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
 | doris.sink.task.partition.size | -- | Doris写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。<br/>此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 |
 | doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。<br/>如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 |
 | doris.sink.batch.interval.ms | 50 | 每个批次sink的间隔时间,单位 ms。 |
 | doris.sink.enable-2pc | false | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务回滚。 |
+| doris.sink.auto-redirect | false | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。 |
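+
+结合上表中的写入参数,下面给出一个最小的批量写入示例(表名、示例字段与参数取值均为示意,并非固定要求):
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder().appName("doris-sink-sketch").getOrCreate()
+import spark.implicits._
+
+// 用一个简单的 DataFrame 代替真实数据
+val df = Seq(("VALUE1", "VALUE2"), ("VALUE3", "VALUE4")).toDF("col1", "col2")
+
+df.write.format("doris")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  // 通过 doris.sink.properties.* 前缀透传 Stream Load 导入参数,例如以 JSON 格式写入
+  .option("doris.sink.properties.format", "json")
+  // 上表中描述的批次大小与重试次数
+  .option("doris.sink.batch.size", "100000")
+  .option("doris.sink.max-retries", "0")
+  .save()
+```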

 ### SQL 和 Dataframe 专有配置

 | Key                             | Default Value | Comment |
 |---------------------------------|---------------|---------|
 | doris.filter.query.in.max.count | 100           | 谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。 |
 | doris.ignore-type               | --            | 指定在临时视图中读取 schema 时要忽略的字段类型。<br/>例如,'doris.ignore-type'='bitmap,hll' |
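+
+读取侧的最小示例如下(假设已存在名为 `spark` 的 SparkSession;过滤表达式与占位符仅为示意,`doris.filter.query` 会原样透传给 Doris 完成源端过滤):
+
+```scala
+// 假设已有名为 spark 的 SparkSession
+val readDf = spark.read.format("doris")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  // 谓词下推到 Doris,仅读取满足条件的行
+  .option("doris.filter.query", "age > 18")
+  .load()
+
+readDf.show()
+```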

-* 注:在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 `BITMAP` 或 `HLL` 类型的数据时,需要设置参数 `doris.ignore-type` 为对应类型, 并通过 `doris.write.fields` 对列进行映射转换,使用方式如下:
-> 1. BITMAP
+### Structured Streaming 专有配置
+
+| Key | Default Value | Comment |
+| -------------------------------- | ------------- | -------------------------------- |
+| doris.sink.streaming.passthrough | false | 将第一列的值不经过处理直接写入。 |
+
+### RDD 专有配置
+
+| Key | Default Value | Comment |
+|-----------------------------|---------------|---------|
+| doris.request.auth.user | -- | 访问Doris的用户名 |
+| doris.request.auth.password | -- | 访问Doris的密码 |
+| doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。 |
+
+:::tip
+
+1. 在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 `BITMAP` 或 `HLL` 类型的数据,需要设置参数 `doris.ignore-type` 为对应类型,并通过 `doris.write.fields` 对列进行映射转换,使用方式如下:
+> BITMAP
 > ```sql
 > CREATE TEMPORARY VIEW spark_doris
 > USING doris
 > OPTIONS(
 > "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
 > "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
 > "user"="$YOUR_DORIS_USERNAME",
 > "password"="$YOUR_DORIS_PASSWORD",
 > "doris.ignore-type"="bitmap",
 > "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
 > );
 > ```
-> 2. HLL
+> HLL
 > ```sql
 > CREATE TEMPORARY VIEW spark_doris
 > USING doris
 > OPTIONS(
 > "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
 > "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
 > "user"="$YOUR_DORIS_USERNAME",
 > "password"="$YOUR_DORIS_PASSWORD",
 > "doris.ignore-type"="hll",
 > "doris.write.fields"="col1,col2,col3,hll_col1=hll_hash(col1)"
 > );
 > ```

-### Structured Streaming 专有配置
-
-| Key | Default Value | Comment |
-| -------------------------------- | ------------- | -------------------------------- |
-| doris.sink.streaming.passthrough | false | 将第一列的值不经过处理直接写入。 |
+2. 从 1.3.0 版本开始,`doris.sink.max-retries` 配置项的默认值为 0,即默认不进行重试。
+   当设置该参数大于 0 时,会进行批次级别的失败重试,并会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。

-### RDD 专有配置
+3. 从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下:
+> DataFrame
+> ```scala
+> resultDf.write.format("doris")
+>   .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+>   // your own options
+>   .option("save_mode", SaveMode.Overwrite.name())
+>   .save()
+> ```
+>
+> SQL
+> ```sql
+> INSERT OVERWRITE your_target_table
+> SELECT * FROM your_source_table
+> ```

-| Key | Default Value | Comment |
-|-----------------------------|---------------|---------|
-| doris.request.auth.user | -- | 访问Doris的用户名 |
-| doris.request.auth.password | -- | 访问Doris的密码 |
-| doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。 |
+
+:::

 ## Doris 和 Spark 列类型映射关系