Commit: update spark connector doc

gnehil committed Dec 4, 2023
1 parent 1afdbfe commit 8258911
Showing 2 changed files with 113 additions and 52 deletions.
84 changes: 57 additions & 27 deletions docs/en/docs/ecosystem/spark-doris-connector.md
@@ -37,11 +37,12 @@ Github: https://github.com/apache/doris-spark-connector

## Version Compatibility

| Connector | Spark | Doris | Java | Scala |
|-----------|---------------------|-------------|------|------------|
| 1.3.0 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 + | 8 | 2.12, 2.11 |
| 1.2.0 | 3.2, 3.1, 2.3 | 1.0 + | 8 | 2.12, 2.11 |
| 1.1.0 | 3.2, 3.1, 2.3 | 1.0 + | 8 | 2.12, 2.11 |
| 1.0.1 | 3.1, 2.3 | 0.12 - 0.15 | 8 | 2.12, 2.11 |

## Build and Install

@@ -80,8 +81,8 @@ spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.1_2.12-1.2.0-SNAPSHOT
```
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.4_2.12</artifactId>
<version>1.3.0</version>
</dependency>
```

@@ -170,11 +171,14 @@ OPTIONS(

INSERT INTO spark_doris
VALUES ("VALUE1", "VALUE2", ...);
# or
INSERT INTO spark_doris
SELECT *
FROM YOUR_TABLE
# or
INSERT OVERWRITE spark_doris
SELECT *
FROM YOUR_TABLE
```

#### DataFrame(batch/stream)
@@ -196,6 +200,8 @@ mockDataDF.write.format("doris")
//other options
//specify the fields to write
.option("doris.write.fields", "$YOUR_FIELDS_TO_WRITE")
// To overwrite existing data, set the save mode to Overwrite
// .mode(SaveMode.Overwrite)
.save()

## stream sink(StructuredStreaming)
@@ -260,13 +266,14 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
| doris.deserialize.arrow.async | false | Whether to asynchronously convert the Arrow format to the RowBatch needed for spark-doris-connector iteration |
| doris.deserialize.queue.size | 64 | Size of the internal processing queue for asynchronous Arrow conversion; takes effect when doris.deserialize.arrow.async is true |
| doris.write.fields | -- | Specifies the fields (or the order of the fields) to write to the Doris table, separated by commas.<br/>By default, all fields are written in the order of the Doris table fields. |
| doris.sink.batch.size | 100000 | Maximum number of rows in a single write to BE |
| doris.sink.max-retries | 0 | Number of retries after a write to BE fails |
| doris.sink.properties.* | -- | Import parameters for Stream Load. <br/>For example:<br/>Specify the column separator: `'doris.sink.properties.column_separator' = ','`, specify the write data format: `'doris.sink.properties.format' = 'json'` [More parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
| doris.sink.task.partition.size | -- | The number of partitions used by the write task. After filtering and similar operations, the Spark RDD being written may have many partitions, each holding only a few records, which increases write frequency and wastes computing resources. The smaller this value, the lower the Doris write frequency and the lower the Doris merge pressure. It is generally used together with doris.sink.task.use.repartition. |
| doris.sink.task.use.repartition | false | Whether to use repartition mode to control the number of partitions written to Doris. By default (false), coalesce is used (note: if there is no Spark action before the write, parallelism of the whole computation may be reduced). If set to true, repartition is used (note: the final number of partitions can be set exactly, at the cost of a shuffle). |
| doris.sink.batch.interval.ms | 50 | The interval between batch writes, in milliseconds. |
| doris.sink.enable-2pc | false | Whether to enable two-phase commit. When enabled, transactions are committed at the end of the job, and all pre-committed transactions are rolled back if any task fails. |
| doris.sink.auto-redirect | false | Whether to redirect Stream Load requests. When enabled, Stream Load writes through the FE and no longer fetches BE information explicitly. |
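
As a point of reference, the sink options above are passed as ordinary DataFrame write options. The sketch below is illustrative rather than taken from this document: the connection values are placeholders, and the two-column test DataFrame and the tuned option values are assumptions meant only to show where each key goes.

```scala
import org.apache.spark.sql.SparkSession

// Use the session provided by spark-shell, or build one as below.
val spark = SparkSession.builder().appName("doris-sink-options-example").getOrCreate()

// A throwaway two-column DataFrame; the target Doris table is assumed to match it.
val df = spark.range(10).selectExpr("id AS k", "CAST(id AS STRING) AS v")

df.write.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  // batch size and retry behavior (defaults are listed in the table above)
  .option("doris.sink.batch.size", "100000")
  .option("doris.sink.max-retries", "0")
  // fewer write partitions lower the write frequency and the Doris merge pressure
  .option("doris.sink.task.partition.size", "2")
  .option("doris.sink.task.use.repartition", "true")
  .save()
```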

### SQL & Dataframe Configuration

@@ -277,8 +284,24 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
| doris.filter.query.in.max.count | 100 | In the predicate pushdown, the maximum number of elements in the in expression value list. If this number is exceeded, the in-expression conditional filtering is processed on the Spark side. |
| doris.ignore-type | -- | In a temporary view, specify the field types to ignore when reading the schema. <br/> eg: when 'doris.ignore-type'='bitmap,hll' |
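
On the read side, these options combine with Spark's own predicate pushdown. The following is a hedged sketch; column names such as `age` and `city` are made-up examples, and the connection values are placeholders.

```scala
// `spark` is an existing SparkSession (for example, the one from spark-shell).
val dorisDf = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  // passed through to Doris and evaluated on the source side
  .option("doris.filter.query", "age > 18")
  .load()

// Spark-side filters are pushed down where possible; an `in` list with more than
// doris.filter.query.in.max.count elements is evaluated on the Spark side instead.
dorisDf.filter("city in ('beijing', 'shanghai')").show()
```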

### Structured Streaming Configuration

| Key | Default Value | Comment |
| -------------------------------- | ------------- | ---------------------------------------------------------------- |
| doris.sink.streaming.passthrough | false | Write the value of the first column directly without processing. |
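
When the upstream value is already a complete record (for example, a JSON string read from Kafka), the passthrough switch writes it as-is instead of re-serializing the row. Below is a minimal streaming sketch, assuming a Kafka source and JSON payloads; all connection values are placeholders.

```scala
// `spark` is an existing SparkSession; Kafka and Doris connection values are placeholders.
val kafkaSource = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
  .option("subscribe", "$YOUR_KAFKA_TOPIC")
  .load()

kafkaSource.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  // write the first column's value directly, without converting the row again
  .option("doris.sink.streaming.passthrough", "true")
  .option("doris.sink.properties.format", "json")
  .start()
  .awaitTermination()
```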

### RDD Configuration

| Key | Default Value | Comment |
|-----------------------------|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
| doris.request.auth.user | -- | Doris username |
| doris.request.auth.password | -- | Doris password |
| doris.filter.query | -- | Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering. |
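
A hedged sketch of how these keys are used with the RDD API, assuming the implicit `dorisRDD` helper that the connector exposes via `org.apache.doris.spark._`; the connection values and the filter expression are placeholders.

```scala
// `sc` is the SparkContext of an existing session.
import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
    "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
    "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD",
    // passed through to Doris and evaluated on the source side
    "doris.filter.query" -> "age > 18"
  ))
)

dorisSparkRDD.collect()
```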

:::tip

1. In Spark SQL, when writing data through insert into, if the target Doris table contains `BITMAP` or `HLL` type columns, you need to set the parameter `doris.ignore-type` to the corresponding type and use `doris.write.fields` to map the corresponding columns. Usage is as follows:
> BITMAP
> ```sql
> CREATE TEMPORARY VIEW spark_doris
> USING doris
@@ -291,7 +314,7 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
> "doris.write.fields"="col1,col2,col3,bitmap_col2=to_bitmap(col2),bitmap_col3=bitmap_hash(col3)"
> );
> ```
> HLL
> ```sql
> CREATE TEMPORARY VIEW spark_doris
> USING doris
@@ -305,19 +328,26 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
> );
> ```
2. Since version 1.3.0, the default value of the `doris.sink.max-retries` configuration is 0, which means no retries are performed by default.
When this parameter is set to a value greater than 0, batch-level retries are performed on failure, and up to `doris.sink.batch.size` rows of data are cached in Spark Executor memory, so the executor memory allocation may need to be increased accordingly.
3. Since version 1.3.0, overwrite-mode insertion is supported (only full-table overwrite is supported). The specific usage is as follows:
> DataFrame
> ```scala
> resultDf.write.format("doris")
> .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
> // your own options
> .mode(SaveMode.Overwrite)
> .save()
> ```
>
> SQL
> ```sql
> INSERT OVERWRITE your_target_table
> SELECT * FROM your_source_table
> ```
:::
## Doris & Spark Column Type Mapping
