Skip to content

A plugin of Flink JDBC SQL Connector to allow reading data from and writing data into Vertica

License

Notifications You must be signed in to change notification settings

dingqiangliu/vertica-flink-connector

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

5 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Vertica Connector for Flink

This project is a plugin of Flink JDBC SQL Connector to allow reading data from Vertica by batch, writing data into Vertica from streams of Flink CDC Connectors for other databases like PostgreSQL/MySQL/Oracle/SQL Server and other streams or batch sources.

The architecture of Flink

The architecture of Flink CDC

Examples

Write data from MySQL to Vertica by batch

This demo can be easily run with docker-compose. Here is the key part:

CREATE TABLE test_flink_orders (
    orderID INT
    , custName VARCHAR(50)
    , fAmount FLOAT
    , dAmount DOUBLE
    , deAmount DECIMAL(17,4)
    , nAmount DECIMAL(17,4)
    , bVIP BOOLEAN
    , dCreate DATE
    , tCreate TIME(6)
    , tzCreate TIME(6)
    , dtCreate TIMESTAMP(6)
    , dtzCreate TIMESTAMP(6)
    , binPhoto BYTES
    , PRIMARY KEY (orderID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc'
    , 'url' = 'jdbc:mysql://${yourMySQLServer}:3306/${yourMySQLDB}?
    , 'username' = '${yourUsername}'
    , 'password' = '${yourPassword}'
    , 'scan.fetch-size' = '10000'
    , 'table-name' = 'test_flink_orders'
);

CREATE TABLE test_flink_orders_target (
    orderID INT
    , custName VARCHAR(50)
    , fAmount FLOAT
    , dAmount DOUBLE
    , deAmount DECIMAL(17,4)
    , nAmount DECIMAL(17,4)
    , bVIP BOOLEAN
    , dCreate DATE
    , tCreate TIME(6)
    , tzCreate TIME(6)
    , dtCreate TIMESTAMP(6)
    , dtzCreate TIMESTAMP(6)
    , binPhoto BYTES
    , PRIMARY KEY (orderID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc'
    , 'url' = 'jdbc:vertica://${yourVerticaServer}:5433/${yourVerticaDBName}'
    , 'username' = '${yourUsername}'
    , 'password' = '${yourPassword}'
    , 'sink.buffer-flush.max-rows' = '10000'
    , 'table-name' = 'test_flink_orders_target'
);

INSERT INTO test_flink_orders_target
SELECT 
    orderID
    , custName 
    , fAmount
    , dAmount
    , deAmount
    , nAmount
    , bVIP
    , dCreate
    , tCreate
    , tzCreate
    , dtCreate
    , dtzCreate
    , binPhoto
FROM test_flink_orders;

Ingesting changes of MySQL to Vertica in real-time

This demo can be easily run with docker-compose. Here is its key part:

CREATE TABLE test_flink_orders (
    orderID INT
    , custName VARCHAR(50)
    , fAmount FLOAT
    , dAmount DOUBLE
    , deAmount DECIMAL(17,4)
    , nAmount DECIMAL(17,4)
    , bVIP BOOLEAN
    , dCreate DATE
    , tCreate TIME(6)
    , tzCreate TIME(6)
    , dtCreate TIMESTAMP(6)
    , dtzCreate TIMESTAMP(6)
    , binPhoto BYTES
    , PRIMARY KEY (orderID) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc'
    , 'hostname' = 'localhost'
    , 'port' = '3306'
    , 'jdbc.properties.serverTimezone' = '${TZ}'
    , 'username' = 'liudq'
    , 'password' = 'mysql'
    , 'database-name' = 'liudq'
    , 'table-name' = 'test_flink_orders'
);

CREATE TABLE test_flink_orders_target (
    orderID INT
    , custName VARCHAR(50)
    , fAmount FLOAT
    , dAmount DOUBLE
    , deAmount DECIMAL(17,4)
    , nAmount DECIMAL(17,4)
    , bVIP BOOLEAN
    , dCreate DATE
    , tCreate TIME(6)
    , tzCreate TIME(6)
    , dtCreate TIMESTAMP(6)
    , dtzCreate TIMESTAMP(6)
    , binPhoto BYTES
    , PRIMARY KEY (orderID) NOT ENFORCED
) WITH (
    'connector' = 'jdbc'
    , 'url' = 'jdbc:vertica://${yourVerticaServer}:5433/${yourVerticaDBName}'
    , 'username' = '${yourUsername}'
    , 'password' = '${yourPassword}'
    , 'sink.buffer-flush.max-rows' = '10000'
    , 'table-name' = 'test_flink_orders_target'
);

INSERT INTO test_flink_orders_target
SELECT 
    orderID
    , custName 
    , fAmount
    , dAmount
    , deAmount
    , nAmount
    , bVIP
    , dCreate
    , tCreate
    , tzCreate
    , dtCreate
    , dtzCreate
    , binPhoto
FROM test_flink_orders;

Installation

At first, you need setup Flink cluster and its JDBC SQL Connector, and CDC Connectors optionally.

You can download the latest version of vertica-flink-connector_${FLINK_VERSON}-*.jar, or build it from source code of this project, and just put it in ${FLINK_HOME}/lib and restart you cluster..

[Optional] Build from source code

Requirements

  • Java 11+
  • Maven 3.3+

You will get vertica-flink-connector_${FLINK_VERSON}-*.jar under [target/] directory after correctly running following command under top of source code tree.

mvn -DskipTests=true clean package

About

A plugin of Flink JDBC SQL Connector to allow reading data from and writing data into Vertica

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages