Kafka Connect SAP is a generic set of connectors, using the Apache Kafka Connect framework for reliably connecting Kafka with SAP systems
To install the connector from source,
Clone this repository to your local desktop, and then bring up a command prompt in the directory.
and use the following command.
mvn clean install -DskipTests
which should produce the Kafka Connector jar file kafka-connector-hana_m-n.jar
in the modules/scala_m/target
folder, where m
corresponds to Scala binary version and n
corresponds to the connector version.
Include the Jdbc Jar
Please refer to SAP Developer License Agreement for the use of the driver jar.
- Follow the steps in SAP HANA Client Interface Programming Reference guide to access the SAP HANA Jdbc jar.
- The maven coordinate of the driver is
com.sap.cloud.db.jdbc:ngdbc:x.x.x
and the drivers are available at the central maven repository https://search.maven.org/artifact/com.sap.cloud.db.jdbc/ngdbc.
This instruction assumes Kafka installation is locally available so that you can start Kafka Connect as a standalone instance using its bin/connect-standalone
command.
For getting started with this connector, the following steps need to be completed.
-
Assume there is a table in Hana suitable for this sample. In the following, it is assumed that there is a table named
PERSONS1
with the following SQL schema(PersonID int primary key, LastName varchar(255), FirstName varchar(255))
. -
Create the config file for source named
connect-hana-source-1.properties
and placed it in folderconfig
.
name=test-topic-1-source
connector.class=com.sap.kafka.connect.source.hana.HANASourceConnector
tasks.max=1
topics=test_topic_1
connection.url=jdbc:sap://<url>/
connection.user=<username>
connection.password=<password>
test_topic_1.table.name=<schemaname>."PERSONS1"
The above configuration says this source connector should read records from Hana table PERSONS1
and send them to Kafka topic test_topic_1
.
- Create the config file for sink named
connect-hana-sink-1.properties
and place it in folderconfig
.
name=test_topic_1_sink
connector.class=com.sap.kafka.connect.sink.hana.HANASinkConnector
tasks.max=1
topics=test_topic_1
connection.url=jdbc:sap://<url>/
connection.user=<username>
connection.password=<password>
auto.create=true
test_topic_1.table.name=<schemaname>."PERSONS1_RES"
The above configuration says this sink connector should read messages from Kafka topic test_topic_1
and insert to Hana table PERSONS1_RES
.
- Start the above kafka-connect source and sink connectors using the standalone connector properties
connect-standalone.properties
with the following command.
./bin/connect-standalone config/connect-standalone.properties config/connect-hana-test-source-1.properties config/connect-hana-test-sink-1.properties
The above scenario is the simplest scenario of transferring records between Hana and Kafka. For the detail of this scenario and other more complex scenarios, refer to Examples.
The demo examples included in Examples use Kafka Connect running in different environments such as standalone and distributed modes. For general information on how to run Kafka Connect, refer to Kafka Connect documentation
The kafka connector for SAP Systems
provides a wide set of configuration options both for source & sink.
The full list of configuration options for kafka connector for SAP Systems
is as follows:
-
Sink
-
topics
- This setting can be used to specifya comma-separated list of topics
. Must not have spaces. -
auto.create
- This setting allows the creation of a new table in SAP DBs if the table specified in{topic}.table.name
does not exist. Should be aBoolean
. Default isfalse
. -
auto.evolve
- This setting allows the evolution of the table schema with some restriction, namely when the record contains additional nullable fields that are not present previously, the corresponding columns will be added. In contrast, when the record contains less fields, the table schema will not be changed. Should be aBoolean
. Default isfalse
. -
batch.size
- This setting can be used to specify the number of records that can be pushed into SAP DB table in a single flush. Should be anInteger
. Default is3000
. -
max.retries
- This setting can be used to specify the maximum no. of retries that can be made to re-establish the connection to SAP DB in case the connection is lost. Should be anInteger
. Default is10
. -
{topic}.table.name
- This setting allows specifying the SAP DBs table name where the data needs to be written to. Should be aString
. Must be compatible to SAP DB Table name like"SCHEMA"."TABLE"
. -
{topic}.table.type
- This is a DB specific configuration setting which allows creation of Row & Column tables ifauto.create
is set to true. Default value iscolumn
. And supported values arecolumn, row
. -
{topic}.insert.mode
- This setting can be used to specify one of the available insertion modesinsert
andupsert
. Default isinsert
. -
{topic}.delete.enabled
- This setting can be used to allow the deletion of the record when its corresponding tombstone record is received by the connector. Default isfalse
. -
{topic}.pk.mode
- This setting can be used to specify the primary key mode required whenauto.create
is set totrue
& the table name specified in{topic}.table.name
does not exist in SAP DB. Default isnone
. And supported values arerecord_key, record_value
. -
{topic}.pk.fields
- This setting can be used to specifya comma-separated list of primary key fields
when{topic}.pk.mode
is set torecord_key
orrecord_value
. Must not have spaces. -
{topic}.table.partition.mode
- This is a SapDB Sink specific configuration setting which determines the table partitioning in SAP DB. Default value isnone
. And supported values arenone, hash, round_robin
. -
{topic}.table.partition.count
- This is a SapDB Sink specific configuration setting which determines the number of partitions the table should have. Required whenauto.create
is set totrue
and table specified in{topic}.table.name
does not exist in SAP DBs. Should be anInteger
. Default value is0
.
-
-
Source
-
topics
- This setting can be used to specifya comma-separated list of topics
. Must not have spaces. -
mode
- This setting can be used to specify the mode in which data should be fetched from SAP DB table. Default isbulk
. And supported values arebulk, incrementing
. -
queryMode
- This setting can be used to specify the query mode in which data should be fetched from SAP DB table. Default istable
. And supported values aretable, query ( to support sql queries )
. When usingqueryMode: query
it is also required to havequery
parameter defined. This query parameter needs to be prepended by TopicName. If theincrementing.column.name
property is used together to constrain the result, then it can be omitted from its where clause. -
{topic}.table.name
- This setting allows specifying the SAP DB table name where the data needs to be read from. Should be aString
. Must be compatible to SAP DB Table name like"SCHEMA"."TABLE"
. -
{topic}.query
- This setting allows specifying the query statement whenqueryMode
is set toquery
. Should be aString
. -
{topic}.poll.interval.ms
- This setting allows specifying the poll interval at which the data should be fetched from SAP DB table. Should be anInteger
. Default value is60000
. -
{topic}.incrementing.column.name
- In order to fetch data from a SAP DB table whenmode
is set toincrementing
, an incremental ( or auto-incremental ) column needs to be provided. The type of the column can be numeric types such asINTEGER
,FLOAT
,DECIMAL
, datetime types such asDATE
,TIME
,TIMESTAMP
, and character typesVARCHAR
,NVARCHAR
containing alpha-numeric characters. This considers SAP DB Timeseries tables also. Should be a valid column name ( respresented as aString
) present in the table. See data types in SAP HANA -
{topic}.partition.count
- This setting can be used to specify the no. of topic partitions that the Source connector can use to publish the data. Should be anInteger
. Default value is1
.
-
Folder examples
includes some example scenarios. In addtion, the unit tests
provide examples on every possible mode in which the connector can be configured.
We welcome comments, questions, and bug reports. Please create an issue to obtain support.
Contributions are accepted by sending Pull Requests to this repo. Please do not forget to sign the Contribution License Agreement.
Currently only SAP Hana is supported.
Copyright (c) 2020 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the Apache Software License, v. 2 except as noted otherwise in the LICENSE file.