diff --git a/flink-connector-aws/flink-connector-redshift/README.md b/flink-connector-aws/flink-connector-redshift/README.md
new file mode 100644
index 00000000..30fe503d
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/README.md
@@ -0,0 +1,121 @@
+# Flink Redshift Connector
+
+This is the initial proof of concept for the Flink Redshift connector, which supports two write modes:
+
+- `copy-mode` = `false` (plain JDBC writes)
+- `copy-mode` = `true` (bulk loads through the Redshift `COPY` command)
+
+This POC only supports sink tables.
+
+## Connector Options
+| Option | Required | Default | Type | Description |
+|:-------|:---------|:--------|:-----|:------------|
+| hostname | required | none | String | Redshift connection hostname. |
+| port | required | 5439 | Integer | Redshift connection port. |
+| username | required | none | String | Redshift username. |
+| password | required | none | String | Redshift user password. |
+| database-name | required | dev | String | Redshift database to connect to. |
+| table-name | required | none | String | Redshift table name. |
+| sink.batch-size | optional | 1000 | Integer | Maximum number of buffered records; once exceeded, the sink flushes. |
+| sink.flush-interval | optional | 1s | Duration | Interval after which an asynchronous thread flushes buffered data. |
+| sink.max-retries | optional | 3 | Integer | Maximum number of retries when writing records to the database fails. |
+| copy-mode | required | false | Boolean | Whether to use the Redshift COPY command for inserts/upserts. |
+| copy-temp-s3-uri | conditionally required | none | String | If copy-mode = true, the Redshift COPY command needs an S3 URI for staging data. |
+| iam-role-arn | conditionally required | none | String | If copy-mode = true, the Redshift COPY command needs an IAM role with the required privileges, attached to the Redshift cluster. |
+
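+When `copy-mode` is enabled, the connector stages rows to `copy-temp-s3-uri` and loads
+them into the target table with Redshift's `COPY` command. The statement it issues is
+roughly of the following shape (a sketch for illustration; the staged file name and the
+exact options the connector generates may differ):
+
+```SQL
+-- Hypothetical staged load; the S3 key and IAM role below are placeholders.
+COPY users
+FROM 's3://bucket-name/key/temp/part-0000.csv'
+IAM_ROLE 'arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx'
+FORMAT AS CSV;
+```
+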
+**Update/Delete Data Considerations:**
+Rows are updated and deleted by primary key. For example, an upstream changelog update for `user_id = 42` is applied to the Redshift row with that key.
+
+## Data Type Mapping
+
+| Flink Type | Redshift Type |
+|:--------------------|:--------------------------------------------------------|
+| CHAR | VARCHAR |
+| VARCHAR | VARCHAR |
+| STRING | VARCHAR |
+| BOOLEAN | Boolean |
+| BYTES | Not supported |
+| DECIMAL | Decimal |
+| TINYINT | Int8 |
+| SMALLINT | Int16 |
+| INTEGER | Int32 |
+| BIGINT | Int64 |
+| FLOAT | Float32 |
+| DOUBLE | Float64 |
+| DATE | Date |
+| TIME | Timestamp |
+| TIMESTAMP | Timestamp |
+| TIMESTAMP_LTZ | Timestamp |
+| INTERVAL_YEAR_MONTH | Int32 |
+| INTERVAL_DAY_TIME | Int64 |
+| ARRAY | Not supported |
+| MAP | Not supported |
+| ROW | Not supported |
+| MULTISET | Not supported |
+| RAW | Not supported |
+
+
+
+## How the POC Is Tested
+
+### Create and sink a table in pure JDBC mode
+
+```SQL
+
+-- register a Redshift table `t_user` in flink sql.
+CREATE TABLE t_user (
+ `user_id` BIGINT,
+ `user_type` INTEGER,
+ `language` STRING,
+ `country` STRING,
+ `gender` STRING,
+ `score` DOUBLE,
+ PRIMARY KEY (`user_id`) NOT ENFORCED
+) WITH (
+ 'connector' = 'redshift',
+    'hostname' = 'xxxx.redshift.amazonaws.com',
+ 'port' = '5439',
+ 'username' = 'awsuser',
+ 'password' = 'passwordxxxx',
+ 'database-name' = 'tutorial',
+ 'table-name' = 'users',
+ 'sink.batch-size' = '500',
+ 'sink.flush-interval' = '1000',
+ 'sink.max-retries' = '3'
+);
+
+-- write data into the Redshift table from the table `T`
+INSERT INTO t_user
+SELECT CAST(`user_id` AS BIGINT), `user_type`, `lang`, `country`, `gender`, `score` FROM T;
+
+```
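+
+Note that `sink.flush-interval` is given in milliseconds here; presumably a flush is
+triggered by whichever of `sink.batch-size` or `sink.flush-interval` is reached first.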
+
+### Create and sink a table in COPY mode
+
+```SQL
+
+-- register a Redshift table `t_user` in flink sql.
+CREATE TABLE t_user (
+ `user_id` BIGINT,
+ `user_type` INTEGER,
+ `language` STRING,
+ `country` STRING,
+ `gender` STRING,
+ `score` DOUBLE,
+ PRIMARY KEY (`user_id`) NOT ENFORCED
+) WITH (
+ 'connector' = 'redshift',
+    'hostname' = 'xxxx.redshift.amazonaws.com',
+ 'port' = '5439',
+ 'username' = 'awsuser',
+ 'password' = 'passwordxxxx',
+ 'database-name' = 'tutorial',
+ 'table-name' = 'users',
+ 'sink.batch-size' = '500',
+ 'sink.flush-interval' = '1000',
+ 'sink.max-retries' = '3',
+ 'copy-mode' = 'true',
+ 'copy-temp-s3-uri' = 's3://bucket-name/key/temp',
+ 'iam-role-arn' = 'arn:aws:iam::xxxxxxxx:role/xxxxxRedshiftS3Rolexxxxx'
+);
+```
diff --git a/flink-connector-aws/flink-connector-redshift/pom.xml b/flink-connector-aws/flink-connector-redshift/pom.xml
new file mode 100644
index 00000000..e7749c53
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-redshift</artifactId>
+    <name>Flink : Connectors : AWS : Amazon Redshift</name>
+
+    <properties>
+        <redshift.jdbc.version>2.1.0.17</redshift.jdbc.version>
+        <commons-logging.version>1.2</commons-logging.version>
+        <commons-csv.version>1.10.0</commons-csv.version>
+        <scala.binary.version>2.12</scala.binary.version>
+    </properties>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${parent.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazon.redshift</groupId>
+            <artifactId>redshift-jdbc42</artifactId>
+            <version>${redshift.jdbc.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+            <version>${commons-csv.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+            <version>${aws.sdkv1.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+            <version>${aws.sdkv1.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+            <version>${commons-logging.version}</version>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/connection/RedshiftConnectionProvider.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/connection/RedshiftConnectionProvider.java
new file mode 100644
index 00000000..a834b632
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/connection/RedshiftConnectionProvider.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.connection;
+
+import org.apache.flink.connector.redshift.options.RedshiftOptions;
+
+import com.amazon.redshift.jdbc.RedshiftConnectionImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/** Redshift Connection Provider. */
+public class RedshiftConnectionProvider implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(RedshiftConnectionProvider.class);
+
+ private static final String REDSHIFT_DRIVER_NAME = "com.amazon.redshift.Driver";
+
+ private transient RedshiftConnectionImpl connection;
+
+ private final RedshiftOptions options;
+
+ public RedshiftConnectionProvider(RedshiftOptions options) {
+ this.options = options;
+ }
+
+ public synchronized RedshiftConnectionImpl getConnection() throws SQLException {
+ if (connection == null) {
+ connection =
+ createConnection(
+ options.getHostname(), options.getPort(), options.getDatabaseName());
+ }
+ return connection;
+ }
+
+    private RedshiftConnectionImpl createConnection(String hostname, int port, String dbName)
+            throws SQLException {
+        RedshiftConnectionImpl conn;
+        String url = "jdbc:redshift://" + hostname + ":" + port + "/" + dbName;
+        LOG.info("Connecting to {}", url);
+
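+        // Load the Redshift driver class explicitly so that DriverManager can find it
+        // even when the driver jar is only on the user classpath (it is a 'provided'
+        // dependency of this module).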
+ try {
+ Class.forName(REDSHIFT_DRIVER_NAME);
+ } catch (ClassNotFoundException e) {
+ throw new SQLException(e);
+ }
+
+ if (options.getUsername().isPresent()) {
+ conn =
+ (RedshiftConnectionImpl)
+ DriverManager.getConnection(
+ url,
+ options.getUsername().orElse(null),
+ options.getPassword().orElse(null));
+ } else {
+ conn = (RedshiftConnectionImpl) DriverManager.getConnection(url);
+ }
+
+ return conn;
+ }
+
+ public void closeConnection() throws SQLException {
+ if (this.connection != null) {
+ this.connection.close();
+ }
+ }
+
+    public RedshiftConnectionImpl getOrCreateConnection() throws SQLException {
+        return getConnection();
+    }
+}
diff --git a/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/converter/RedshiftConverterUtils.java b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/converter/RedshiftConverterUtils.java
new file mode 100644
index 00000000..f1930c50
--- /dev/null
+++ b/flink-connector-aws/flink-connector-redshift/src/main/java/org/apache/flink/connector/redshift/converter/RedshiftConverterUtils.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.redshift.converter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Converter utility between Flink Rich DataTypes and Redshift DataTypes. */
+@Internal
+public class RedshiftConverterUtils {
+ public static final int BOOL_TRUE = 1;
+
+ /**
+ * Converts Flink RichDatatype to Redshift DataType.
+ *
+ * @param value Associated Value.
+ * @param type flink LogicalType for the field.
+ * @return Datatype of Redshift.
+ */
+ public static Object toExternal(Object value, LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case INTERVAL_YEAR_MONTH:
+ case BIGINT:
+ case INTERVAL_DAY_TIME:
+ case FLOAT:
+ case DOUBLE:
+ case BINARY:
+ case VARBINARY:
+ return value;
+ case CHAR:
+ case VARCHAR:
+ return value.toString();
+ case DATE:
+ return Date.valueOf(LocalDate.ofEpochDay((Integer) value));
+ case TIME_WITHOUT_TIME_ZONE:
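+                // Flink encodes TIME as milliseconds of the day; convert to nanos-of-day
+                // and map it onto a Timestamp at epoch day one.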
+ LocalTime localTime = LocalTime.ofNanoOfDay(((Integer) value) * 1_000_000L);
+ return toEpochDayOneTimestamp(localTime);
+ case TIMESTAMP_WITH_TIME_ZONE:
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ return ((TimestampData) value).toTimestamp();
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return Timestamp.from(((TimestampData) value).toInstant());
+ case DECIMAL:
+ return ((DecimalData) value).toBigDecimal();
+ case ARRAY:
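+                // Recursively convert each array element to its external representation.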
+                LogicalType elementType = ((ArrayType) type).getElementType();
+ ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType);
+ ArrayData arrayData = ((ArrayData) value);
+ Object[] objectArray = new Object[arrayData.size()];
+ for (int i = 0; i < arrayData.size(); i++) {
+ objectArray[i] =
+ toExternal(elementGetter.getElementOrNull(arrayData, i), elementType);
+ }
+ return objectArray;
+ case MAP:
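+                // Convert MapData by converting its key and value arrays recursively
+                // into a java.util.Map.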
+ LogicalType keyType = ((MapType) type).getKeyType();
+ LogicalType valueType = ((MapType) type).getValueType();
+ ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
+ ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+ MapData mapData = (MapData) value;
+ ArrayData keyArrayData = mapData.keyArray();
+ ArrayData valueArrayData = mapData.valueArray();
+ Map