
Final Redshift Connector Working Change
Samrat002 committed Nov 10, 2023
1 parent 4047b55 commit eaaa378
Showing 21 changed files with 741 additions and 252 deletions.
16 changes: 10 additions & 6 deletions flink-connector-aws/flink-connector-redshift/pom.xml
@@ -27,6 +27,13 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
+
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-csv</artifactId>
@@ -43,8 +50,6 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
         </dependency>
 
     </dependencies>
@@ -53,17 +58,16 @@
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
+                <artifactId>maven-jar-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>shade-flink</id>
                         <phase>package</phase>
                         <goals>
-                            <goal>shade</goal>
+                            <goal>test-jar</goal>
                         </goals>
                     </execution>
                 </executions>
             </plugin>
         </plugins>
     </build>
 
 </project>
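
The new software.amazon.awssdk:s3 dependency backs the connector's COPY mode, which stages batches on S3 before loading them into Redshift (see the sink.write.temp.s3-uri option below). A minimal sketch of that kind of staging upload, using only standard AWS SDK v2 calls; the bucket, key, and payload are illustrative, and the connector's real upload code lives in files not shown in this commit view:

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class S3StagingSketch {
    public static void main(String[] args) {
        // Stage one CSV batch under the temporary S3 URI so a later
        // "COPY ... FROM 's3://...'" statement can load it into Redshift.
        try (S3Client s3 = S3Client.create()) {
            s3.putObject(
                    PutObjectRequest.builder()
                            .bucket("my-temp-bucket") // illustrative, from sink.write.temp.s3-uri
                            .key("flink-redshift/batch-0001.csv")
                            .build(),
                    RequestBody.fromString("1,alice\n2,bob\n"));
        }
    }
}
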
RedshiftSinkConfigConstants.java
@@ -33,49 +33,51 @@ public class RedshiftSinkConfigConstants {
             ConfigOptions.key("hostname")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Hostname of Redshift Cluster");
+                    .withDescription("Redshift Cluster's Hostname");
 
     public static final ConfigOption<Integer> PORT =
             ConfigOptions.key("port")
                     .intType()
                     .defaultValue(5439)
-                    .withDescription("Port of the Redshift Cluster");
+                    .withDescription(
+                            "Redshift's cluster Port. By default, AWS console set value to 5439 for redshift cluster.");
 
     public static final ConfigOption<String> USERNAME =
             ConfigOptions.key("username")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Username of Redshift Cluster");
+                    .withDescription("username for Redshift Cluster connection.");
 
     public static final ConfigOption<String> PASSWORD =
             ConfigOptions.key("password")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Password of Redshift Cluster");
+                    .withDescription("Password of Redshift Cluster associated with username.");
 
     public static final ConfigOption<String> DATABASE_NAME =
             ConfigOptions.key("database-name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Database to which it needs to be connected");
+                    .withDescription("Name of Database to which connector is intended to connect.");
 
     public static final ConfigOption<String> TABLE_NAME =
             ConfigOptions.key("table-name")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Table to which it needs to be connected");
+                    .withDescription("Table Name to which sink/source to be setup.");
 
     public static final ConfigOption<Integer> BATCH_SIZE =
             ConfigOptions.key("sink.batch-size")
                     .intType()
                     .defaultValue(1000)
-                    .withDescription("Port of the Redshift Cluster");
+                    .withDescription(
+                            "Sink Property. Batch size to be added as while writing to redshift");
 
     public static final ConfigOption<SinkMode> SINK_MODE =
             ConfigOptions.key("sink.write.mode")
                     .enumType(SinkMode.class)
                     .defaultValue(SinkMode.JDBC)
-                    .withDescription("Mode of sink");
+                    .withDescription("Mode of sink. Currently it only supports JDBC / COPY ");
 
     public static final ConfigOption<String> IAM_ROLE_ARN =
             ConfigOptions.key("aws.iam-role")

@@ -87,19 +89,20 @@ public class RedshiftSinkConfigConstants {
             ConfigOptions.key("sink.write.temp.s3-uri")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("Temporary S3 URI to store data");
+                    .withDescription(
+                            "Temporary S3 URI to store data. In COPY mode, Redshift architecture uses s3 to store intermittent data. Reference :https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html");
 
     public static final ConfigOption<Integer> MAX_RETIRES =
             ConfigOptions.key("sink.max.retries")
                     .intType()
                     .defaultValue(2)
-                    .withDescription("Maximum number of Retries in case of Failure");
+                    .withDescription("Maximum number of Retries");
 
     public static final ConfigOption<Duration> FLUSH_INTERVAL =
             ConfigOptions.key("sink.flush.interval")
                     .durationType()
                     .defaultValue(Duration.of(10, SECONDS))
-                    .withDescription("Maximum number of Retries in case of Failure");
+                    .withDescription("Flush Interval for Sink");
 
     public static final ConfigOption<Duration> TIMEOUT =
             ConfigOptions.key("sink.connection.timeout")
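
Taken together these options describe the connection plus the sink behaviour. A minimal sketch of wiring them up programmatically, setting each option by the string key declared above; all values are illustrative:

import org.apache.flink.configuration.Configuration;

public class RedshiftSinkConfigSketch {
    public static Configuration buildConfig() {
        Configuration conf = new Configuration();
        conf.setString("hostname", "my-cluster.abc123.us-east-1.redshift.amazonaws.com");
        conf.setInteger("port", 5439); // matches the AWS console default
        conf.setString("username", "flink_user");
        conf.setString("password", "********");
        conf.setString("database-name", "dev");
        conf.setString("table-name", "orders");
        conf.setInteger("sink.batch-size", 1000);
        conf.setString("sink.write.mode", "COPY"); // or "JDBC"
        conf.setString("aws.iam-role", "arn:aws:iam::123456789012:role/redshift-copy");
        conf.setString("sink.write.temp.s3-uri", "s3://my-temp-bucket/flink-redshift/");
        return conf;
    }
}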

This file was deleted.

RedshiftConnectionProvider.java
@@ -26,9 +26,15 @@
 import java.sql.Connection;
 import java.sql.SQLException;
 
-/** Redshift connection provider. */
+/**
+ * Redshift connection provider Interface. Redshift can be configured using both JDBC and ODBC.
+ * Reference : <a
+ * href="https://docs.aws.amazon.com/redshift/latest/mgmt/configuring-connections.html">Redshift
+ * Connection Configuration</a>.
+ */
 @Internal
 public interface RedshiftConnectionProvider {
+
     /**
      * Get existing connection.
      *
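
A sketch of the lifecycle this interface implies, using only the methods visible in this view (getOrEstablishConnection, isConnectionValid, reestablishConnection, closeConnection); how the provider is constructed is not shown here, so it is taken as a parameter:

import java.sql.SQLException;

import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;

import com.amazon.redshift.core.BaseConnection;

public class ProviderLifecycleSketch {
    static void runWith(RedshiftConnectionProvider provider) throws SQLException {
        try {
            BaseConnection connection = provider.getOrEstablishConnection();
            if (!provider.isConnectionValid()) {
                connection = provider.reestablishConnection(); // drop and re-open a stale connection
            }
            // ... execute statements against `connection` ...
        } finally {
            provider.closeConnection();
        }
    }
}
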
RedshiftJdbcConnectionProvider.java
@@ -33,7 +33,7 @@
 import java.sql.DriverManager;
 import java.sql.SQLException;
 
-/** Redshift connection provider. */
+/** Redshift JDBC connection provider. */
 @Internal
 @NotThreadSafe
 public class RedshiftJdbcConnectionProvider implements RedshiftConnectionProvider, Serializable {

@@ -103,8 +103,9 @@ public boolean isConnectionValid() throws SQLException {
     }
 
     @Override
-    public BaseConnection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
-        if (!connection.isClosed()) {
+    public BaseConnection getOrEstablishConnection() throws SQLException {
+
+        if (connection != null && !connection.isClosed()) {
             return connection;
         }
         connection =

@@ -121,15 +122,16 @@ public void closeConnection() {
             try {
                 connection.close();
             } catch (SQLException e) {
-                LOG.warn("Redshift connection close failed.", e);
+                LOG.warn("Redshift connection failed to close.", e);
             } finally {
                 connection = null;
             }
         }
+        LOG.info("Redshift Connection is already closed.");
     }
 
     @Override
-    public BaseConnection reestablishConnection() throws SQLException, ClassNotFoundException {
+    public BaseConnection reestablishConnection() throws SQLException {
         closeConnection();
         connection =
                 createConnection(
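
The body of createConnection is collapsed in this view. A plausible reconstruction, assuming the standard jdbc:redshift:// URL form and that the driver's connections implement BaseConnection, as the cast in RedshiftBatchExecutor below suggests:

import java.sql.DriverManager;
import java.sql.SQLException;

import com.amazon.redshift.core.BaseConnection;

public class CreateConnectionSketch {
    // URL shape: jdbc:redshift://<hostname>:<port>/<database-name>, from the options above.
    static BaseConnection createConnection(String url, String user, String password)
            throws SQLException {
        // The Redshift JDBC driver hands back connections that implement BaseConnection.
        return (BaseConnection) DriverManager.getConnection(url, user, password);
    }
}
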
RedshiftBatchExecutor.java
@@ -18,15 +18,15 @@

 package org.apache.flink.connector.redshift.internal.executor;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.redshift.config.RedshiftSinkConfigConstants;
 import org.apache.flink.connector.redshift.converter.RedshiftRowConverter;
 import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
 
-import com.amazon.redshift.RedshiftConnection;
+import com.amazon.redshift.core.BaseConnection;
 import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
 import com.amazon.redshift.jdbc.RedshiftPreparedStatement;
 import org.slf4j.Logger;

@@ -59,16 +59,19 @@ public RedshiftBatchExecutor(
     }
 
     @Override
-    public void prepareStatement(RedshiftConnection connection) throws SQLException {
+    public void prepareStatement(BaseConnection connection) throws SQLException {
         if (connection instanceof RedshiftConnectionImpl) {
             RedshiftConnectionImpl redshiftConnection = (RedshiftConnectionImpl) connection;
             statement = (RedshiftPreparedStatement) redshiftConnection.prepareStatement(sql);
         }
+        LOG.warn(
+                "Unable to create Redshift Statement for Execution. File a JIRA in case of issue.");
     }
 
     @Override
     public void prepareStatement(RedshiftConnectionProvider connectionProvider)
             throws SQLException {
+        Preconditions.checkNotNull(connectionProvider, "Connection Provider cannot be Null");
         this.connectionProvider = connectionProvider;
         try {
             prepareStatement(connectionProvider.getOrEstablishConnection());

@@ -77,9 +80,6 @@ public void prepareStatement(RedshiftConnectionProvider connectionProvider)
         }
     }
 
-    @Override
-    public void setRuntimeContext(RuntimeContext context) {}
-
     @Override
     public void addToBatch(RowData record) throws SQLException {
         switch (record.getRowKind()) {

@@ -117,6 +117,11 @@ public void closeStatement() {
         }
     }
 
+    @Override
+    public String getName() {
+        return "redshift-batch-executor";
+    }
+
     @Override
     public String toString() {
         return "RedshiftBatchExecutor{"
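
From the methods visible in this diff, the executor's write path is: prepare a statement, buffer rows with addToBatch, flush, and close. A sketch of how a sink writer might drive it; the executeBatch() flush call is an assumption, since the flush path is not part of this hunk:

import java.sql.SQLException;

import org.apache.flink.connector.redshift.internal.connection.RedshiftConnectionProvider;
import org.apache.flink.connector.redshift.internal.executor.RedshiftBatchExecutor;
import org.apache.flink.table.data.RowData;

public class ExecutorDriveSketch {
    static void writeBatch(
            RedshiftBatchExecutor executor,
            RedshiftConnectionProvider provider,
            Iterable<RowData> buffered)
            throws SQLException {
        executor.prepareStatement(provider); // null-checked via Preconditions above
        for (RowData row : buffered) {
            executor.addToBatch(row); // dispatches on the record's RowKind
        }
        executor.executeBatch(); // assumed flush method; not shown in this hunk
        executor.closeStatement();
    }
}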