diff --git a/docs/ecosystem/spark-doris-connector.md b/docs/ecosystem/spark-doris-connector.md index 40eab478792cb..8f7d108117692 100644 --- a/docs/ecosystem/spark-doris-connector.md +++ b/docs/ecosystem/spark-doris-connector.md @@ -39,7 +39,8 @@ Github: https://github.com/apache/doris-spark-connector | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| -| 25.0.0 | 3.5 ~ 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | +| 25.0.1 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | +| 25.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 24.0.0 | 3.5 ~ 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.0 | 8 | 2.12, 2.11 | @@ -55,7 +56,7 @@ Github: https://github.com/apache/doris-spark-connector org.apache.doris spark-doris-connector-spark-3.5 - 25.0.0 + 25.0.1 ``` @@ -78,7 +79,7 @@ Starting from version 24.0.0, the naming rules of the Doris connector package ha When compiling, you can directly run `sh build.sh`, for details, please refer to here. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-25.0.1.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. You can also Execute in the source code directory: @@ -87,21 +88,21 @@ Execute in the source code directory: Enter the Scala and Spark versions you need to compile according to the prompts. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-25.0.1.jar`. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, if `Spark` is running in `Local` mode, put this file in the `jars/` folder. If `Spark` is running in `Yarn` cluster mode, put this file in the pre-deployment package. -For example, upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter +For example, upload `spark-doris-connector-spark-3.5-25.0.1.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter ```shell -1. Upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs. +1. Upload `spark-doris-connector-spark-3.5-25.0.1.jar` to hdfs. hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.0.1.jar /spark-jars/ -2. Add the `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` dependency in the cluster. -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar +2. Add the `spark-doris-connector-spark-3.5-25.0.1.jar` dependency in the cluster. +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.0.1.jar ``` @@ -209,7 +210,7 @@ mockDataDF.write.format("doris") //specify the fields to write .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE") // Support setting Overwrite mode to overwrite data - // .option("save_mode", SaveMode.Overwrite) + // .mode(SaveMode.Overwrite) .save() ``` @@ -469,30 +470,34 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ## Doris & Spark Column Type Mapping -| Doris Type | Spark Type | -|------------|----------------------------------| -| NULL_TYPE | DataTypes.NullType | -| BOOLEAN | DataTypes.BooleanType | -| TINYINT | DataTypes.ByteType | -| SMALLINT | DataTypes.ShortType | -| INT | DataTypes.IntegerType | -| BIGINT | DataTypes.LongType | -| FLOAT | DataTypes.FloatType | -| DOUBLE | DataTypes.DoubleType | -| DATE | DataTypes.DateType | -| DATETIME | DataTypes.StringType1 | -| DECIMAL | DecimalType | -| CHAR | DataTypes.StringType | -| LARGEINT | DecimalType | -| VARCHAR | DataTypes.StringType | -| STRING | DataTypes.StringType | -| JSON | DataTypes.StringType | -| VARIANT | DataTypes.StringType | -| TIME | DataTypes.DoubleType | -| HLL | Unsupported datatype | -| Bitmap | Unsupported datatype | - -* Note: In Connector, ` DATETIME` is mapped to `String`. Due to the processing logic of the Doris underlying storage engine, when the time type is used directly, the time range covered cannot meet the demand. So use `String` type to directly return the corresponding time readable text. +| Doris Type | Spark Type | +|------------|-------------------------| +| NULL_TYPE | DataTypes.NullType | +| BOOLEAN | DataTypes.BooleanType | +| TINYINT | DataTypes.ByteType | +| SMALLINT | DataTypes.ShortType | +| INT | DataTypes.IntegerType | +| BIGINT | DataTypes.LongType | +| FLOAT | DataTypes.FloatType | +| DOUBLE | DataTypes.DoubleType | +| DATE | DataTypes.DateType | +| DATETIME | DataTypes.TimestampType | +| DECIMAL | DecimalType | +| CHAR | DataTypes.StringType | +| LARGEINT | DecimalType | +| VARCHAR | DataTypes.StringType | +| STRING | DataTypes.StringType | +| JSON | DataTypes.StringType | +| VARIANT | DataTypes.StringType | +| TIME | DataTypes.DoubleType | +| HLL | DataTypes.StringType | +| Bitmap | DataTypes.StringType | + +:::tip + +Since version 24.0.0, the return type of the Bitmap type is string type, and the default return value is string value `Read unsupported`. + +::: ## FAQ @@ -542,7 +547,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ resultDf.format("doris") .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") // your own options - .option("save_mode", SaveMode.Overwrite) + .mode(SaveMode.Overwrite) .save() ``` @@ -579,4 +584,15 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ .option("password", "$YOUR_DORIS_PASSWORD") .option("doris.read.bitmap-to-base64","true") .load() + ``` + +4. **An error occurs when writing in DataFrame mode: `org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.`** + + Need to add save mode to append. + ```scala + resultDf.format("doris") + .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") + // your own options + .mode(SaveMode.Append) + .save() ``` \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/spark-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/spark-doris-connector.md index 0840497267277..e2f0e3d0b6e72 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/spark-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/spark-doris-connector.md @@ -30,7 +30,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 代码库地址:https://github.com/apache/doris-spark-connector -- 支持从 `Doris` 中通过 `RDD`、`DataFrame` 以及 `Spark SQL` 方式批量读取数据,推荐使用 `DataFrame` 或 `Spark SQL`。 +- 支持从 `Doris` 中通过 `RDD`、`DataFrame` 以及 `Spark SQL` 方式批量读取数据, 推荐使用 `DataFrame` 或 `Spark SQL`。 - 支持使用 `DataFrame` 和 `Spark SQL` 批量或流式地将数据写入 `Doris`。 - 支持在 `Doris` 端完成数据过滤,减少数据传输量。 @@ -38,8 +38,8 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| +| 25.0.1 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 25.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | -| 24.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | @@ -54,11 +54,11 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 org.apache.doris spark-doris-connector-spark-3.5 - 25.0.0 + 25.0.1 ``` -::: tip +:::tip 从 24.0.0 版本开始,Doris connector 包命名规则发生调整: 1. 不再包含 Scala 版本信息 @@ -77,7 +77,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 编译时,可直接运行 `sh build.sh`,具体可参考这里。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar。将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-25.0.1.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 也可以 @@ -85,20 +85,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 `sh build.sh` 根据提示输入你需要的 Scala 与 Spark 版本进行编译。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-25.0.1.jar`。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 -例如将 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar 包路径 +例如将 `spark-doris-connector-spark-3.5-25.0.1.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 ```shell -1. 上传 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 到 hdfs。 +1. 上传 `spark-doris-connector-spark-3.5-25.0.1.jar` 到 hdfs。 hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.0.1.jar /spark-jars/ -2. 在集群中添加 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 依赖。 -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar +2. 在集群中添加 `spark-doris-connector-spark-3.5-25.0.1.jar` 依赖。 +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.0.1.jar ``` @@ -168,7 +168,7 @@ dorisSparkDF.show(5) 从 24.0.0 版本开始,支持通过 Arrow Flight SQL 方式读取数据(需要 Doris 版本 >= 2.1.0)。 -设置 `doris.read.mode` 为 arrow,设置 `doris.read.arrow-flight-sql.port` 为 FE 配置的 Arrow Flight SQL 端口,服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect)。 +设置 `doris.read.mode` 为 arrow, 设置 `doris.read.arrow-flight-sql.port` 为 FE 配置的 Arrow Flight SQL 端口,服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect)。 ```scala val df = spark.read.format("doris") @@ -204,7 +204,7 @@ mockDataDF.write.format("doris") //指定要写入的列 .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE") // 从 1.3.0 版本开始,支持覆盖写入 - // .option("save_mode", SaveMode.Overwrite) + // .mode(SaveMode.Overwrite) .save() ``` @@ -312,15 +312,15 @@ df.write.format("doris") ### Spark Doris Catalog -从 24.0.0 版本开始,支持通过 Spark Catalog 方式访问 Doris。 +从 24.0.0 版本开始, 支持通过 Spark Catalog 方式访问 Doris。 #### Catalog Config | 选项名称 | 是否必须 | 注释 | |------------------------------------------------------|------|--------------------------------------------------------------------------------------------------| -| spark.sql.catalog.your_catalog_name | 是 | 设置 Catalog 提供者的类名,对于 Doris 来说唯一的有效值为 `org.apache.doris.spark.catalog.DorisTableCatalog`。 | +| spark.sql.catalog.your_catalog_name | 是 | 设置 Catalog 提供者的类名, 对于 Doris 来说唯一的有效值为 `org.apache.doris.spark.catalog.DorisTableCatalog`。 | | spark.sql.catalog.your_catalog_name.doris.fenodes | 是 | 设置 Doris FE 节点,格式为 fe_ip:fe_http_port。 | -| spark.sql.catalog.your_catalog_name.doris.query.port | 否 | 设置 Doris FE 查询端口,当 `spark.sql.catalog.your_catalog_name.doris.fe.auto.fetch` 为 true 时,此选项可以不设置。 | +| spark.sql.catalog.your_catalog_name.doris.query.port | 否 | 设置 Doris FE 查询端口, 当 `spark.sql.catalog.your_catalog_name.doris.fe.auto.fetch` 为 true 时,此选项可以不设置。 | | spark.sql.catalog.your_catalog_name.doris.user | 是 | 设置 Doris 用户。 | | spark.sql.catalog.your_catalog_name.doris.password | 是 | 设置 Doris 密码。 | | spark.sql.defaultCatalog | 否 | 设置 Spark SQL 默认 catalog。 | @@ -374,7 +374,7 @@ spark-sql \ --conf "spark.sql.catalog.your_catalog_name.doris.password=" \ --conf "spark.sql.defaultCatalog=your_catalog_name" ``` -在 Spark SQL CLI 中执行查询。 +在 Spark SQL CLI 中执行查询. ```sparksql -- show all databases show databases; @@ -425,7 +425,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ | doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | | doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | | doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | -| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始,默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | +| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | | doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | | doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | | doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | @@ -470,31 +470,34 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ## Doris 和 Spark 列类型映射关系 -| Doris Type | Spark Type | -|------------|----------------------------------| -| NULL_TYPE | DataTypes.NullType | -| BOOLEAN | DataTypes.BooleanType | -| TINYINT | DataTypes.ByteType | -| SMALLINT | DataTypes.ShortType | -| INT | DataTypes.IntegerType | -| BIGINT | DataTypes.LongType | -| FLOAT | DataTypes.FloatType | -| DOUBLE | DataTypes.DoubleType | -| DATE | DataTypes.DateType | -| DATETIME | DataTypes.StringType1 | -| DECIMAL | DecimalType | -| CHAR | DataTypes.StringType | -| LARGEINT | DecimalType | -| VARCHAR | DataTypes.StringType | -| STRING | DataTypes.StringType | -| JSON | DataTypes.StringType | -| VARIANT | DataTypes.StringType | -| TIME | DataTypes.DoubleType | -| HLL | Unsupported datatype | -| Bitmap | Unsupported datatype | - -* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。 +| Doris Type | Spark Type | +|------------|-------------------------| +| NULL_TYPE | DataTypes.NullType | +| BOOLEAN | DataTypes.BooleanType | +| TINYINT | DataTypes.ByteType | +| SMALLINT | DataTypes.ShortType | +| INT | DataTypes.IntegerType | +| BIGINT | DataTypes.LongType | +| FLOAT | DataTypes.FloatType | +| DOUBLE | DataTypes.DoubleType | +| DATE | DataTypes.DateType | +| DATETIME | DataTypes.TimestampType | +| DECIMAL | DecimalType | +| CHAR | DataTypes.StringType | +| LARGEINT | DecimalType | +| VARCHAR | DataTypes.StringType | +| STRING | DataTypes.StringType | +| JSON | DataTypes.StringType | +| VARIANT | DataTypes.StringType | +| TIME | DataTypes.DoubleType | +| HLL | DataTypes.StringType | +| Bitmap | DataTypes.StringType | + +:::tip +从 24.0.0 版本开始,Bitmap 类型读取返回类型为字符串,默认返回字符串值 Read unsupported。 + +::: ## 常见问题 1. **如何写入 Bitmap 类型?** @@ -534,7 +537,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ::: -2. **如何使用 overwrite 写入?** +2. **如何使用overwrite写入?** 从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下 **DataFrame** @@ -542,7 +545,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ resultDf.format("doris") .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") // your own options - .option("save_mode", SaveMode.Overwrite) + .mode(SaveMode.Overwrite) .save() ``` **SQL** @@ -579,3 +582,15 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ .option("doris.read.bitmap-to-base64","true") .load() ``` + +4. **DataFrame 方式写入时报错:`org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.`** + + 需要添加 save mode 为 append。 + ```scala + resultDf.format("doris") + .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") + // your own options + .mode(SaveMode.Append) + .save() + ``` + \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/spark-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/spark-doris-connector.md index b750cf814750f..5aff14b621e1e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/spark-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/spark-doris-connector.md @@ -30,7 +30,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 代码库地址:https://github.com/apache/doris-spark-connector -- 支持从 `Doris` 中通过 `RDD`、`DataFrame` 以及 `Spark SQL` 方式批量读取数据,推荐使用 `DataFrame` 或 `Spark SQL`。 +- 支持从 `Doris` 中通过 `RDD`、`DataFrame` 以及 `Spark SQL` 方式批量读取数据, 推荐使用 `DataFrame` 或 `Spark SQL`。 - 支持使用 `DataFrame` 和 `Spark SQL` 批量或流式地将数据写入 `Doris`。 - 支持在 `Doris` 端完成数据过滤,减少数据传输量。 @@ -38,8 +38,8 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| +| 25.0.1 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 25.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | -| 24.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | @@ -54,7 +54,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 org.apache.doris spark-doris-connector-spark-3.5 - 24.0.0 + 25.0.1 ``` @@ -77,7 +77,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 编译时,可直接运行 `sh build.sh`,具体可参考这里。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar。将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-25.0.1.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 也可以 @@ -85,20 +85,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 `sh build.sh` 根据提示输入你需要的 Scala 与 Spark 版本进行编译。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-25.0.1.jar`。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 -例如将 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar 包路径 +例如将 `spark-doris-connector-spark-3.5-25.0.1.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 ```shell -1. 上传 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 到 hdfs。 +1. 上传 `spark-doris-connector-spark-3.5-25.0.1.jar` 到 hdfs。 hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.0.1.jar /spark-jars/ -2. 在集群中添加 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 依赖。 -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar +2. 在集群中添加 `spark-doris-connector-spark-3.5-25.0.1.jar` 依赖。 +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.0.1.jar ``` @@ -168,7 +168,7 @@ dorisSparkDF.show(5) 从 24.0.0 版本开始,支持通过 Arrow Flight SQL 方式读取数据(需要 Doris 版本 >= 2.1.0)。 -设置 `doris.read.mode` 为 arrow,设置 `doris.read.arrow-flight-sql.port` 为 FE 配置的 Arrow Flight SQL 端口,服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect)。 +设置 `doris.read.mode` 为 arrow, 设置 `doris.read.arrow-flight-sql.port` 为 FE 配置的 Arrow Flight SQL 端口,服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect)。 ```scala val df = spark.read.format("doris") @@ -204,7 +204,7 @@ mockDataDF.write.format("doris") //指定要写入的列 .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE") // 从 1.3.0 版本开始,支持覆盖写入 - // .option("save_mode", SaveMode.Overwrite) + // .mode(SaveMode.Overwrite) .save() ``` @@ -312,15 +312,15 @@ df.write.format("doris") ### Spark Doris Catalog -从 24.0.0 版本开始,支持通过 Spark Catalog 方式访问 Doris。 +从 24.0.0 版本开始, 支持通过 Spark Catalog 方式访问 Doris。 #### Catalog Config | 选项名称 | 是否必须 | 注释 | |------------------------------------------------------|------|--------------------------------------------------------------------------------------------------| -| spark.sql.catalog.your_catalog_name | 是 | 设置 Catalog 提供者的类名,对于 Doris 来说唯一的有效值为 `org.apache.doris.spark.catalog.DorisTableCatalog`。 | +| spark.sql.catalog.your_catalog_name | 是 | 设置 Catalog 提供者的类名, 对于 Doris 来说唯一的有效值为 `org.apache.doris.spark.catalog.DorisTableCatalog`。 | | spark.sql.catalog.your_catalog_name.doris.fenodes | 是 | 设置 Doris FE 节点,格式为 fe_ip:fe_http_port。 | -| spark.sql.catalog.your_catalog_name.doris.query.port | 否 | 设置 Doris FE 查询端口,当 `spark.sql.catalog.your_catalog_name.doris.fe.auto.fetch` 为 true 时,此选项可以不设置。 | +| spark.sql.catalog.your_catalog_name.doris.query.port | 否 | 设置 Doris FE 查询端口, 当 `spark.sql.catalog.your_catalog_name.doris.fe.auto.fetch` 为 true 时,此选项可以不设置。 | | spark.sql.catalog.your_catalog_name.doris.user | 是 | 设置 Doris 用户。 | | spark.sql.catalog.your_catalog_name.doris.password | 是 | 设置 Doris 密码。 | | spark.sql.defaultCatalog | 否 | 设置 Spark SQL 默认 catalog。 | @@ -374,7 +374,7 @@ spark-sql \ --conf "spark.sql.catalog.your_catalog_name.doris.password=" \ --conf "spark.sql.defaultCatalog=your_catalog_name" ``` -在 Spark SQL CLI 中执行查询。 +在 Spark SQL CLI 中执行查询. ```sparksql -- show all databases show databases; @@ -401,7 +401,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ### Java 示例 `samples/doris-demo/spark-demo/` 下提供了 Java -版本的示例,可供参考,[这里](https://github.com/apache/doris/tree/master/samples/doris-demo/spark-demo) +版本的示例,可供参考,[这里](https://github.com/apache/incubator-doris/tree/master/samples/doris-demo/spark-demo) ## 配置 @@ -425,7 +425,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ | doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | | doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | | doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | -| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始,默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | +| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | | doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | | doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | | doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | @@ -470,31 +470,34 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ## Doris 和 Spark 列类型映射关系 -| Doris Type | Spark Type | -|------------|----------------------------------| -| NULL_TYPE | DataTypes.NullType | -| BOOLEAN | DataTypes.BooleanType | -| TINYINT | DataTypes.ByteType | -| SMALLINT | DataTypes.ShortType | -| INT | DataTypes.IntegerType | -| BIGINT | DataTypes.LongType | -| FLOAT | DataTypes.FloatType | -| DOUBLE | DataTypes.DoubleType | -| DATE | DataTypes.DateType | -| DATETIME | DataTypes.StringType1 | -| DECIMAL | DecimalType | -| CHAR | DataTypes.StringType | -| LARGEINT | DecimalType | -| VARCHAR | DataTypes.StringType | -| STRING | DataTypes.StringType | -| JSON | DataTypes.StringType | -| VARIANT | DataTypes.StringType | -| TIME | DataTypes.DoubleType | -| HLL | Unsupported datatype | -| Bitmap | Unsupported datatype | - -* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。 +| Doris Type | Spark Type | +|------------|-------------------------| +| NULL_TYPE | DataTypes.NullType | +| BOOLEAN | DataTypes.BooleanType | +| TINYINT | DataTypes.ByteType | +| SMALLINT | DataTypes.ShortType | +| INT | DataTypes.IntegerType | +| BIGINT | DataTypes.LongType | +| FLOAT | DataTypes.FloatType | +| DOUBLE | DataTypes.DoubleType | +| DATE | DataTypes.DateType | +| DATETIME | DataTypes.TimestampType | +| DECIMAL | DecimalType | +| CHAR | DataTypes.StringType | +| LARGEINT | DecimalType | +| VARCHAR | DataTypes.StringType | +| STRING | DataTypes.StringType | +| JSON | DataTypes.StringType | +| VARIANT | DataTypes.StringType | +| TIME | DataTypes.DoubleType | +| HLL | DataTypes.StringType | +| Bitmap | DataTypes.StringType | +:::tip + +从 24.0.0 版本开始,Bitmap 类型读取返回类型为字符串,默认返回字符串值 Read unsupported。 + +::: ## 常见问题 1. **如何写入 Bitmap 类型?** @@ -534,7 +537,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ::: -2. **如何使用 overwrite 写入?** +2. **如何使用overwrite写入?** 从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下 **DataFrame** @@ -542,7 +545,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ resultDf.format("doris") .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") // your own options - .option("save_mode", SaveMode.Overwrite) + .mode(SaveMode.Overwrite) .save() ``` **SQL** @@ -579,3 +582,15 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ .option("doris.read.bitmap-to-base64","true") .load() ``` + +4. **DataFrame 方式写入时报错:`org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.`** + + 需要添加 save mode 为 append。 + ```scala + resultDf.format("doris") + .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") + // your own options + .mode(SaveMode.Append) + .save() + ``` + \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/spark-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/spark-doris-connector.md index b750cf814750f..5aff14b621e1e 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/spark-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/spark-doris-connector.md @@ -30,7 +30,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 代码库地址:https://github.com/apache/doris-spark-connector -- 支持从 `Doris` 中通过 `RDD`、`DataFrame` 以及 `Spark SQL` 方式批量读取数据,推荐使用 `DataFrame` 或 `Spark SQL`。 +- 支持从 `Doris` 中通过 `RDD`、`DataFrame` 以及 `Spark SQL` 方式批量读取数据, 推荐使用 `DataFrame` 或 `Spark SQL`。 - 支持使用 `DataFrame` 和 `Spark SQL` 批量或流式地将数据写入 `Doris`。 - 支持在 `Doris` 端完成数据过滤,减少数据传输量。 @@ -38,8 +38,8 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| +| 25.0.1 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 25.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | -| 24.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | @@ -54,7 +54,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 org.apache.doris spark-doris-connector-spark-3.5 - 24.0.0 + 25.0.1 ``` @@ -77,7 +77,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 编译时,可直接运行 `sh build.sh`,具体可参考这里。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar。将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-25.0.1.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 也可以 @@ -85,20 +85,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 `sh build.sh` 根据提示输入你需要的 Scala 与 Spark 版本进行编译。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-25.0.1.jar`。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 -例如将 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar 包路径 +例如将 `spark-doris-connector-spark-3.5-25.0.1.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 ```shell -1. 上传 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 到 hdfs。 +1. 上传 `spark-doris-connector-spark-3.5-25.0.1.jar` 到 hdfs。 hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.0.1.jar /spark-jars/ -2. 在集群中添加 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 依赖。 -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar +2. 在集群中添加 `spark-doris-connector-spark-3.5-25.0.1.jar` 依赖。 +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.0.1.jar ``` @@ -168,7 +168,7 @@ dorisSparkDF.show(5) 从 24.0.0 版本开始,支持通过 Arrow Flight SQL 方式读取数据(需要 Doris 版本 >= 2.1.0)。 -设置 `doris.read.mode` 为 arrow,设置 `doris.read.arrow-flight-sql.port` 为 FE 配置的 Arrow Flight SQL 端口,服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect)。 +设置 `doris.read.mode` 为 arrow, 设置 `doris.read.arrow-flight-sql.port` 为 FE 配置的 Arrow Flight SQL 端口,服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect)。 ```scala val df = spark.read.format("doris") @@ -204,7 +204,7 @@ mockDataDF.write.format("doris") //指定要写入的列 .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE") // 从 1.3.0 版本开始,支持覆盖写入 - // .option("save_mode", SaveMode.Overwrite) + // .mode(SaveMode.Overwrite) .save() ``` @@ -312,15 +312,15 @@ df.write.format("doris") ### Spark Doris Catalog -从 24.0.0 版本开始,支持通过 Spark Catalog 方式访问 Doris。 +从 24.0.0 版本开始, 支持通过 Spark Catalog 方式访问 Doris。 #### Catalog Config | 选项名称 | 是否必须 | 注释 | |------------------------------------------------------|------|--------------------------------------------------------------------------------------------------| -| spark.sql.catalog.your_catalog_name | 是 | 设置 Catalog 提供者的类名,对于 Doris 来说唯一的有效值为 `org.apache.doris.spark.catalog.DorisTableCatalog`。 | +| spark.sql.catalog.your_catalog_name | 是 | 设置 Catalog 提供者的类名, 对于 Doris 来说唯一的有效值为 `org.apache.doris.spark.catalog.DorisTableCatalog`。 | | spark.sql.catalog.your_catalog_name.doris.fenodes | 是 | 设置 Doris FE 节点,格式为 fe_ip:fe_http_port。 | -| spark.sql.catalog.your_catalog_name.doris.query.port | 否 | 设置 Doris FE 查询端口,当 `spark.sql.catalog.your_catalog_name.doris.fe.auto.fetch` 为 true 时,此选项可以不设置。 | +| spark.sql.catalog.your_catalog_name.doris.query.port | 否 | 设置 Doris FE 查询端口, 当 `spark.sql.catalog.your_catalog_name.doris.fe.auto.fetch` 为 true 时,此选项可以不设置。 | | spark.sql.catalog.your_catalog_name.doris.user | 是 | 设置 Doris 用户。 | | spark.sql.catalog.your_catalog_name.doris.password | 是 | 设置 Doris 密码。 | | spark.sql.defaultCatalog | 否 | 设置 Spark SQL 默认 catalog。 | @@ -374,7 +374,7 @@ spark-sql \ --conf "spark.sql.catalog.your_catalog_name.doris.password=" \ --conf "spark.sql.defaultCatalog=your_catalog_name" ``` -在 Spark SQL CLI 中执行查询。 +在 Spark SQL CLI 中执行查询. ```sparksql -- show all databases show databases; @@ -401,7 +401,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ### Java 示例 `samples/doris-demo/spark-demo/` 下提供了 Java -版本的示例,可供参考,[这里](https://github.com/apache/doris/tree/master/samples/doris-demo/spark-demo) +版本的示例,可供参考,[这里](https://github.com/apache/incubator-doris/tree/master/samples/doris-demo/spark-demo) ## 配置 @@ -425,7 +425,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ | doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | | doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | | doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | -| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始,默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | +| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | | doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | | doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | | doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | @@ -470,31 +470,34 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ## Doris 和 Spark 列类型映射关系 -| Doris Type | Spark Type | -|------------|----------------------------------| -| NULL_TYPE | DataTypes.NullType | -| BOOLEAN | DataTypes.BooleanType | -| TINYINT | DataTypes.ByteType | -| SMALLINT | DataTypes.ShortType | -| INT | DataTypes.IntegerType | -| BIGINT | DataTypes.LongType | -| FLOAT | DataTypes.FloatType | -| DOUBLE | DataTypes.DoubleType | -| DATE | DataTypes.DateType | -| DATETIME | DataTypes.StringType1 | -| DECIMAL | DecimalType | -| CHAR | DataTypes.StringType | -| LARGEINT | DecimalType | -| VARCHAR | DataTypes.StringType | -| STRING | DataTypes.StringType | -| JSON | DataTypes.StringType | -| VARIANT | DataTypes.StringType | -| TIME | DataTypes.DoubleType | -| HLL | Unsupported datatype | -| Bitmap | Unsupported datatype | - -* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。 +| Doris Type | Spark Type | +|------------|-------------------------| +| NULL_TYPE | DataTypes.NullType | +| BOOLEAN | DataTypes.BooleanType | +| TINYINT | DataTypes.ByteType | +| SMALLINT | DataTypes.ShortType | +| INT | DataTypes.IntegerType | +| BIGINT | DataTypes.LongType | +| FLOAT | DataTypes.FloatType | +| DOUBLE | DataTypes.DoubleType | +| DATE | DataTypes.DateType | +| DATETIME | DataTypes.TimestampType | +| DECIMAL | DecimalType | +| CHAR | DataTypes.StringType | +| LARGEINT | DecimalType | +| VARCHAR | DataTypes.StringType | +| STRING | DataTypes.StringType | +| JSON | DataTypes.StringType | +| VARIANT | DataTypes.StringType | +| TIME | DataTypes.DoubleType | +| HLL | DataTypes.StringType | +| Bitmap | DataTypes.StringType | +:::tip + +从 24.0.0 版本开始,Bitmap 类型读取返回类型为字符串,默认返回字符串值 Read unsupported。 + +::: ## 常见问题 1. **如何写入 Bitmap 类型?** @@ -534,7 +537,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ::: -2. **如何使用 overwrite 写入?** +2. **如何使用overwrite写入?** 从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下 **DataFrame** @@ -542,7 +545,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ resultDf.format("doris") .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") // your own options - .option("save_mode", SaveMode.Overwrite) + .mode(SaveMode.Overwrite) .save() ``` **SQL** @@ -579,3 +582,15 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ .option("doris.read.bitmap-to-base64","true") .load() ``` + +4. **DataFrame 方式写入时报错:`org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.`** + + 需要添加 save mode 为 append。 + ```scala + resultDf.format("doris") + .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") + // your own options + .mode(SaveMode.Append) + .save() + ``` + \ No newline at end of file diff --git a/versioned_docs/version-2.1/ecosystem/spark-doris-connector.md b/versioned_docs/version-2.1/ecosystem/spark-doris-connector.md index 23388573eb455..1e21bffde8ed6 100644 --- a/versioned_docs/version-2.1/ecosystem/spark-doris-connector.md +++ b/versioned_docs/version-2.1/ecosystem/spark-doris-connector.md @@ -39,6 +39,7 @@ Github: https://github.com/apache/doris-spark-connector | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| +| 25.0.1 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 25.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 24.0.0 | 3.5 ~ 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.6 | 8 | 2.12, 2.11 | @@ -55,7 +56,7 @@ Github: https://github.com/apache/doris-spark-connector org.apache.doris spark-doris-connector-spark-3.5 - 24.0.0 + 25.0.1 ``` @@ -78,7 +79,7 @@ Starting from version 24.0.0, the naming rules of the Doris connector package ha When compiling, you can directly run `sh build.sh`, for details, please refer to here. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-25.0.1.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. You can also Execute in the source code directory: @@ -87,21 +88,21 @@ Execute in the source code directory: Enter the Scala and Spark versions you need to compile according to the prompts. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-25.0.1.jar`. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, if `Spark` is running in `Local` mode, put this file in the `jars/` folder. If `Spark` is running in `Yarn` cluster mode, put this file in the pre-deployment package. -For example, upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter +For example, upload `spark-doris-connector-spark-3.5-25.0.1.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter ```shell -1. Upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs. +1. Upload `spark-doris-connector-spark-3.5-25.0.1.jar` to hdfs. hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.0.1.jar /spark-jars/ -2. Add the `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` dependency in the cluster. -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar +2. Add the `spark-doris-connector-spark-3.5-25.0.1.jar` dependency in the cluster. +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.0.1.jar ``` @@ -209,7 +210,7 @@ mockDataDF.write.format("doris") //specify the fields to write .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE") // Support setting Overwrite mode to overwrite data - // .option("save_mode", SaveMode.Overwrite) + // .mode(SaveMode.Overwrite) .save() ``` @@ -469,30 +470,34 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ## Doris & Spark Column Type Mapping -| Doris Type | Spark Type | -|------------|----------------------------------| -| NULL_TYPE | DataTypes.NullType | -| BOOLEAN | DataTypes.BooleanType | -| TINYINT | DataTypes.ByteType | -| SMALLINT | DataTypes.ShortType | -| INT | DataTypes.IntegerType | -| BIGINT | DataTypes.LongType | -| FLOAT | DataTypes.FloatType | -| DOUBLE | DataTypes.DoubleType | -| DATE | DataTypes.DateType | -| DATETIME | DataTypes.StringType1 | -| DECIMAL | DecimalType | -| CHAR | DataTypes.StringType | -| LARGEINT | DecimalType | -| VARCHAR | DataTypes.StringType | -| STRING | DataTypes.StringType | -| JSON | DataTypes.StringType | -| VARIANT | DataTypes.StringType | -| TIME | DataTypes.DoubleType | -| HLL | Unsupported datatype | -| Bitmap | Unsupported datatype | - -* Note: In Connector, ` DATETIME` is mapped to `String`. Due to the processing logic of the Doris underlying storage engine, when the time type is used directly, the time range covered cannot meet the demand. So use `String` type to directly return the corresponding time readable text. +| Doris Type | Spark Type | +|------------|-------------------------| +| NULL_TYPE | DataTypes.NullType | +| BOOLEAN | DataTypes.BooleanType | +| TINYINT | DataTypes.ByteType | +| SMALLINT | DataTypes.ShortType | +| INT | DataTypes.IntegerType | +| BIGINT | DataTypes.LongType | +| FLOAT | DataTypes.FloatType | +| DOUBLE | DataTypes.DoubleType | +| DATE | DataTypes.DateType | +| DATETIME | DataTypes.TimestampType | +| DECIMAL | DecimalType | +| CHAR | DataTypes.StringType | +| LARGEINT | DecimalType | +| VARCHAR | DataTypes.StringType | +| STRING | DataTypes.StringType | +| JSON | DataTypes.StringType | +| VARIANT | DataTypes.StringType | +| TIME | DataTypes.DoubleType | +| HLL | DataTypes.StringType | +| Bitmap | DataTypes.StringType | + +:::tip + +Since version 24.0.0, the return type of the Bitmap type is string type, and the default return value is string value `Read unsupported`. + +::: ## FAQ @@ -542,7 +547,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ resultDf.format("doris") .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") // your own options - .option("save_mode", SaveMode.Overwrite) + .mode(SaveMode.Overwrite) .save() ``` @@ -579,4 +584,15 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ .option("password", "$YOUR_DORIS_PASSWORD") .option("doris.read.bitmap-to-base64","true") .load() + ``` + +4. **An error occurs when writing in DataFrame mode: `org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.`** + + Need to add save mode to append. + ```scala + resultDf.format("doris") + .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") + // your own options + .mode(SaveMode.Append) + .save() ``` \ No newline at end of file diff --git a/versioned_docs/version-3.0/ecosystem/spark-doris-connector.md b/versioned_docs/version-3.0/ecosystem/spark-doris-connector.md index 23388573eb455..1e21bffde8ed6 100644 --- a/versioned_docs/version-3.0/ecosystem/spark-doris-connector.md +++ b/versioned_docs/version-3.0/ecosystem/spark-doris-connector.md @@ -39,6 +39,7 @@ Github: https://github.com/apache/doris-spark-connector | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| +| 25.0.1 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 25.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 24.0.0 | 3.5 ~ 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.6 | 8 | 2.12, 2.11 | @@ -55,7 +56,7 @@ Github: https://github.com/apache/doris-spark-connector org.apache.doris spark-doris-connector-spark-3.5 - 24.0.0 + 25.0.1 ``` @@ -78,7 +79,7 @@ Starting from version 24.0.0, the naming rules of the Doris connector package ha When compiling, you can directly run `sh build.sh`, for details, please refer to here. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-25.0.1.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. You can also Execute in the source code directory: @@ -87,21 +88,21 @@ Execute in the source code directory: Enter the Scala and Spark versions you need to compile according to the prompts. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-25.0.1.jar`. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, if `Spark` is running in `Local` mode, put this file in the `jars/` folder. If `Spark` is running in `Yarn` cluster mode, put this file in the pre-deployment package. -For example, upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter +For example, upload `spark-doris-connector-spark-3.5-25.0.1.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter ```shell -1. Upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs. +1. Upload `spark-doris-connector-spark-3.5-25.0.1.jar` to hdfs. hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-25.0.1.jar /spark-jars/ -2. Add the `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` dependency in the cluster. -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar +2. Add the `spark-doris-connector-spark-3.5-25.0.1.jar` dependency in the cluster. +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-25.0.1.jar ``` @@ -209,7 +210,7 @@ mockDataDF.write.format("doris") //specify the fields to write .option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE") // Support setting Overwrite mode to overwrite data - // .option("save_mode", SaveMode.Overwrite) + // .mode(SaveMode.Overwrite) .save() ``` @@ -469,30 +470,34 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ## Doris & Spark Column Type Mapping -| Doris Type | Spark Type | -|------------|----------------------------------| -| NULL_TYPE | DataTypes.NullType | -| BOOLEAN | DataTypes.BooleanType | -| TINYINT | DataTypes.ByteType | -| SMALLINT | DataTypes.ShortType | -| INT | DataTypes.IntegerType | -| BIGINT | DataTypes.LongType | -| FLOAT | DataTypes.FloatType | -| DOUBLE | DataTypes.DoubleType | -| DATE | DataTypes.DateType | -| DATETIME | DataTypes.StringType1 | -| DECIMAL | DecimalType | -| CHAR | DataTypes.StringType | -| LARGEINT | DecimalType | -| VARCHAR | DataTypes.StringType | -| STRING | DataTypes.StringType | -| JSON | DataTypes.StringType | -| VARIANT | DataTypes.StringType | -| TIME | DataTypes.DoubleType | -| HLL | Unsupported datatype | -| Bitmap | Unsupported datatype | - -* Note: In Connector, ` DATETIME` is mapped to `String`. Due to the processing logic of the Doris underlying storage engine, when the time type is used directly, the time range covered cannot meet the demand. So use `String` type to directly return the corresponding time readable text. +| Doris Type | Spark Type | +|------------|-------------------------| +| NULL_TYPE | DataTypes.NullType | +| BOOLEAN | DataTypes.BooleanType | +| TINYINT | DataTypes.ByteType | +| SMALLINT | DataTypes.ShortType | +| INT | DataTypes.IntegerType | +| BIGINT | DataTypes.LongType | +| FLOAT | DataTypes.FloatType | +| DOUBLE | DataTypes.DoubleType | +| DATE | DataTypes.DateType | +| DATETIME | DataTypes.TimestampType | +| DECIMAL | DecimalType | +| CHAR | DataTypes.StringType | +| LARGEINT | DecimalType | +| VARCHAR | DataTypes.StringType | +| STRING | DataTypes.StringType | +| JSON | DataTypes.StringType | +| VARIANT | DataTypes.StringType | +| TIME | DataTypes.DoubleType | +| HLL | DataTypes.StringType | +| Bitmap | DataTypes.StringType | + +:::tip + +Since version 24.0.0, the return type of the Bitmap type is string type, and the default return value is string value `Read unsupported`. + +::: ## FAQ @@ -542,7 +547,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ resultDf.format("doris") .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") // your own options - .option("save_mode", SaveMode.Overwrite) + .mode(SaveMode.Overwrite) .save() ``` @@ -579,4 +584,15 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ .option("password", "$YOUR_DORIS_PASSWORD") .option("doris.read.bitmap-to-base64","true") .load() + ``` + +4. **An error occurs when writing in DataFrame mode: `org.apache.spark.sql.AnalysisException: TableProvider implementation doris cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.`** + + Need to add save mode to append. + ```scala + resultDf.format("doris") + .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT") + // your own options + .mode(SaveMode.Append) + .save() ``` \ No newline at end of file