diff --git a/flink-connector-aws/flink-connector-redshift/pom.xml b/flink-connector-aws/flink-connector-redshift/pom.xml
index 28ddb1f2..ebebe9f8 100644
--- a/flink-connector-aws/flink-connector-redshift/pom.xml
+++ b/flink-connector-aws/flink-connector-redshift/pom.xml
@@ -27,6 +27,13 @@
            <scope>provided</scope>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
@@ -43,8 +50,6 @@
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
@@ -53,17 +58,16 @@
                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
+                <artifactId>maven-jar-plugin</artifactId>
-                        <id>shade-flink</id>
-                        <phase>package</phase>
-                            <goal>shade</goal>
+                            <goal>test-jar</goal>
+
\ No newline at end of file
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigConstants.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigConstants.java
index 409aa17b..cbddc5a9 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigConstants.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigConstants.java
@@ -33,49 +33,51 @@ public class RedshiftSinkConfigConstants {
ConfigOptions.key("hostname")
.stringType()
.noDefaultValue()
- .withDescription("Hostname of Redshift Cluster");
+ .withDescription("Redshift Cluster's Hostname");
public static final ConfigOption PORT =
ConfigOptions.key("port")
.intType()
.defaultValue(5439)
- .withDescription("Port of the Redshift Cluster");
+ .withDescription(
+ "Redshift's cluster Port. By default, AWS console set value to 5439 for redshift cluster.");
public static final ConfigOption<String> USERNAME =
ConfigOptions.key("username")
.stringType()
.noDefaultValue()
- .withDescription("Username of Redshift Cluster");
+ .withDescription("Username for the Redshift cluster connection.");
public static final ConfigOption<String> PASSWORD =
ConfigOptions.key("password")
.stringType()
.noDefaultValue()
- .withDescription("Password of Redshift Cluster");
+ .withDescription("Password associated with the username for the Redshift cluster.");
public static final ConfigOption<String> DATABASE_NAME =
ConfigOptions.key("database-name")
.stringType()
.noDefaultValue()
- .withDescription("Database to which it needs to be connected");
+ .withDescription("Name of the database the connector connects to.");
public static final ConfigOption<String> TABLE_NAME =
ConfigOptions.key("table-name")
.stringType()
.noDefaultValue()
- .withDescription("Table to which it needs to be connected");
+ .withDescription("Name of the table against which the sink/source is set up.");
public static final ConfigOption<Integer> BATCH_SIZE =
ConfigOptions.key("sink.batch-size")
.intType()
.defaultValue(1000)
- .withDescription("Port of the Redshift Cluster");
+ .withDescription(
+ "Sink property. Number of records to buffer before writing a batch to Redshift.");
public static final ConfigOption<SinkMode> SINK_MODE =
ConfigOptions.key("sink.write.mode")
.enumType(SinkMode.class)
.defaultValue(SinkMode.JDBC)
- .withDescription("Mode of sink");
+ .withDescription("Write mode of the sink. Currently only JDBC and COPY are supported.");
public static final ConfigOption IAM_ROLE_ARN =
ConfigOptions.key("aws.iam-role")
@@ -87,19 +89,20 @@ public class RedshiftSinkConfigConstants {
ConfigOptions.key("sink.write.temp.s3-uri")
.stringType()
.noDefaultValue()
- .withDescription("Temporary S3 URI to store data");
+ .withDescription(
+ "Temporary S3 URI to store data. In COPY mode, Redshift architecture uses s3 to store intermittent data. Reference :https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html");
public static final ConfigOption<Integer> MAX_RETIRES =
ConfigOptions.key("sink.max.retries")
.intType()
.defaultValue(2)
- .withDescription("Maximum number of Retries in case of Failure");
+ .withDescription("Maximum number of retries in case of failure.");
public static final ConfigOption<Duration> FLUSH_INTERVAL =
ConfigOptions.key("sink.flush.interval")
.durationType()
.defaultValue(Duration.of(10, SECONDS))
- .withDescription("Maximum number of Retries in case of Failure");
+ .withDescription("Flush interval for the sink.");
public static final ConfigOption TIMEOUT =
ConfigOptions.key("sink.connection.timeout")
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigUtil.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigUtil.java
deleted file mode 100644
index e46554fb..00000000
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/config/RedshiftSinkConfigUtil.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.redshift.config;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.FlinkRuntimeException;
-
-/** Utility functions to use with {@link RedshiftSinkConfigConstants}. */
-public class RedshiftSinkConfigUtil {
-
- // private constructor to prevent initialization of utility class.
- private RedshiftSinkConfigUtil() {}
-
- public static void validateConfigs(final Configuration sinkConfig)
- throws FlinkRuntimeException {
- switch (sinkConfig.get(RedshiftSinkConfigConstants.SINK_MODE).toString()) {
- case "JDBC":
- validateJdbcAssociatedConfigs(sinkConfig);
- break;
- case "COPY":
- validateCopyAssociatedConfigs(sinkConfig);
- break;
- default:
- throw new FlinkRuntimeException("Invalid Sink Mode");
- }
- }
-
- private static void validateJdbcAssociatedConfigs(Configuration sinkConfig) {
- if (sinkConfig.getString(RedshiftSinkConfigConstants.HOSTNAME).trim().length() < 3
- || sinkConfig.getInteger(RedshiftSinkConfigConstants.PORT) < 0) {
- throw new FlinkRuntimeException(
- "Invalid configuration. Provide hostname and port is necessary.");
- }
- }
-
- private static void validateCopyAssociatedConfigs(Configuration sinkConfig) {
- if (sinkConfig.getString(RedshiftSinkConfigConstants.S3_URI) == null
- || sinkConfig.getString(RedshiftSinkConfigConstants.IAM_ROLE_ARN) == null) {
- throw new FlinkRuntimeException(
- "Invalid Configuration. For COPY Mode s3 temporary path and iam role arn is necessary");
- }
- }
-}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftConnectionProvider.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftConnectionProvider.java
index 08a4eaa6..12041d22 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftConnectionProvider.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftConnectionProvider.java
@@ -26,9 +26,15 @@
import java.sql.Connection;
import java.sql.SQLException;
-/** Redshift connection provider. */
+/**
+ * Redshift connection provider interface. Redshift can be configured using both JDBC and ODBC.
+ * Reference: the Redshift Connection Configuration page in the AWS documentation.
+ */
@Internal
public interface RedshiftConnectionProvider {
+
/**
* Get existing connection.
*
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java
index b5531184..6a5afd92 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/connection/RedshiftJdbcConnectionProvider.java
@@ -33,7 +33,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
-/** Redshift connection provider. */
+/** Redshift JDBC connection provider. */
@Internal
@NotThreadSafe
public class RedshiftJdbcConnectionProvider implements RedshiftConnectionProvider, Serializable {
@@ -103,8 +103,9 @@ public boolean isConnectionValid() throws SQLException {
}
@Override
- public BaseConnection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
- if (!connection.isClosed()) {
+ public BaseConnection getOrEstablishConnection() throws SQLException {
+
+ if (connection != null && !connection.isClosed()) {
return connection;
}
connection =
@@ -121,15 +122,16 @@ public void closeConnection() {
try {
connection.close();
} catch (SQLException e) {
- LOG.warn("Redshift connection close failed.", e);
+ LOG.warn("Redshift connection failed to close.", e);
} finally {
connection = null;
}
}
+ LOG.info("Redshift Connection is already closed.");
}
@Override
- public BaseConnection reestablishConnection() throws SQLException, ClassNotFoundException {
+ public BaseConnection reestablishConnection() throws SQLException {
closeConnection();
connection =
createConnection(
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftBatchExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftBatchExecutor.java
index 42e7be83..3dd61cd9 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftBatchExecutor.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftBatchExecutor.java
@@ -18,15 +18,15 @@
package org.apache.flink.connector.redshift.internal.executor;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
-import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.core.BaseConnection;
import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
import org.slf4j.Logger;
@@ -59,16 +59,19 @@ public RedshiftBatchExecutor(
}
@Override
- public void prepareStatement(RedshiftConnection connection) throws SQLException {
+ public void prepareStatement(BaseConnection connection) throws SQLException {
if (connection instanceof RedshiftConnectionImpl) {
RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
statement = (RedshiftPreparedStatement) redshiftConnection.prepareStatement(sql);
+ return;
}
+ LOG.warn(
+ "Unable to create a Redshift statement for execution. Please file a JIRA issue if this persists.");
}
@Override
public void prepareStatement(RedshiftConnectionProvider connectionProvider)
throws SQLException {
+ Preconditions.checkNotNull(connectionProvider, "Connection provider must not be null.");
this.connectionProvider = connectionProvider;
try {
prepareStatement(connectionProvider.getOrEstablishConnection());
@@ -77,9 +80,6 @@ public void prepareStatement(RedshiftConnectionProvider connectionProvider)
}
}
- @Override
- public void setRuntimeContext(RuntimeContext context) {}
-
@Override
public void addToBatch(RowData record) throws SQLException {
switch (record.getRowKind()) {
@@ -117,6 +117,11 @@ public void closeStatement() {
}
}
+ @Override
+ public String getName() {
+ return "redshift-batch-executor";
+ }
+
@Override
public String toString() {
return "RedshiftBatchExecutor{"
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java
index fcefdd7b..8dc293ba 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftExecutor.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.redshift.internal.executor;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
@@ -26,11 +25,13 @@
import org.apache.flink.connector.redshift.internal.statement.RedshiftStatement;
import org.apache.flink.connector.redshift.mode.SinkMode;
import org.apache.flink.connector.redshift.mode.copy.RedshiftCopyModeRowConverterImpl;
+import org.apache.flink.connector.redshift.mode.jdbc.RedshiftJDBCModeRowConverterImpl;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
-import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.core.BaseConnection;
import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
@@ -49,9 +50,7 @@ public interface RedshiftExecutor extends Serializable {
Logger LOG = LoggerFactory.getLogger(RedshiftExecutor.class);
- void setRuntimeContext(RuntimeContext context);
-
- void prepareStatement(RedshiftConnection connection) throws SQLException;
+ void prepareStatement(BaseConnection connection) throws SQLException;
void prepareStatement(RedshiftConnectionProvider connectionProvider) throws SQLException;
@@ -61,6 +60,8 @@ public interface RedshiftExecutor extends Serializable {
void closeStatement();
+ String getName();
+
default void attemptExecuteBatch(RedshiftPreparedStatement stmt, int maxRetries)
throws SQLException {
attemptExecuteBatch(stmt, maxRetries, true);
@@ -75,7 +76,6 @@ default void attemptExecuteBatch(
} else {
stmt.execute();
}
-
return;
} catch (Exception exception) {
LOG.error("Redshift executeBatch error, retry times = {}", i, exception);
@@ -102,12 +102,15 @@ static RedshiftExecutor createRedshiftExecutor(
String[] keyFields,
LogicalType[] fieldTypes,
Configuration options) {
+
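+ // Choose the executor based on whether a primary key is defined (UPSERT vs. append-only)
+ // and on the configured sink mode (COPY via S3 staging vs. plain JDBC).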
if (keyFields.length > 0) {
+ LOG.info(
+ "Primary key is defined for the table. The Redshift connector will run in UPSERT mode by default.");
if (options.get(RedshiftSinkConfigConstants.SINK_MODE).equals(SinkMode.COPY)) {
LOG.info("Create Upload Copy UPSERT Executor.");
return createUploadUpsertExecutor(fieldNames, keyFields, fieldTypes, options);
} else {
- LOG.info("Create pure JDBC UPSRET Executor.");
+ LOG.info("Create Simple JDBC Executor.");
return createUpsertExecutor(fieldNames, keyFields, fieldTypes, options);
}
@@ -116,7 +119,7 @@ static RedshiftExecutor createRedshiftExecutor(
LOG.info("Create Upload Copy batch Executor.");
return createUploadBatchExecutor(fieldNames, fieldTypes, options);
} else {
- LOG.info("Create pure JDBC batch Executor.");
+ LOG.info("Create Simple JDBC batch Executor.");
return createBatchExecutor(fieldNames, fieldTypes, options);
}
}
@@ -153,7 +156,8 @@ static RedshiftBatchExecutor createBatchExecutor(
String insertSql =
RedshiftStatement.getInsertIntoStatement(
options.getString(RedshiftSinkConfigConstants.TABLE_NAME), fieldNames);
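+ // The plain JDBC batch executor writes through a PreparedStatement, so it uses the
+ // JDBC-mode row converter rather than the CSV-oriented COPY converter.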
- RedshiftRowConverter converter = new RedshiftCopyModeRowConverterImpl(fieldTypes);
+ RedshiftRowConverter converter =
+ new RedshiftJDBCModeRowConverterImpl(RowType.of(fieldTypes));
return new RedshiftBatchExecutor(insertSql, converter, options);
}
@@ -176,12 +180,14 @@ static RedshiftUpsertExecutor createUpsertExecutor(
IntStream.range(0, fieldNames.length)
.filter(idx -> !ArrayUtils.contains(keyFields, fieldNames[idx]))
.toArray();
- int[] updFields = ArrayUtils.addAll(updatableFields, delFields);
+ int[] updatableFieldsIndex = ArrayUtils.addAll(updatableFields, delFields);
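+ // Bind-parameter order for the update statement: updatable (non-key) columns first,
+ // then the key columns used in the WHERE clause.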
LogicalType[] delTypes =
Arrays.stream(delFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
LogicalType[] updTypes =
- Arrays.stream(updFields).mapToObj(f -> fieldTypes[f]).toArray(LogicalType[]::new);
+ Arrays.stream(updatableFieldsIndex)
+ .mapToObj(f -> fieldTypes[f])
+ .toArray(LogicalType[]::new);
return new RedshiftUpsertExecutor(
insertSql,
@@ -190,7 +196,7 @@ static RedshiftUpsertExecutor createUpsertExecutor(
new RedshiftCopyModeRowConverterImpl(fieldTypes),
new RedshiftCopyModeRowConverterImpl(updTypes),
new RedshiftCopyModeRowConverterImpl(delTypes),
- createExtractor(fieldTypes, updFields),
+ createExtractor(fieldTypes, updatableFieldsIndex),
createExtractor(fieldTypes, delFields),
options);
}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUploadBatchExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUploadBatchExecutor.java
index bc33f09d..c8ef9587 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUploadBatchExecutor.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUploadBatchExecutor.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.redshift.internal.executor;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
@@ -30,7 +29,7 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.FlinkRuntimeException;
-import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.core.BaseConnection;
import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
import org.slf4j.Logger;
@@ -53,7 +52,7 @@ public class RedshiftUploadBatchExecutor implements RedshiftExecutor {
private final String[] fieldNames;
- private String copySql;
+ private String sql;
private final RedshiftRowConverter copyRowConverter;
@@ -61,9 +60,9 @@ public class RedshiftUploadBatchExecutor implements RedshiftExecutor {
private final String iamRoleArn;
- private transient List csvData;
+ private final transient List csvData;
- private transient S3Client s3Client;
+ private final transient S3Client s3Client;
private transient RedshiftPreparedStatement statement;
@@ -75,7 +74,7 @@ public RedshiftUploadBatchExecutor(
this.fieldNames = fieldNames;
this.maxRetries = options.getInteger(RedshiftSinkConfigConstants.MAX_RETIRES);
this.csvData = new ArrayList<>();
- this.s3Client = S3Client.builder().build();
+ this.s3Client = S3Client.create();
this.copyRowConverter = new RedshiftCopyModeRowConverterImpl(fieldTypes);
this.tempS3Uri =
@@ -84,13 +83,11 @@ public RedshiftUploadBatchExecutor(
}
@Override
- public void prepareStatement(RedshiftConnection connection) throws SQLException {
- copySql =
- RedshiftStatement.getTableCopyStatement(
- tableName, tempS3Uri, fieldNames, iamRoleArn);
+ public void prepareStatement(BaseConnection connection) throws SQLException {
+ sql = RedshiftStatement.getTableCopyStatement(tableName, tempS3Uri, fieldNames, iamRoleArn);
if (connection instanceof RedshiftConnectionImpl) {
RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
- statement = (RedshiftPreparedStatement) redshiftConnection.prepareStatement(copySql);
+ statement = (RedshiftPreparedStatement) redshiftConnection.prepareStatement(sql);
}
}
@@ -105,9 +102,6 @@ public void prepareStatement(RedshiftConnectionProvider connectionProvider)
}
}
- @Override
- public void setRuntimeContext(RuntimeContext context) {}
-
@Override
public void addToBatch(RowData record) throws SQLException {
Object obj = copyRowConverter.toExternal(record);
@@ -145,11 +139,16 @@ public void closeStatement() {
}
}
+ @Override
+ public String getName() {
+ return "redshift-batch-upload-executor";
+ }
+
@Override
public String toString() {
return "RedshiftUploadBatchExecutor{"
+ "copySql='"
- + copySql
+ + sql
+ '\''
+ ", maxRetries="
+ maxRetries
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUploadUpsertExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUploadUpsertExecutor.java
index 8ffcd2b9..1ed98fa8 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUploadUpsertExecutor.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUploadUpsertExecutor.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.redshift.internal.executor;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
@@ -30,7 +29,7 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.FlinkRuntimeException;
-import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.core.BaseConnection;
import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
import org.slf4j.Logger;
@@ -66,7 +65,7 @@ public class RedshiftUploadUpsertExecutor implements RedshiftExecutor {
private String copyInsertSql;
- private String updateTrxSql;
+ private String updateTransactionSql;
private String deleteSql;
@@ -110,13 +109,13 @@ public RedshiftUploadUpsertExecutor(
this.iamRoleArn = options.getString(RedshiftSinkConfigConstants.IAM_ROLE_ARN);
this.s3Client = S3Client.create();
this.copyRowConverter = new RedshiftCopyModeRowConverterImpl(fieldTypes);
- this.stageTableName = "_" + tableName.split("\\.")[1] + "_stage";
+ this.stageTableName = "_" + tableName + "_stage";
this.tempS3Uri =
S3Util.getS3UriWithFileName(options.getString(RedshiftSinkConfigConstants.S3_URI));
}
@Override
- public void prepareStatement(RedshiftConnection connection) throws SQLException {
+ public void prepareStatement(BaseConnection connection) throws SQLException {
final String createTableSql =
RedshiftStatement.getCreateTempTableAsStatement(tableName, stageTableName);
final String insertSql =
@@ -132,26 +131,23 @@ public void prepareStatement(RedshiftConnection connection) throws SQLException
RedshiftStatement.getTableCopyStatement(
stageTableName, tempS3Uri, fieldNames, iamRoleArn);
- updateTrxSql =
- "BEGIN;"
- + createTableSql
- + "; "
- + truncateSql
- + ";"
- + copyUpdateSql
- + "; "
- + deleteFromStageSql
- + "; "
- + insertSql
- + "; "
- + "END;";
-
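+ // Execute the staged upsert as a single transaction: create the stage table, truncate it,
+ // COPY the changed rows into it, delete matching rows from the target, insert from the
+ // stage table, then commit.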
+ updateTransactionSql =
+ String.join(
+ "; ",
+ "BEGIN",
+ createTableSql,
+ truncateSql,
+ copyUpdateSql,
+ deleteFromStageSql,
+ insertSql,
+ "END;");
if (connection instanceof RedshiftConnectionImpl) {
RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
insertStatement =
(RedshiftPreparedStatement) redshiftConnection.prepareStatement(copyInsertSql);
updateTrxStatement =
- (RedshiftPreparedStatement) redshiftConnection.prepareStatement(updateTrxSql);
+ (RedshiftPreparedStatement)
+ redshiftConnection.prepareStatement(updateTransactionSql);
deleteStatement =
(RedshiftPreparedStatement) redshiftConnection.prepareStatement(deleteSql);
}
@@ -168,9 +164,6 @@ public void prepareStatement(RedshiftConnectionProvider connectionProvider)
}
}
- @Override
- public void setRuntimeContext(RuntimeContext context) {}
-
@Override
public void addToBatch(RowData record) throws SQLException {
switch (record.getRowKind()) {
@@ -235,6 +228,11 @@ public void closeStatement() {
}
}
+ @Override
+ public String getName() {
+ return "redshift-upload-upsert-executor";
+ }
+
@Override
public String toString() {
return "RedshiftUploadUpsertExecutor{"
@@ -242,7 +240,7 @@ public String toString() {
+ copyInsertSql
+ '\''
+ "updateTrxSql='"
- + updateTrxSql
+ + updateTransactionSql
+ '\''
+ ", deleteSql='"
+ deleteSql
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUpsertExecutor.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUpsertExecutor.java
index 8e9d7cf0..a89f64ea 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUpsertExecutor.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/executor/RedshiftUpsertExecutor.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.redshift.internal.executor;
-import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
@@ -26,7 +25,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;
-import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.core.BaseConnection;
import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
import org.slf4j.Logger;
@@ -36,7 +35,7 @@
import java.util.Arrays;
import java.util.function.Function;
-/** Executor for Redshift Upsert Operation. */
+/** Executor for Upsert Operation. */
public class RedshiftUpsertExecutor implements RedshiftExecutor {
private static final long serialVersionUID = 1L;
@@ -90,7 +89,7 @@ public RedshiftUpsertExecutor(
}
@Override
- public void prepareStatement(RedshiftConnection connection) throws SQLException {
+ public void prepareStatement(BaseConnection connection) throws SQLException {
if (connection instanceof RedshiftConnectionImpl) {
RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
this.insertStatement =
@@ -113,9 +112,6 @@ public void prepareStatement(RedshiftConnectionProvider connectionProvider)
}
}
- @Override
- public void setRuntimeContext(RuntimeContext context) {}
-
@Override
public void addToBatch(RowData record) throws SQLException {
// TODO: how to handle the ROW sequence?
@@ -166,6 +162,11 @@ public void closeStatement() {
}
}
+ @Override
+ public String getName() {
+ return "redshift-upsert-executor";
+ }
+
@Override
public String toString() {
return "RedshiftUpsertExecutor{"
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/statement/RedshiftStatement.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/statement/RedshiftStatement.java
index bdb1e32a..80259dcc 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/statement/RedshiftStatement.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/internal/statement/RedshiftStatement.java
@@ -24,8 +24,12 @@
import java.util.Arrays;
import java.util.stream.Collectors;
-/** Provide Statement for running in Redshift. */
+/** Provides Redshift-compatible SQL statements. */
public class RedshiftStatement implements Serializable {
+
+ /** Restrict instance creation of RedshiftStatement. */
+ private RedshiftStatement() {}
+
private static final long serialVersionUID = 1L;
public static String getInsertIntoStatement(String tableName, String[] fieldNames) {
@@ -72,69 +76,6 @@ public static String getDeleteStatement(String tableName, String[] conditionFiel
return "DELETE FROM " + quoteIdentifier(tableName) + " WHERE " + conditionClause;
}
- public static String getRowExistsStatement(String tableName, String[] conditionFields) {
- String fieldExpressions =
- Arrays.stream(conditionFields)
- .map(f -> quoteIdentifier(f) + "=?")
- .collect(Collectors.joining(" AND "));
- return "SELECT 1 FROM " + quoteIdentifier(tableName) + " WHERE " + fieldExpressions;
- }
-
- public static String getUpsertStatement(
- String tableName,
- String stageTableName,
- String[] fieldNames,
- String[] conditionFields) {
- String columns =
- Arrays.stream(fieldNames)
- .map(RedshiftStatement::quoteIdentifier)
- .collect(Collectors.joining(", "));
- String matchCondition =
- Arrays.stream(conditionFields)
- .map(
- f ->
- quoteIdentifier(tableName)
- + "."
- + quoteIdentifier(f)
- + "="
- + quoteIdentifier("stage")
- + "."
- + quoteIdentifier(f))
- .collect(Collectors.joining(" AND "));
- String setClause =
- Arrays.stream(fieldNames)
- .filter(f -> !ArrayUtils.contains(conditionFields, f))
- .map(
- f ->
- quoteIdentifier(f)
- + "="
- + quoteIdentifier("stage")
- + "."
- + quoteIdentifier(f))
- .collect(Collectors.joining(", "));
- String insertValue =
- Arrays.stream(fieldNames)
- .map(f -> quoteIdentifier("stage") + "." + quoteIdentifier(f))
- .collect(Collectors.joining(", "));
- return "MERGE INTO "
- + quoteIdentifier(tableName)
- + " USING "
- + quoteIdentifier(stageTableName)
- + " stage on "
- + matchCondition
- + " WHEN MATCHED THEN UPDATE SET "
- + setClause
- + " WHEN NOT MATCHED THEN INSERT ("
- + columns
- + ") VALUES ("
- + insertValue
- + ")";
- }
-
- public static String getDropTableStatement(String tableName) {
- return "DROP TABLE IF EXISTS " + quoteIdentifier(tableName);
- }
-
public static String getCreateTempTableAsStatement(String tableName, String tempTableName) {
return "CREATE TEMP TABLE IF NOT EXISTS "
+ quoteIdentifier(tempTableName)
@@ -145,7 +86,7 @@ public static String getCreateTempTableAsStatement(String tableName, String temp
public static String getTableCopyStatement(
String tableName, String s3Uri, String[] fieldNames, String iamRoleArn) {
- final String columns = Arrays.stream(fieldNames).collect(Collectors.joining(", "));
+ final String columns = String.join(", ", fieldNames);
return "COPY "
+ quoteIdentifier(tableName)
+ "("
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/copy/RedshiftCopyModeRowConverterImpl.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/copy/RedshiftCopyModeRowConverterImpl.java
index c38cc8e7..6e581a94 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/copy/RedshiftCopyModeRowConverterImpl.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/copy/RedshiftCopyModeRowConverterImpl.java
@@ -114,6 +114,6 @@ public Object toExternal(RowData rowData) {
@Override
public RowData toInternal(ResultSet resultSet) {
throw new UnsupportedOperationException(
- "COPY Mode is supported only for write to Redshift in batch manner.");
+ "COPY Mode is supported only for writing data to Redshift in batch manner.");
}
}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/jdbc/RedshiftJDBCModeRowConverterImpl.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/jdbc/RedshiftJDBCModeRowConverterImpl.java
index 737e1508..462c1379 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/jdbc/RedshiftJDBCModeRowConverterImpl.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/mode/jdbc/RedshiftJDBCModeRowConverterImpl.java
@@ -32,7 +32,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
-/** Redshift Converter Implementation. */
+/** Redshift Converter Implementation in JDBC Mode. */
public class RedshiftJDBCModeRowConverterImpl extends AbstractRedshiftRowConverter {
private static final long serialVersionUID = 1L;
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/sink/AmazonRedshiftDataStreamWriter.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/sink/AmazonRedshiftDataStreamWriter.java
deleted file mode 100644
index 8d561a7a..00000000
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/sink/AmazonRedshiftDataStreamWriter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.redshift.sink;
-
-// import com.amazon.redshift.copy.RedshiftCopyOutputStream;
-// import org.apache.flink.annotation.Internal;
-// import org.apache.flink.annotation.PublicEvolving;
-// import org.apache.flink.connector.base.sink.AsyncSinkBase;
-// import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
-// import org.apache.flink.core.io.SimpleVersionedSerializer;
-//
-// import java.io.IOException;
-// import java.util.Collection;
-
-/** Redshift Sink. */
-public class AmazonRedshiftDataStreamWriter {}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java
new file mode 100644
index 00000000..5a88f124
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableFactory.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.table;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.redshift.mode.SinkMode;
+import org.apache.flink.connector.redshift.util.S3Util;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.BATCH_SIZE;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.DATABASE_NAME;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.FLUSH_INTERVAL;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.HOSTNAME;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.IAM_ROLE_ARN;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.PASSWORD;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.PORT;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.S3_URI;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.SINK_MODE;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.TABLE_NAME;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.USERNAME;
+import static org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants.MAX_RETIRES;
+
+/** Redshift Dynamic Table Factory. */
+public class RedshiftDynamicTableFactory implements DynamicTableSinkFactory {
+ public static final String IDENTIFIER = "redshift";
+ private static final Logger LOG = LoggerFactory.getLogger(RedshiftDynamicTableFactory.class);
+
+ @Override
+ public DynamicTableSink createDynamicTableSink(Context context) {
+ FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+ ReadableConfig config = helper.getOptions();
+ helper.validate();
+ validateConfigOptions(config);
+ ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();
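+ // Primary key columns (if declared) switch the sink into UPSERT mode downstream.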
+ String[] primaryKeys =
+ resolvedSchema
+ .getPrimaryKey()
+ .map(UniqueConstraint::getColumns)
+ .map(keys -> keys.toArray(new String[0]))
+ .orElse(new String[0]);
+ String[] fieldNames = resolvedSchema.getColumnNames().toArray(new String[0]);
+ DataType[] fieldDataTypes = resolvedSchema.getColumnDataTypes().toArray(new DataType[0]);
+ return new RedshiftDynamicTableSink(
+ (Configuration) config, primaryKeys, fieldNames, fieldDataTypes);
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> requiredOptions = new HashSet<>();
+ requiredOptions.add(HOSTNAME);
+ requiredOptions.add(PORT);
+ requiredOptions.add(DATABASE_NAME);
+ requiredOptions.add(TABLE_NAME);
+ requiredOptions.add(SINK_MODE);
+ return requiredOptions;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ Set<ConfigOption<?>> optionalOptions = new HashSet<>();
+ optionalOptions.add(USERNAME);
+ optionalOptions.add(PASSWORD);
+ optionalOptions.add(BATCH_SIZE);
+ optionalOptions.add(FLUSH_INTERVAL);
+ optionalOptions.add(MAX_RETIRES);
+ optionalOptions.add(S3_URI);
+ optionalOptions.add(IAM_ROLE_ARN);
+ return optionalOptions;
+ }
+
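+ /**
+ * Sink-mode specific validation: COPY mode stages data on S3, so it additionally requires a
+ * valid S3 URI and an IAM role ARN.
+ */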
+ private void validateConfigOptions(ReadableConfig config) {
+ if (config.get(SINK_MODE).equals(SinkMode.COPY)) {
+ if (!config.getOptional(S3_URI).isPresent()) {
+ throw new IllegalArgumentException(
+ "An S3 URI must be provided when the sink mode is COPY!");
+ }
+ String uri = config.get(S3_URI);
+ LOG.debug("S3 URI from factory: {}", uri);
+ try {
+ S3Util.getS3Parts(uri);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("An invalid S3 URI " + uri + " was provided!", e);
+ }
+ if (!config.getOptional(IAM_ROLE_ARN).isPresent()) {
+ throw new IllegalArgumentException(
+ "An IAM role ARN attached to the Amazon Redshift cluster must be provided for COPY mode.");
+ }
+ }
+ }
+}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableSink.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableSink.java
new file mode 100644
index 00000000..f20a7a43
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/table/RedshiftDynamicTableSink.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.redshift.util.AbstractRedshiftRichOutputFormat;
+import org.apache.flink.connector.redshift.util.RedshiftRichOutputFormat;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+/** Redshift Dynamic Table Sink. */
+public class RedshiftDynamicTableSink implements DynamicTableSink {
+ private final String[] primaryKeys;
+
+ private final String[] fieldNames;
+
+ private final DataType[] fieldDataTypes;
+
+ private final Configuration options;
+
+ public RedshiftDynamicTableSink(
+ Configuration options,
+ String[] primaryKeys,
+ String[] fieldNames,
+ DataType[] fieldDataTypes) {
+
+ this.primaryKeys = primaryKeys;
+ this.fieldNames = fieldNames;
+ this.fieldDataTypes = fieldDataTypes;
+ this.options = options;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ validatePrimaryKey(requestedMode);
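+ // The sink consumes INSERT, UPDATE_AFTER and DELETE; UPDATE_BEFORE is not required because
+ // updates and deletes are applied by primary key.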
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
+ }
+
+ private void validatePrimaryKey(ChangelogMode requestedMode) {
+ Preconditions.checkState(
+ (ChangelogMode.insertOnly().equals(requestedMode) || primaryKeys.length > 0),
+ "Declare primary key for sink table when query contains update/delete record.");
+ }
+
+ @Override
+ public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(
+ DynamicTableSink.Context context) {
+
+ AbstractRedshiftRichOutputFormat outputFormat =
+ new RedshiftRichOutputFormat.Builder()
+ .withConnectionProperties(options)
+ .withFieldNames(fieldNames)
+ .withFieldTypes(fieldDataTypes)
+ .withPrimaryKey(primaryKeys)
+ .build();
+ return OutputFormatProvider.of(outputFormat);
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new RedshiftDynamicTableSink(
+ this.options, this.primaryKeys, this.fieldNames, this.fieldDataTypes);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Redshift Sink";
+ }
+}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/AbstractRedshiftRichOutputFormat.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/AbstractRedshiftRichOutputFormat.java
new file mode 100644
index 00000000..92c51422
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/AbstractRedshiftRichOutputFormat.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.util;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
+import org.apache.flink.connector.redshift.internal.connection.RedshiftJdbcConnectionProvider;
+import org.apache.flink.connector.redshift.internal.executor.RedshiftExecutor;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/** Abstract Rich Output Format. */
+public abstract class AbstractRedshiftRichOutputFormat extends RichOutputFormat<RowData>
+ implements Flushable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractRedshiftRichOutputFormat.class);
+
+ protected transient volatile boolean closed = false;
+
+ protected transient ScheduledExecutorService scheduler;
+
+ protected transient ScheduledFuture<?> scheduledFuture;
+
+ protected transient volatile Exception flushException;
+
+ public AbstractRedshiftRichOutputFormat() {}
+
+ @Override
+ public void configure(Configuration parameters) {}
+
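+ /**
+ * Schedules a background task that flushes buffered records every {@code intervalMillis}; any
+ * flush failure is stored in {@code flushException} and re-thrown on the writer thread.
+ */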
+ public void scheduledFlush(long intervalMillis, String executorName) {
+ Preconditions.checkArgument(intervalMillis > 0, "flush interval must be greater than 0");
+ scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(executorName));
+ scheduledFuture =
+ scheduler.scheduleWithFixedDelay(
+ () -> {
+ synchronized (this) {
+ if (!closed) {
+ try {
+ flush();
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ }
+ },
+ intervalMillis,
+ intervalMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
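+ /** Re-throws any pending asynchronous flush failure, then executes the buffered batch. */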
+ public void checkBeforeFlush(final RedshiftExecutor executor) throws IOException {
+ checkFlushException();
+ try {
+ executor.executeBatch();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+
+ try {
+ flush();
+ } catch (Exception exception) {
+ LOG.warn("Flushing records to Redshift failed.", exception);
+ }
+
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ this.scheduler.shutdown();
+ }
+
+ closeOutputFormat();
+ checkFlushException();
+ }
+ }
+
+ protected abstract void closeOutputFormat();
+
+ protected void checkFlushException() {
+ if (flushException != null) {
+ throw new RuntimeException("Flush exception found.", flushException);
+ }
+ }
+
+ /** Builder for Redshift Output Format. */
+ public static class Builder {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractRedshiftRichOutputFormat.Builder.class);
+
+ private DataType[] fieldTypes;
+
+ private LogicalType[] logicalTypes;
+
+ private Configuration connectionProperties;
+
+ private String[] fieldNames;
+
+ private String[] primaryKeys;
+
+ public Builder() {}
+
+ public Builder withConnectionProperties(Configuration connectionProperties) {
+ this.connectionProperties = connectionProperties;
+ return this;
+ }
+
+ public Builder withFieldTypes(DataType[] fieldTypes) {
+ this.fieldTypes = fieldTypes;
+ this.logicalTypes =
+ Arrays.stream(fieldTypes)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
+ return this;
+ }
+
+ public Builder withFieldNames(String[] fieldNames) {
+ this.fieldNames = fieldNames;
+ return this;
+ }
+
+ public Builder withPrimaryKey(String[] primaryKeys) {
+ this.primaryKeys = primaryKeys;
+ return this;
+ }
+
+ public AbstractRedshiftRichOutputFormat build() {
+ Preconditions.checkNotNull(fieldNames);
+ Preconditions.checkNotNull(fieldTypes);
+ Preconditions.checkNotNull(primaryKeys);
+ if (primaryKeys.length > 0) {
+ LOG.warn("If primary key is specified, connector will be in UPSERT mode.");
+ LOG.warn(
+ "The data will be updated / deleted by the primary key, you will have significant performance loss.");
+ } else {
+ LOG.warn("No primary key is specified, connector will be INSERT only mode.");
+ }
+
+ RedshiftConnectionProvider connectionProvider =
+ new RedshiftJdbcConnectionProvider(connectionProperties);
+
+ return new RedshiftRichOutputFormat(
+ connectionProvider,
+ fieldNames,
+ primaryKeys,
+ logicalTypes,
+ connectionProperties);
+ }
+ }
+}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/RedshiftRichOutputFormat.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/RedshiftRichOutputFormat.java
new file mode 100644
index 00000000..4e8722ad
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/RedshiftRichOutputFormat.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
+import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
+import org.apache.flink.connector.redshift.internal.executor.RedshiftExecutor;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+/** Redshift Rich Output format. */
+public class RedshiftRichOutputFormat extends AbstractRedshiftRichOutputFormat {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RedshiftRichOutputFormat.class);
+
+ private final RedshiftConnectionProvider connectionProvider;
+
+ private final String[] fieldNames;
+
+ private final String[] keyFields;
+
+ private final LogicalType[] fieldTypes;
+
+ private final Configuration options;
+
+ private transient RedshiftExecutor executor;
+
+ private transient int batchCount = 0;
+
+ protected RedshiftRichOutputFormat(
+ RedshiftConnectionProvider connectionProvider,
+ String[] fieldNames,
+ String[] keyFields,
+ LogicalType[] fieldTypes,
+ Configuration options) {
+ this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
+ this.fieldNames = Preconditions.checkNotNull(fieldNames);
+ this.keyFields = Preconditions.checkNotNull(keyFields);
+ this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
+ this.options = Preconditions.checkNotNull(options);
+ }
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ try {
+ executor =
+ RedshiftExecutor.createRedshiftExecutor(
+ fieldNames, keyFields, fieldTypes, options);
+
+ executor.prepareStatement(connectionProvider);
+ LOG.info("Executor: " + executor);
+
+ long flushIntervalMillis =
+ options.get(RedshiftSinkConfigConstants.FLUSH_INTERVAL).toMillis();
+ scheduledFlush(flushIntervalMillis, executor.getName());
+ } catch (Exception exception) {
+ throw new IOException("Unable to establish connection with Redshift.", exception);
+ }
+ }
+
+ @Override
+ public synchronized void writeRecord(RowData record) throws IOException {
+ checkFlushException();
+
+ try {
+ executor.addToBatch(record);
+ batchCount++;
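+ // Flush eagerly once the configured batch size is reached; smaller batches are picked up
+ // by the scheduled flush task after the flush interval.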
+ if (batchCount >= options.getInteger(RedshiftSinkConfigConstants.BATCH_SIZE)) {
+ flush();
+ }
+ } catch (SQLException exception) {
+ throw new IOException("Writing record to Redshift statement failed.", exception);
+ }
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (batchCount > 0) {
+ checkBeforeFlush(executor);
+ batchCount = 0;
+ }
+ }
+
+ @Override
+ public synchronized void closeOutputFormat() {
+ executor.closeStatement();
+ connectionProvider.closeConnection();
+ }
+}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/S3Util.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/S3Util.java
index 91221105..3f3e7c73 100644
--- a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/S3Util.java
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/util/S3Util.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.redshift.util;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import software.amazon.awssdk.services.s3.S3Client;
@@ -29,7 +30,6 @@
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@@ -53,7 +53,7 @@ public static void s3OutputCsv(S3Client s3Client, String s3Uri, List c
tempOut.close();
csvPrinter.close();
} catch (Exception e) {
- throw new IOException(String.format("The S3 URI {} is not correct! ", s3Uri), e);
+ throw new IOException("The S3 URI " + s3Uri + "is not correct! ", e);
}
}
@@ -67,15 +67,17 @@ public static void s3DeleteObj(S3Client s3Client, String s3Uri) throws IOExcepti
}
}
- public static String[] getS3Parts(String s3Uri)
- throws IllegalArgumentException, URISyntaxException {
- S3Uri amazonS3Uri = S3Uri.builder().uri(new URI(s3Uri)).build();
+ public static String[] getS3Parts(String s3Uri) throws IllegalArgumentException {
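+ // Derive the bucket and key directly from the s3://bucket/key URI; the S3Uri builder stores
+ // these values rather than parsing them from the URI.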
+ Log.info("S3 URI is " + s3Uri);
+ URI uri = URI.create(s3Uri);
+ String key = uri.getPath();
+ String bucket = uri.getAuthority();
+ String[] t = s3Uri.split("/");
+ S3Uri amazonS3Uri = S3Uri.builder().uri(uri).key(key).bucket(bucket).build();
if (!amazonS3Uri.key().isPresent()) {
throw new IllegalArgumentException("Invalid URI");
}
- String key = amazonS3Uri.key().get();
-
if ((key.charAt(key.length() - 1)) == '/') {
key = removeLastCharOptional(key);
}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/resources/META-INF/licenses/.gitkeep b/flink-connector-aws/flink-connector-redshift/src/main/resources/META-INF/licenses/.gitkeep
new file mode 100644
index 00000000..e69de29b
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-aws/flink-connector-redshift/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000..e281664a
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.redshift.table.RedshiftDynamicTableFactory