English | 简体中文
The Spark OceanBase Connector allows Spark to read data stored in OceanBase and to write data to OceanBase.
|           | Read | Write             |
|-----------|------|-------------------|
| DataFrame | JDBC | JDBC, Direct Load |
| SQL       | JDBC | JDBC, Direct Load |
| Connector | Spark          | OceanBase | Java | Scala |
|-----------|----------------|-----------|------|-------|
| 1.0       | 2.4, 3.1 ~ 3.4 |           | 8    | 2.12  |
- Note: If you need a package built against a different Scala version, you can build one from the source code.
You can get the release packages at Releases Page or Maven Central.
```xml
<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
    <version>${project.version}</version>
</dependency>
```
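For sbt-based projects, the same artifact can be declared as sketched below, assuming you substitute a concrete release version for the placeholder. The artifact name already encodes the Scala binary version, so `%` is used rather than `%%`:

```scala
// sbt equivalent of the Maven coordinate above; replace <release-version>
// with a concrete version from the Releases Page or Maven Central.
libraryDependencies += "com.oceanbase" % "spark-connector-oceanbase-3.4_2.12" % "<release-version>"
```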
If you'd rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
```xml
<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>spark-connector-oceanbase-3.4_2.12</artifactId>
    <version>${project.version}</version>
</dependency>
```

```xml
<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
```
Of course, you can also get the package by building from the source code.

- By default, it is built with Scala version 2.12.
- After a successful build, the target JAR package is generated in the `target` directory of the module corresponding to each Spark version, for example: spark-connector-oceanbase-3.4_2.12-1.0-SNAPSHOT.jar. Copy this file to Spark's classpath to use spark-connector-oceanbase.
```shell
git clone https://github.com/oceanbase/spark-connector-oceanbase.git
cd spark-connector-oceanbase
mvn clean package -DskipTests
```
- If you need a package built against another Scala version, refer to the command below, which builds against Scala 2.11.
```shell
git clone https://github.com/oceanbase/spark-connector-oceanbase.git
cd spark-connector-oceanbase
mvn clean package -Dscala.version=2.11.12 -Dscala.binary.version=2.11 -DskipTests
```
```sql
CREATE TEMPORARY VIEW spark_oceanbase
USING oceanbase
OPTIONS(
  "url" = "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
  "schema-name" = "test",
  "table-name" = "test",
  "username" = "root",
  "password" = "123456"
);

SELECT * FROM spark_oceanbase;
```
```scala
val oceanBaseSparkDF = spark.read.format("oceanbase")
  .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
  .option("username", "root")
  .option("password", "123456")
  .option("table-name", "test")
  .option("schema-name", "test")
  .load()

oceanBaseSparkDF.show(5)
```
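The same options can also be supplied in a single call as a Map, which is convenient when connection settings are assembled programmatically. A minimal sketch reusing the placeholder values from the example above:

```scala
// Equivalent read using DataFrameReader.options(Map[String, String]).
// The connection values are the same placeholders as in the example above.
val readOptions = Map(
  "url" -> "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
  "username" -> "root",
  "password" -> "123456",
  "schema-name" -> "test",
  "table-name" -> "test"
)

val df = spark.read.format("oceanbase").options(readOptions).load()
df.show(5)
```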
Take synchronizing data from Hive to OceanBase as an example.

Create the corresponding Hive and OceanBase tables to prepare for the data synchronization.
- Start spark-sql by running:

```shell
${SPARK_HOME}/bin/spark-sql
```
```sql
CREATE TABLE test.orders (
  order_id      INT,
  order_date    TIMESTAMP,
  customer_name STRING,
  price         DOUBLE,
  product_id    INT,
  order_status  BOOLEAN
) USING parquet;

INSERT INTO test.orders VALUES
  (1, now(), 'zs', 12.2, 12, true),
  (2, now(), 'ls', 121.2, 12, true),
  (3, now(), 'xx', 123.2, 12, true),
  (4, now(), 'jac', 124.2, 12, false),
  (5, now(), 'dot', 111.25, 12, true);
```
- Connect to OceanBase
```sql
CREATE TABLE test.orders (
  order_id      INT PRIMARY KEY,
  order_date    TIMESTAMP,
  customer_name VARCHAR(255),
  price         DOUBLE,
  product_id    INT,
  order_status  BOOLEAN
);
```
```sql
CREATE TEMPORARY VIEW test_jdbc
USING oceanbase
OPTIONS(
  "url" = "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
  "schema-name" = "test",
  "table-name" = "orders",
  "username" = "root@test",
  "password" = ""
);
```
```sql
-- Append data
INSERT INTO TABLE test_jdbc
SELECT * FROM test.orders;

-- Overwrite existing data
INSERT OVERWRITE TABLE test_jdbc
SELECT * FROM test.orders;
```
```scala
import org.apache.spark.sql.SaveMode

val df = spark.sql("select * from test.orders")

df.write
  .format("oceanbase")
  .mode(SaveMode.Append)
  .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
  .option("username", "root")
  .option("password", "123456")
  .option("table-name", "orders")
  .option("schema-name", "test")
  .save()
```
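The `INSERT OVERWRITE` statement above has a DataFrame counterpart. Assuming the connector honors Spark's standard save modes, overwrite semantics would be requested like this (a sketch, reusing `df` and the import from the previous snippet):

```scala
// Hypothetical overwrite variant: SaveMode.Overwrite mirrors the
// "INSERT OVERWRITE" SQL statement above, assuming the connector
// supports Spark's standard save modes.
df.write
  .format("oceanbase")
  .mode(SaveMode.Overwrite)
  .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
  .option("username", "root")
  .option("password", "123456")
  .option("table-name", "orders")
  .option("schema-name", "test")
  .save()
```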
```sql
CREATE TEMPORARY VIEW test_direct
USING oceanbase
OPTIONS(
  "url" = "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
  "schema-name" = "test",
  "table-name" = "orders",
  "username" = "root@test",
  "password" = "123456",
  "direct-load.enabled" = "true",
  "direct-load.host" = "localhost",
  "direct-load.rpc-port" = "2882"
);

-- Append data
INSERT INTO TABLE test_direct
SELECT * FROM test.orders;

-- Overwrite existing data
INSERT OVERWRITE TABLE test_direct
SELECT * FROM test.orders;
```
```scala
import org.apache.spark.sql.SaveMode

val df = spark.sql("select * from test.orders")

df.write
  .format("oceanbase")
  .mode(SaveMode.Append)
  .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
  .option("username", "root")
  .option("password", "123456")
  .option("table-name", "orders")
  .option("schema-name", "test")
  .option("direct-load.enabled", "true")
  .option("direct-load.host", "localhost")
  .option("direct-load.rpc-port", "2882")
  .save()
```
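The direct-load tuning options documented in the configuration tables below can be passed the same way. A sketch, where the chosen values are illustrative rather than recommendations:

```scala
// Direct-load write with additional tuning options from the
// configuration table below; reuses df and the import from the
// previous snippet, and the values here are illustrative only.
df.write
  .format("oceanbase")
  .mode(SaveMode.Append)
  .option("url", "jdbc:mysql://localhost:2881/test?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
  .option("username", "root")
  .option("password", "123456")
  .option("table-name", "orders")
  .option("schema-name", "test")
  .option("direct-load.enabled", "true")
  .option("direct-load.host", "localhost")
  .option("direct-load.rpc-port", "2882")
  .option("direct-load.parallel", "8")          // server-side CPU parallelism
  .option("direct-load.batch-size", "10240")    // rows written per batch
  .option("direct-load.dup-action", "REPLACE")  // STOP_ON_DUP, REPLACE or IGNORE
  .save()
```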
| Option      | Default | Type   | Description                               |
|-------------|---------|--------|-------------------------------------------|
| url         |         | String | The connection URL.                       |
| username    |         | String | The connection username, like 'root@sys'. |
| tenant-name |         | String | The tenant name.                          |
| password    |         | String | The password.                             |
| schema-name |         | String | The schema name or database name.         |
| table-name  |         | String | The table name.                           |
| Option                         | Default | Type     | Description |
|--------------------------------|---------|----------|-------------|
| direct-load.enabled            | false   | Boolean  | Enable direct-load writing. |
| direct-load.host               |         | String   | Hostname used in direct-load. |
| direct-load.rpc-port           | 2882    | Integer  | RPC port used in direct-load. |
| direct-load.parallel           | 8       | Integer  | Parallelism of the direct-load server. This parameter determines how much CPU the server uses to process the import task. |
| direct-load.batch-size         | 10240   | Integer  | Number of rows written to OceanBase in one batch. |
| direct-load.max-error-rows     | 0       | Long     | Maximum tolerable number of error rows. |
| direct-load.dup-action         | REPLACE | String   | Action to take on duplicated records in a direct-load task. Can be STOP_ON_DUP, REPLACE or IGNORE. |
| direct-load.timeout            | 7d      | Duration | Timeout of the direct-load task. |
| direct-load.heartbeat-timeout  | 60s     | Duration | Client heartbeat timeout in the direct-load task. |
| direct-load.heartbeat-interval | 10s     | Duration | Client heartbeat interval in the direct-load task. |
| direct-load.load-method        | full    | String   | The direct-load load method: full, inc, inc_replace. |
- This connector is implemented based on JDBC To Other Databases.
- For more configuration options, see: JDBC To Other Databases#Data Source Option.
- Both OceanBase MySQL and Oracle modes are supported:
  - For MySQL mode, you need to add the MySQL Connector/J driver to Spark's CLASSPATH.
  - For Oracle mode, you need to add the OceanBase Connector/J driver to Spark's CLASSPATH.
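For Oracle mode, the connection URL uses the OceanBase Connector/J scheme rather than the MySQL one. A hedged sketch, where the host, port, schema, and credentials are placeholders and the exact URL parameters may vary with your deployment:

```scala
// Oracle-mode read sketch: OceanBase Connector/J URLs use the
// jdbc:oceanbase:// scheme. All connection values below are
// placeholders, not values confirmed by this document.
val oracleModeDF = spark.read
  .format("oceanbase")
  .option("url", "jdbc:oceanbase://localhost:2881/test")
  .option("username", "root@test")
  .option("password", "123456")
  .option("schema-name", "TEST")
  .option("table-name", "ORDERS")
  .load()
```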