diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
index 47e8b0a52..94ea6004b 100644
--- a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
@@ -28,7 +28,9 @@ public class RedshiftConnectorUnitTest {
@Rule
public ExpectedException expectedEx = ExpectedException.none();
- private static final RedshiftConnector CONNECTOR = new RedshiftConnector(null);
+ private static final RedshiftConnector CONNECTOR = new RedshiftConnector(new RedshiftConnectorConfig(
+ "username", "password", "jdbc", "", "localhost",
+ "db", 5432));
/**
* Unit test for getTableName()
diff --git a/amazon-redshift-plugin/widgets/Redshift-batchsource.json b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
index 943e2d24e..586a8993b 100644
--- a/amazon-redshift-plugin/widgets/Redshift-batchsource.json
+++ b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
@@ -156,6 +156,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
@@ -228,7 +259,7 @@
"name": "connection"
}
]
- },
+ }
],
"jump-config": {
"datasets": [
diff --git a/amazon-redshift-plugin/widgets/Redshift-connector.json b/amazon-redshift-plugin/widgets/Redshift-connector.json
index 3a2af8e01..f392e3a78 100644
--- a/amazon-redshift-plugin/widgets/Redshift-connector.json
+++ b/amazon-redshift-plugin/widgets/Redshift-connector.json
@@ -69,6 +69,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": []
diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-action.json b/aurora-mysql-plugin/widgets/AuroraMysql-action.json
index efc5f98ff..bd2bac558 100644
--- a/aurora-mysql-plugin/widgets/AuroraMysql-action.json
+++ b/aurora-mysql-plugin/widgets/AuroraMysql-action.json
@@ -90,6 +90,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json b/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json
index a435e4e4f..6663be7ce 100644
--- a/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json
+++ b/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json
@@ -116,6 +116,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json b/aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json
index 50b435645..bd2bb88a9 100644
--- a/aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json
+++ b/aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json
@@ -135,6 +135,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-postaction.json b/aurora-mysql-plugin/widgets/AuroraMysql-postaction.json
index cc33cf0a1..64da4f1bc 100644
--- a/aurora-mysql-plugin/widgets/AuroraMysql-postaction.json
+++ b/aurora-mysql-plugin/widgets/AuroraMysql-postaction.json
@@ -105,6 +105,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-action.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-action.json
index 1f3bca862..e012f65eb 100644
--- a/aurora-postgresql-plugin/widgets/AuroraPostgres-action.json
+++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-action.json
@@ -79,6 +79,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json
index 53979d6d4..bfc83bd4e 100644
--- a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json
+++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json
@@ -121,6 +121,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json
index 14b00b974..fc2503c67 100644
--- a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json
+++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json
@@ -124,6 +124,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-postaction.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-postaction.json
index 3fdb1a14b..8b328160d 100644
--- a/aurora-postgresql-plugin/widgets/AuroraPostgres-postaction.json
+++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-postaction.json
@@ -94,6 +94,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java b/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
index 65a14502e..93c06981a 100644
--- a/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
+++ b/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
@@ -20,6 +20,9 @@
import org.junit.Assert;
import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class CloudSQLMySQLSinkTest {
@Test
public void testSetColumnsInfo() {
@@ -27,7 +30,13 @@ public void testSetColumnsInfo() {
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
- CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(new CloudSQLMySQLSink.CloudSQLMySQLSinkConfig());
+
+ CloudSQLMySQLSink.CloudSQLMySQLSinkConfig mockConfig = mock(CloudSQLMySQLSink.CloudSQLMySQLSinkConfig.class);
+ when(mockConfig.getInitialRetryDuration()).thenReturn(5); // or appropriate value
+ when(mockConfig.getMaxRetryDuration()).thenReturn(80); // or appropriate value
+ when(mockConfig.getMaxRetryCount()).thenReturn(5); // or appropriate value
+
+ CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(mockConfig);
Assert.assertNotNull(outputSchema.getFields());
cloudSQLMySQLSink.setColumnsInfo(outputSchema.getFields());
Assert.assertEquals("`id`,`name`,`insert`", cloudSQLMySQLSink.getDbColumns());
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-action.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-action.json
index 66d6ebb85..0dd6f8f41 100644
--- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-action.json
+++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-action.json
@@ -112,6 +112,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"filters": [
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json
index 89a7d7736..3a3277ed8 100644
--- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json
+++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json
@@ -176,6 +176,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"filters": [
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json
index 4ac7747f4..a90154670 100644
--- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json
+++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json
@@ -175,6 +175,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-connector.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-connector.json
index b5c2c9993..1cebc7850 100644
--- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-connector.json
+++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-connector.json
@@ -94,6 +94,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json
index eab240679..e14646154 100644
--- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json
+++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json
@@ -112,6 +112,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"filters": [
diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json
index 2fda594dd..8d6578413 100644
--- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json
+++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json
@@ -192,6 +192,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json
index 96ea97ac2..ea449120d 100644
--- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json
+++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json
@@ -175,6 +175,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-connector.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-connector.json
index 9824f91bd..36013ac40 100644
--- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-connector.json
+++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-connector.json
@@ -94,6 +94,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/database-commons/pom.xml b/database-commons/pom.xml
index 67dc8e82e..1d49676be 100644
--- a/database-commons/pom.xml
+++ b/database-commons/pom.xml
@@ -41,6 +41,12 @@
guava
+
+
+ dev.failsafe
+ failsafe
+
+
io.cdap.cdap
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
index c5320e25e..2a5aaa988 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
@@ -24,6 +24,7 @@
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.db.config.DatabaseConnectionConfig;
+import io.cdap.plugin.util.RetryUtils;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +73,37 @@ public abstract class ConnectionConfig extends PluginConfig implements DatabaseC
@Macro
public String connectionArguments;
+ @Name(RetryUtils.NAME_INITIAL_RETRY_DURATION)
+ @Description("Time taken for the first retry. Default is 5 seconds.")
+ @Nullable
+ @Macro
+ private Integer initialRetryDuration;
+
+ @Name(RetryUtils.NAME_MAX_RETRY_DURATION)
+ @Description("Maximum time in seconds retries can take. Default is 80 seconds.")
+ @Nullable
+ @Macro
+ private Integer maxRetryDuration;
+
+ @Name(RetryUtils.NAME_MAX_RETRY_COUNT)
+ @Description("Maximum number of retries allowed. Default is 5.")
+ @Nullable
+ @Macro
+ private Integer maxRetryCount;
+
+
+ public Integer getInitialRetryDuration() {
+ return initialRetryDuration == null ? RetryUtils.DEFAULT_INITIAL_RETRY_DURATION_SECONDS : initialRetryDuration;
+ }
+
+ public Integer getMaxRetryDuration() {
+ return maxRetryDuration == null ? RetryUtils.DEFAULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration;
+ }
+
+ public Integer getMaxRetryCount() {
+ return maxRetryCount == null ? RetryUtils.DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
+ }
+
public ConnectionConfig() {
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/RetryExceptions.java b/database-commons/src/main/java/io/cdap/plugin/db/RetryExceptions.java
new file mode 100644
index 000000000..e16531e33
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/db/RetryExceptions.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package io.cdap.plugin.db;
+
+import java.sql.SQLTransientException;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Checks whether the given exception or one of its causes is a known retryable SQLException.
+ */
+public class RetryExceptions {
+ public static boolean isRetryable(Throwable t) {
+ Set seen = new HashSet<>();
+ while (t != null && seen.add(t)) {
+ if (t instanceof SQLTransientException) {
+ return true;
+ }
+ t = t.getCause();
+ }
+ return false;
+ }
+}
+
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java b/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java
index 5e22abf85..e20774dbb 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java
@@ -16,6 +16,7 @@
package io.cdap.plugin.db.action;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
@@ -25,10 +26,11 @@
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import java.sql.Connection;
import java.sql.Driver;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -41,9 +43,12 @@ public class AbstractDBArgumentSetter extends Action {
private static final String JDBC_PLUGIN_ID = "driver";
private final ArgumentSetterConfig config;
+ private final RetryPolicy> retryPolicy;
public AbstractDBArgumentSetter(ArgumentSetterConfig config) {
this.config = config;
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(config.getInitialRetryDuration(), config.getMaxRetryDuration(),
+ config.getMaxRetryCount());
}
@Override
@@ -100,10 +105,22 @@ private void processArguments(Class extends Driver> driverClass,
Properties connectionProperties = new Properties();
connectionProperties.putAll(config.getConnectionArguments());
try {
- Connection connection = DriverManager
- .getConnection(config.getConnectionString(), connectionProperties);
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(config.getQuery());
+ executeWithRetry(failureCollector, settableArguments, connectionProperties);
+ } finally {
+ driverCleanup.destroy();
+ }
+ }
+
+ private void executeWithRetry(FailureCollector failureCollector, SettableArguments settableArguments,
+ Properties connectionProperties) throws SQLException {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ config.getConnectionString(), connectionProperties, getExternalDocumentationLink())) {
+ ResultSet resultSet;
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getExternalDocumentationLink())) {
+ resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement,
+ config.getQuery(), null);
+ }
boolean hasRecord = resultSet.next();
if (!hasRecord) {
failureCollector.addFailure("No record found.",
@@ -118,8 +135,6 @@ private void processArguments(Class extends Driver> driverClass,
.addFailure("More than one records found.",
"The argument selection conditions must match only one record.");
}
- } finally {
- driverCleanup.destroy();
}
}
@@ -138,4 +153,14 @@ private void setArguments(ResultSet resultSet, FailureCollector failureCollector
arguments.set(column, resultSet.getString(column));
}
}
+
+ /**
+ * Returns the external documentation link.
+ * Override this method to provide a custom external documentation link.
+ *
+ * @return external documentation link
+ */
+ protected String getExternalDocumentationLink() {
+ return "https://en.wikipedia.org/wiki/SQLSTATE";
+ }
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java b/database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java
index e2ccfc57e..ab3392a9b 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java
@@ -16,8 +16,12 @@
package io.cdap.plugin.db.action;
+import dev.failsafe.Failsafe;
+import dev.failsafe.RetryPolicy;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import java.sql.Connection;
import java.sql.Driver;
@@ -34,6 +38,7 @@ public class DBRun {
private final QueryConfig config;
private final Class extends Driver> driverClass;
private boolean enableAutoCommit;
+ private final RetryPolicy> retryPolicy;
public DBRun(QueryConfig config, Class extends Driver> driverClass, Boolean enableAutocommit) {
this.config = config;
@@ -41,6 +46,8 @@ public DBRun(QueryConfig config, Class extends Driver> driverClass, Boolean en
if (enableAutocommit != null) {
this.enableAutoCommit = enableAutocommit;
}
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(config.getInitialRetryDuration(), config.getMaxRetryDuration(),
+ config.getMaxRetryCount());
}
/**
@@ -55,13 +62,15 @@ public void run() throws SQLException, InstantiationException, IllegalAccessExce
Properties connectionProperties = new Properties();
connectionProperties.putAll(config.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(config.getConnectionString(), connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ config.getConnectionString(), connectionProperties, null)) {
executeInitQueries(connection, config.getInitQueries());
if (!enableAutoCommit) {
connection.setAutoCommit(false);
}
- try (Statement statement = connection.createStatement()) {
- statement.execute(config.query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ null)) {
+ RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, config.query, null);
if (!enableAutoCommit) {
connection.commit();
}
@@ -76,8 +85,9 @@ public void run() throws SQLException, InstantiationException, IllegalAccessExce
private void executeInitQueries(Connection connection, List initQueries) throws SQLException {
for (String query : initQueries) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ null)) {
+ RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, null);
}
}
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java
index 5b92a85f7..3d9ed16ff 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java
@@ -155,4 +155,16 @@ public Operation getOperationName() {
public String getRelationTableKey() {
return relationTableKey;
}
+
+ public Integer getInitialRetryDuration() {
+ return getConnection().getInitialRetryDuration();
+ }
+
+ public Integer getMaxRetryDuration() {
+ return getConnection().getMaxRetryDuration();
+ }
+
+ public Integer getMaxRetryCount() {
+ return getConnection().getMaxRetryCount();
+ }
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
index 41c577397..f15939ab7 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
@@ -268,4 +268,16 @@ public Integer getFetchSize() {
return fetchSize;
}
+ public Integer getInitialRetryDuration() {
+ return getConnection().getInitialRetryDuration();
+ }
+
+ public Integer getMaxRetryDuration() {
+ return getConnection().getMaxRetryDuration();
+ }
+
+ public Integer getMaxRetryCount() {
+ return getConnection().getMaxRetryCount();
+ }
+
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseConnectionConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseConnectionConfig.java
index 55cfe363f..e1fde69a1 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseConnectionConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseConnectionConfig.java
@@ -50,4 +50,10 @@ public interface DatabaseConnectionConfig {
*/
String getPassword();
+ Integer getInitialRetryDuration();
+
+ Integer getMaxRetryDuration();
+
+ Integer getMaxRetryCount();
+
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java
index 4bee056f8..646c5e388 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java
@@ -25,6 +25,7 @@
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.common.db.DBConnectorProperties;
import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.util.RetryUtils;
import java.util.Collections;
import java.util.HashMap;
@@ -63,6 +64,26 @@ public abstract class AbstractDBConnectorConfig extends PluginConfig implements
@Macro
protected String connectionArguments;
+
+ @Name(RetryUtils.NAME_INITIAL_RETRY_DURATION)
+ @Description("Time taken for the first retry. Default is 5 seconds.")
+ @Nullable
+ @Macro
+ private Integer initialRetryDuration;
+
+ @Name(RetryUtils.NAME_MAX_RETRY_DURATION)
+ @Description("Maximum time in seconds retries can take. Default is 80 seconds.")
+ @Nullable
+ @Macro
+ private Integer maxRetryDuration;
+
+ @Name(RetryUtils.NAME_MAX_RETRY_COUNT)
+ @Description("Maximum number of retries allowed. Default is 5.")
+ @Nullable
+ @Macro
+ private Integer maxRetryCount;
+
+
@Nullable
@Override
public String getUser() {
@@ -74,6 +95,18 @@ public String getUser() {
public String getPassword() {
return password;
}
+
+ public Integer getInitialRetryDuration() {
+ return initialRetryDuration == null ? RetryUtils.DEFAULT_INITIAL_RETRY_DURATION_SECONDS : initialRetryDuration;
+ }
+
+ public Integer getMaxRetryDuration() {
+ return maxRetryDuration == null ? RetryUtils.DEFAULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration;
+ }
+
+ public Integer getMaxRetryCount() {
+ return maxRetryCount == null ? RetryUtils.DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
+ }
@Override
public Properties getConnectionArgumentsProperties() {
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java
index 8a9b7b6e4..973574db0 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java
@@ -17,6 +17,7 @@
package io.cdap.plugin.db.connector;
import com.google.common.collect.Maps;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.batch.BatchConnector;
@@ -33,6 +34,8 @@
import io.cdap.plugin.db.ConnectionConfigAccessor;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.source.DataDrivenETLDBInputFormat;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -56,10 +59,13 @@ public abstract class AbstractDBSpecificConnector extends
implements BatchConnector {
private final AbstractDBConnectorConfig config;
+ private final RetryPolicy> retryPolicy;
protected AbstractDBSpecificConnector(AbstractDBConnectorConfig config) {
super(config);
this.config = config;
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(config.getInitialRetryDuration(), config.getMaxRetryDuration(),
+ config.getMaxRetryCount());
}
public abstract boolean supportSchema();
@@ -172,13 +178,14 @@ protected String getStratifiedQuery(String tableName, int limit, String strata,
protected Schema loadTableSchema(Connection connection, String query, @Nullable Integer timeoutSec, String sessionID)
throws SQLException {
- Statement statement = connection.createStatement();
- statement.setMaxRows(1);
- if (timeoutSec != null) {
- statement.setQueryTimeout(timeoutSec);
- }
- ResultSet resultSet = statement.executeQuery(query);
- return Schema.recordOf("outputSchema", getSchemaReader(sessionID).getSchemaFields(resultSet));
+ Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection, null);
+ statement.setMaxRows(1);
+ if (timeoutSec != null) {
+ statement.setQueryTimeout(timeoutSec);
+ }
+ ResultSet resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement, query,
+ null);
+ return Schema.recordOf("outputSchema", getSchemaReader(sessionID).getSchemaFields(resultSet));
}
protected void setConnectionProperties(Map properties, ConnectorSpecRequest request) {
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
index 0bb4bf123..0f10fe579 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
@@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -25,10 +26,6 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
-import io.cdap.cdap.api.exception.ErrorCategory;
-import io.cdap.cdap.api.exception.ErrorCodeType;
-import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
@@ -53,6 +50,8 @@
import io.cdap.plugin.db.config.DatabaseSinkConfig;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
@@ -61,7 +60,6 @@
import java.sql.Connection;
import java.sql.Driver;
-import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -98,12 +96,15 @@ public abstract class AbstractDBSink retryPolicy;
public AbstractDBSink(T dbSinkConfig) {
super(new ReferencePluginConfig(dbSinkConfig.getReferenceName()));
this.dbSinkConfig = dbSinkConfig;
this.configAccessor = new ConnectionConfigAccessor();
this.configuration = configAccessor.getConfiguration();
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(dbSinkConfig.getInitialRetryDuration(),
+ dbSinkConfig.getMaxRetryDuration(), dbSinkConfig.getMaxRetryCount());
}
private String getJDBCPluginId() {
@@ -126,7 +127,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
Class extends Driver> driverClass = DBUtils.getDriverClass(
pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
if (driverClass != null && dbSinkConfig.canConnect()) {
- validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema, dbSinkConfig.getDBSchemaName());
+ validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema,
+ dbSinkConfig.getDBSchemaName());
}
}
public void validateOperations(FailureCollector collector, T dbSinkConfig, @Nullable Schema inputSchema) {
@@ -185,7 +187,7 @@ protected String getErrorDetailsProviderClassName() {
* @return external documentation link
*/
protected String getExternalDocumentationLink() {
- return null;
+ return "https://en.wikipedia.org/wiki/SQLSTATE";
}
@Override
@@ -302,33 +304,16 @@ private Schema inferSchema(Class extends Driver> driverClass) {
dbSinkConfig.getJdbcPluginName());
Properties connectionProperties = new Properties();
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(dbSinkConfig.getConnectionString(),
- connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ dbSinkConfig.getConnectionString(), connectionProperties, getExternalDocumentationLink())) {
executeInitQueries(connection, dbSinkConfig.getInitQueries());
-
- try (Statement statement = connection.createStatement();
- ResultSet rs = statement.executeQuery("SELECT * FROM " + fullyQualifiedTableName
- + " WHERE 1 = 0")) {
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy,
+ connection, getExternalDocumentationLink());
+ ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement,
+ String.format("SELECT * FROM %s WHERE 1 = 0", fullyQualifiedTableName),
+ getExternalDocumentationLink())) {
inferredFields.addAll(getSchemaReader().getSchemaFields(rs));
}
- } catch (SQLException e) {
- // wrap exception to ensure SQLException-child instances not exposed to contexts w/o jdbc driver in classpath
- String errorMessage =
- String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(),
- e.getSQLState(), e.getErrorCode());
- String errorMessageWithDetails = String.format("Error while reading table metadata." +
- "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState());
- String externalDocumentationLink = getExternalDocumentationLink();
- if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
- if (!errorMessage.endsWith(".")) {
- errorMessage = errorMessage + ".";
- }
- errorMessage = String.format("%s For more details, see %s", errorMessageWithDetails, errorMessage);
- }
- throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
- errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE,
- e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(),
- e.getSQLState(), e.getErrorCode()));
}
} catch (IllegalAccessException | InstantiationException | SQLException e) {
throw new InvalidStageException("JDBC Driver unavailable: " + dbSinkConfig.getJdbcPluginName(), e);
@@ -357,7 +342,7 @@ public void destroy() {
}
}
- private void setResultSetMetadata() throws Exception {
+ private void setResultSetMetadata() throws SQLException, IllegalAccessException, InstantiationException {
List columnTypes = new ArrayList<>(columns.size());
String connectionString = dbSinkConfig.getConnectionString();
String dbSchemaName = dbSinkConfig.getDBSchemaName();
@@ -369,14 +354,16 @@ private void setResultSetMetadata() throws Exception {
Properties connectionProperties = new Properties();
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ connectionString, connectionProperties, getExternalDocumentationLink())) {
executeInitQueries(connection, dbSinkConfig.getInitQueries());
- try (Statement statement = connection.createStatement();
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getExternalDocumentationLink());
// Run a query against the DB table that returns 0 records, but returns valid ResultSetMetadata
// that can be used to construct DBRecord objects to sink to the database table.
- ResultSet rs = statement.executeQuery(String.format("SELECT %s FROM %s WHERE 1 = 0",
- dbColumns, fullyQualifiedTableName))
- ) {
+ ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement,
+ String.format("SELECT %s FROM %s WHERE 1 = 0", dbColumns, fullyQualifiedTableName),
+ getExternalDocumentationLink())) {
columnTypes.addAll(getMatchedColumnTypeList(rs, columns));
}
}
@@ -438,23 +425,24 @@ private void validateSchema(FailureCollector collector, Class extends Driver>
Properties connectionProperties = new Properties();
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ connectionString, connectionProperties, getExternalDocumentationLink())) {
executeInitQueries(connection, dbSinkConfig.getInitQueries());
try (ResultSet tables = connection.getMetaData().getTables(null, dbSchemaName, tableName, null)) {
if (!tables.next()) {
- collector.addFailure(
- String.format("Table '%s' does not exist.", tableName),
- String.format("Ensure table '%s' is set correctly and that the connection string '%s' " +
- "points to a valid database.", fullyQualifiedTableName, connectionString))
+ collector.addFailure(String.format("Table '%s' does not exist.", tableName),
+ String.format("Ensure table '%s' is set correctly and that the connection string '%s' " +
+ "points to a valid database.", fullyQualifiedTableName, connectionString))
.withConfigProperty(DBSinkConfig.TABLE_NAME);
return;
}
}
setColumnsInfo(inputSchema.getFields());
- try (PreparedStatement pStmt = connection.prepareStatement(String.format("SELECT %s FROM %s WHERE 1 = 0",
- dbColumns,
- fullyQualifiedTableName));
- ResultSet rs = pStmt.executeQuery()) {
+ try (PreparedStatement pStmt = RetryUtils.prepareStatementWithRetry((RetryPolicy) retryPolicy,
+ connection, String.format("SELECT %s FROM %s WHERE 1 = 0", dbColumns, fullyQualifiedTableName),
+ getExternalDocumentationLink());
+ ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, pStmt,
+ getExternalDocumentationLink())) {
getFieldsValidator().validateFields(inputSchema, rs, collector);
}
} catch (SQLException e) {
@@ -486,8 +474,9 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
private void executeInitQueries(Connection connection, List initQueries) throws SQLException {
for (String query : initQueries) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getExternalDocumentationLink())) {
+ RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, getExternalDocumentationLink());
}
}
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
index 54d1e2ab6..c964b8fdb 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
@@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -25,10 +26,6 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
-import io.cdap.cdap.api.exception.ErrorCategory;
-import io.cdap.cdap.api.exception.ErrorCodeType;
-import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
@@ -52,6 +49,8 @@
import io.cdap.plugin.db.config.DatabaseSourceConfig;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -62,7 +61,6 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -87,6 +85,7 @@ public abstract class AbstractDBSource retryPolicy;
protected final T sourceConfig;
protected Class extends Driver> driverClass;
@@ -94,6 +93,8 @@ public abstract class AbstractDBSource driverClass) throws IllegalAcces
SQLException, InstantiationException {
DriverCleanup driverCleanup;
try {
-
driverCleanup = loadPluginClassAndGetDriver(driverClass);
try {
return getSchema();
@@ -168,13 +168,14 @@ public Schema getSchema() throws SQLException {
}
private Schema loadSchemaFromDB(Connection connection, String query) throws SQLException {
- Statement statement = connection.createStatement();
- statement.setMaxRows(1);
- if (query.contains("$CONDITIONS")) {
- query = removeConditionsClause(query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getExternalDocumentationLink())) {
+ statement.setMaxRows(1);
+ String finalQuery = query.contains("$CONDITIONS") ? removeConditionsClause(query) : query;
+ ResultSet resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement,
+ finalQuery, getExternalDocumentationLink());
+ return Schema.recordOf("outputSchema", getSchemaReader().getSchemaFields(resultSet));
}
- ResultSet resultSet = statement.executeQuery(query);
- return Schema.recordOf("outputSchema", getSchemaReader().getSchemaFields(resultSet));
}
@VisibleForTesting
@@ -194,28 +195,10 @@ private Schema loadSchemaFromDB(Class extends Driver> driverClass)
Properties connectionProperties = new Properties();
connectionProperties.putAll(sourceConfig.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ connectionString, connectionProperties, getExternalDocumentationLink())) {
executeInitQueries(connection, sourceConfig.getInitQueries());
return loadSchemaFromDB(connection, sourceConfig.getImportQuery());
-
- } catch (SQLException e) {
- // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath
- String errorMessage =
- String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(),
- e.getSQLState(), e.getErrorCode());
- String errorMessageWithDetails = String.format("Error occurred while trying to get schema from database." +
- "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState());
- String externalDocumentationLink = getExternalDocumentationLink();
- if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
- if (!errorMessage.endsWith(".")) {
- errorMessage = errorMessage + ".";
- }
- errorMessage = String.format("%s For more details, see %s", errorMessage, externalDocumentationLink);
- }
- throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
- errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE,
- e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(),
- e.getSQLState(), e.getErrorCode()));
} finally {
driverCleanup.destroy();
}
@@ -223,8 +206,9 @@ private Schema loadSchemaFromDB(Class extends Driver> driverClass)
private void executeInitQueries(Connection connection, List initQueries) throws SQLException {
for (String query : initQueries) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getExternalDocumentationLink())) {
+ RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, getExternalDocumentationLink());
}
}
}
@@ -262,11 +246,12 @@ private DriverCleanup loadPluginClassAndGetDriver(Class extends Driver> driver
}
}
- private Connection getConnection() throws SQLException {
+ private Connection getConnection() {
String connectionString = createConnectionString();
Properties connectionProperties = new Properties();
connectionProperties.putAll(sourceConfig.getConnectionArguments());
- return DriverManager.getConnection(connectionString, connectionProperties);
+ return RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ connectionString, connectionProperties, getExternalDocumentationLink());
}
@Override
@@ -383,7 +368,7 @@ protected Class extends DBWritable> getDBRecordType() {
* @return external documentation link
*/
protected String getExternalDocumentationLink() {
- return null;
+ return "https://en.wikipedia.org/wiki/SQLSTATE";
}
@Override
diff --git a/database-commons/src/main/java/io/cdap/plugin/util/RetryPolicyUtil.java b/database-commons/src/main/java/io/cdap/plugin/util/RetryPolicyUtil.java
new file mode 100644
index 000000000..91d6bd6ba
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/util/RetryPolicyUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package io.cdap.plugin.util;
+
+import dev.failsafe.RetryPolicy;
+import io.cdap.cdap.api.Config;
+import io.cdap.plugin.db.RetryExceptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+/**
+ * Utility class for creating standardized {@link dev.failsafe.RetryPolicy} configurations
+ * to handle transient SQL exceptions using the Failsafe library.
+ */
+public class RetryPolicyUtil extends Config {
+ public static final Logger LOG = LoggerFactory.getLogger(RetryPolicyUtil.class);
+
+ /**
+ * Create a RetryPolicy using custom config values.
+ */
+ public static RetryPolicy getRetryPolicy(Integer initialRetryDuration,
+ Integer maxRetryDuration, Integer maxRetryCount) {
+ return RetryPolicy.builder()
+ .handleIf((failure) -> RetryExceptions.isRetryable(failure))
+ .withBackoff(Duration.ofSeconds(initialRetryDuration), Duration.ofSeconds(maxRetryDuration))
+ .withMaxRetries(maxRetryCount)
+ .onRetry(e -> LOG.debug("Retrying... Attempt {}",
+ e.getAttemptCount()))
+ .onFailedAttempt(e -> LOG.debug("Failed Attempt : {}", e.getLastException()))
+ .onFailure(e -> LOG.debug("Failed after retries." +
+ " Reason: {}",
+ e.getException() != null ? e.getException().getMessage() : "Unknown error"))
+ .build();
+ }
+}
diff --git a/database-commons/src/main/java/io/cdap/plugin/util/RetryUtils.java b/database-commons/src/main/java/io/cdap/plugin/util/RetryUtils.java
new file mode 100644
index 000000000..4a1b6902f
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/util/RetryUtils.java
@@ -0,0 +1,309 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.util;
+
+import com.google.common.base.Strings;
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
+import io.cdap.cdap.api.exception.ErrorCategory;
+import io.cdap.cdap.api.exception.ErrorCodeType;
+import io.cdap.cdap.api.exception.ErrorType;
+import io.cdap.cdap.api.exception.ErrorUtils;
+import io.cdap.cdap.api.exception.ProgramFailureException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utility class for retrieving common methods using {@link dev.failsafe.RetryPolicy}
+ */
+public final class RetryUtils {
+
+ private static final Map ERROR_CODE_TO_ERROR_TYPE;
+ private static final Map ERROR_CODE_TO_ERROR_CATEGORY;
+ public static final String NAME_INITIAL_RETRY_DURATION = "initialRetryDuration";
+ public static final String NAME_MAX_RETRY_DURATION = "maxRetryDuration";
+ public static final String NAME_MAX_RETRY_COUNT = "maxRetryCount";
+ public static final int DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 5;
+ public static final int DEFAULT_MAX_RETRY_COUNT = 5;
+ public static final int DEFAULT_MAX_RETRY_DURATION_SECONDS = 80;
+
+ static {
+ // https://en.wikipedia.org/wiki/SQLSTATE
+ ERROR_CODE_TO_ERROR_TYPE = new HashMap<>();
+ ERROR_CODE_TO_ERROR_TYPE.put("01", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("02", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("07", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("08", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("09", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0A", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("0D", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0E", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0F", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0K", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0L", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0M", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0N", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0P", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0S", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0T", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0U", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0V", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0W", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0X", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0Y", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("0Z", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("10", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("20", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("21", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("22", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("23", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("24", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("25", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("26", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("27", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("28", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("2B", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("2C", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("2D", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("2E", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("2F", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("2H", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("30", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("33", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("34", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("35", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("36", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("38", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("39", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("3B", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("3C", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("3D", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("3F", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("40", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("42", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("44", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("45", ErrorType.USER);
+ ERROR_CODE_TO_ERROR_TYPE.put("46", ErrorType.SYSTEM);
+ ERROR_CODE_TO_ERROR_TYPE.put("HW", ErrorType.SYSTEM);
+
+ ERROR_CODE_TO_ERROR_CATEGORY = new HashMap<>();
+ ErrorCategory.ErrorCategoryEnum plugin = ErrorCategory.ErrorCategoryEnum.PLUGIN;
+ ERROR_CODE_TO_ERROR_CATEGORY.put("01", new ErrorCategory(plugin, "DB Warning"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("02", new ErrorCategory(plugin, "DB No Data"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("07", new ErrorCategory(plugin, "DB Dynamic SQL error"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("08", new ErrorCategory(plugin, "DB Connection Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("09", new ErrorCategory(plugin, "DB Triggered Action Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0A", new ErrorCategory(plugin, "DB Feature Not Supported"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0D", new ErrorCategory(plugin, "DB Invalid Target Type Specification"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0E", new ErrorCategory(plugin, "DB Invalid Schema Name List Specification"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0F", new ErrorCategory(plugin, "DB Locator Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0K", new ErrorCategory(plugin, "DB Resignal When Handler Not Active"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0L", new ErrorCategory(plugin, "DB Invalid Grantor"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0M", new ErrorCategory(plugin, "DB Invalid SQL-Invoked Procedure Reference"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0N", new ErrorCategory(plugin, "DB SQL/XML Mapping Error"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0P", new ErrorCategory(plugin, "DB Invalid Role Specification"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0S",
+ new ErrorCategory(plugin, "DB Invalid Transform Group Name Specification"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0T",
+ new ErrorCategory(plugin, "DB Target Table Disagrees With Cursor Specification"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0U",
+ new ErrorCategory(plugin, "DB Attempt To Assign To Non-Updatable Column"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0V", new ErrorCategory(plugin, "DB Attempt To Assign To Ordering Column"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0W", new ErrorCategory(plugin, "DB Prohibited Statement Encountered"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0X", new ErrorCategory(plugin, "DB Invalid Foreign Server Specification"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0Y", new ErrorCategory(plugin, "DB Pass-Through Specific Condition"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("0Z", new ErrorCategory(plugin, "DB Diagnostics Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("10", new ErrorCategory(plugin, "DB XQuery Error"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("20", new ErrorCategory(plugin, "DB Case Not Found For Case Statement"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("21", new ErrorCategory(plugin, "DB Cardinality Violation"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("22", new ErrorCategory(plugin, "DB Data Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("23", new ErrorCategory(plugin, "DB Integrity Constraint Violation"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("24", new ErrorCategory(plugin, "DB Invalid Cursor State"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("25", new ErrorCategory(plugin, "DB Invalid Transaction State"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("26", new ErrorCategory(plugin, "DB Invalid SQL Statement Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("27", new ErrorCategory(plugin, "DB Triggered Data Change Violation"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("28", new ErrorCategory(plugin, "DB Invalid Authorization Specification"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("2B",
+ new ErrorCategory(plugin, "DB Dependent Privilege Descriptors Still Exist"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("2C", new ErrorCategory(plugin, "DB Invalid Character Set Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("2D", new ErrorCategory(plugin, "DB Invalid Transaction Termination"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("2E", new ErrorCategory(plugin, "DB Invalid Connection Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("2F", new ErrorCategory(plugin, "DB SQL Routine Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("2H", new ErrorCategory(plugin, "DB Invalid Collation Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("30", new ErrorCategory(plugin, "DB Invalid SQL Statement Identifier"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("33", new ErrorCategory(plugin, "DB Invalid SQL Descriptor Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("34", new ErrorCategory(plugin, "DB Invalid Cursor Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("35", new ErrorCategory(plugin, "DB Invalid Condition Number"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("36", new ErrorCategory(plugin, "DB Cursor Sensitivity Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("38", new ErrorCategory(plugin, "DB External Routine Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("39", new ErrorCategory(plugin, "DB External Routine Invocation Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("3B", new ErrorCategory(plugin, "DB Savepoint Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("3C", new ErrorCategory(plugin, "DB Ambiguous Cursor Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("3D", new ErrorCategory(plugin, "DB Invalid Catalog Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("3F", new ErrorCategory(plugin, "DB Invalid Schema Name"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("40", new ErrorCategory(plugin, "DB Transaction Rollback"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("42", new ErrorCategory(plugin, "DB Syntax Error or Access Rule Violation"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("44", new ErrorCategory(plugin, "DB With Check Option Violation"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("45", new ErrorCategory(plugin, "DB Unhandled User-Defined Exception"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("46", new ErrorCategory(plugin, "DB JAVA DDL"));
+ ERROR_CODE_TO_ERROR_CATEGORY.put("HW", new ErrorCategory(plugin, "DB Datalink Exception"));
+ }
+
+ public static Connection createConnectionWithRetry(RetryPolicy retryPolicy, String connectionString,
+ Properties connectionProperties, String externalDocumentationLink) {
+ try {
+ return Failsafe.with(retryPolicy).get(() -> DriverManager
+ .getConnection(connectionString, connectionProperties)
+ );
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, externalDocumentationLink);
+ }
+ }
+
+ public static Statement createStatementWithRetry(RetryPolicy retryPolicy, Connection connection,
+ String externalDocumentationLink) {
+ try {
+ return Failsafe.with(retryPolicy).get(connection::createStatement);
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, externalDocumentationLink);
+ }
+ }
+
+ public static PreparedStatement prepareStatementWithRetry(RetryPolicy retryPolicy,
+ Connection connection, String sqlQuery, String externalDocumentationLink) {
+ try {
+ return Failsafe.with(retryPolicy).get(() -> connection.prepareStatement(sqlQuery));
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, externalDocumentationLink);
+ }
+ }
+
+ public static ResultSet executeQueryWithRetry(RetryPolicy retryPolicy,
+ PreparedStatement preparedStatement, String externalDocumentationLink) {
+ try {
+ return Failsafe.with(retryPolicy).get(() -> preparedStatement.executeQuery());
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, externalDocumentationLink);
+ }
+ }
+
+ public static ResultSet executeQueryWithRetry(RetryPolicy retryPolicy, Statement statement,
+ String query, String externalDocumentationLink) {
+ try {
+ return Failsafe.with(retryPolicy).get(() -> statement.executeQuery(query));
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, externalDocumentationLink);
+ }
+ }
+
+ public static void executeInitQueryWithRetry(RetryPolicy> retryPolicy, Statement statement, String query,
+ String externalDocumentationLink) {
+ try {
+ Failsafe.with(retryPolicy).run(() -> statement.execute(query));
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, externalDocumentationLink);
+ }
+ }
+
+ private static RuntimeException unwrapFailsafeException(Exception e, String externalDocumentationLink) {
+ if (e instanceof FailsafeException) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ return programFailureException((SQLException) cause, externalDocumentationLink);
+ } else if (cause instanceof RuntimeException) {
+ return (RuntimeException) cause;
+ } else if (cause instanceof Error) {
+ return new RuntimeException("Failsafe wrapped an Error", cause);
+ } else {
+ return new RuntimeException("Failsafe wrapped a non-runtime exception", cause);
+ }
+ }
+ if (e instanceof SQLException) {
+ return programFailureException((SQLException) e, externalDocumentationLink);
+ }
+ if (e instanceof RuntimeException) {
+ return (RuntimeException) e;
+ }
+ return new RuntimeException("Unexpected checked exception", e);
+ }
+
+ private static ProgramFailureException programFailureException(SQLException e, String externalDocumentationLink) {
+ // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc
+ // driver in classpath
+ String errorMessage =
+ String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(),
+ e.getSQLState(), e.getErrorCode());
+ String sqlState = e.getSQLState();
+ int errorCode = e.getErrorCode();
+ String errorMessageWithDetails = String.format("Error occurred while trying to" +
+ " get schema from database." + "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(),
+ errorCode, sqlState);
+
+ if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
+ if (!errorMessage.endsWith(".")) {
+ errorMessage = errorMessage + ".";
+ }
+ errorMessage = String.format("%s For more details, see %s", errorMessage, externalDocumentationLink);
+ }
+ return ErrorUtils.getProgramFailureException(Strings.isNullOrEmpty(sqlState) ?
+ new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN) : getErrorCategoryFromSqlState(sqlState),
+ errorMessage, errorMessageWithDetails, getErrorTypeFromErrorCodeAndSqlState(errorCode, sqlState), true,
+ ErrorCodeType.SQLSTATE, e.getSQLState(), externalDocumentationLink, e);
+ }
+
+ /**
+ * Get the {@link ErrorCategory} for the given SQL state.
+ * Implements generic error categories based on the SQL state.
+ * See SQLSTATE for more information.
+ * Override this method to provide custom error categories based on the SQL state.
+ *
+ * @param sqlState The SQL state.
+ * @return The {@link ErrorCategory} for the given SQL state.
+ */
+ private static ErrorCategory getErrorCategoryFromSqlState(String sqlState) {
+ if (!Strings.isNullOrEmpty(sqlState) && sqlState.length() >= 2 &&
+ ERROR_CODE_TO_ERROR_CATEGORY.containsKey(sqlState.substring(0, 2))) {
+ return ERROR_CODE_TO_ERROR_CATEGORY.get(sqlState.substring(0, 2));
+ }
+ return new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN);
+ }
+
+ /**
+ * Get the {@link ErrorType} for the given error code and SQL state.
+ * Override this method to provide custom error types based on the error code and SQL state.
+ *
+ * @param errorCode The error code.
+ * @param sqlState The SQL state.
+ * @return The {@link ErrorType} for the given error code and SQL state.
+ */
+ private static ErrorType getErrorTypeFromErrorCodeAndSqlState(int errorCode, String sqlState) {
+ if (!Strings.isNullOrEmpty(sqlState) && sqlState.length() >= 2 &&
+ ERROR_CODE_TO_ERROR_TYPE.containsKey(sqlState.substring(0, 2))) {
+ return ERROR_CODE_TO_ERROR_TYPE.get(sqlState.substring(0, 2));
+ }
+ return ErrorType.UNKNOWN;
+ }
+}
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/RetryPolicyUtilTest.java b/database-commons/src/test/java/io/cdap/plugin/db/RetryPolicyUtilTest.java
new file mode 100644
index 000000000..fa330de4e
--- /dev/null
+++ b/database-commons/src/test/java/io/cdap/plugin/db/RetryPolicyUtilTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
+import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.SQLSyntaxErrorException;
+import java.sql.SQLTransientConnectionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RetryPolicyUtilTest {
+
+ private AbstractDBConnectorConfig mockConfig;
+
+ @Before
+ public void setup() {
+ mockConfig = mock(AbstractDBConnectorConfig.class);
+ when(mockConfig.getInitialRetryDuration()).thenReturn(5);
+ when(mockConfig.getMaxRetryDuration()).thenReturn(10);
+ when(mockConfig.getMaxRetryCount()).thenReturn(2);
+ }
+
+ @Test
+ public void testCreateConnectionRetryPolicy_Retryable() {
+ RetryPolicy
+
+
+ dev.failsafe
+ failsafe
+ 3.3.2
+
org.hsqldb
hsqldb
diff --git a/postgresql-plugin/src/test/java/io/cdap/plugin/postgres/PostgresConnectorUnitTest.java b/postgresql-plugin/src/test/java/io/cdap/plugin/postgres/PostgresConnectorUnitTest.java
index 4f1f53964..f286cf574 100644
--- a/postgresql-plugin/src/test/java/io/cdap/plugin/postgres/PostgresConnectorUnitTest.java
+++ b/postgresql-plugin/src/test/java/io/cdap/plugin/postgres/PostgresConnectorUnitTest.java
@@ -28,7 +28,9 @@ public class PostgresConnectorUnitTest {
@Rule
public ExpectedException expectedEx = ExpectedException.none();
- private static final PostgresConnector CONNECTOR = new PostgresConnector(null);
+ private static final PostgresConnector CONNECTOR = new PostgresConnector(new PostgresConnectorConfig(
+ "localhost", 5432, "user", "password", "postgresql",
+ ""));
/**
* Unit test for getTableName()
diff --git a/postgresql-plugin/widgets/PostgreSQL-connector.json b/postgresql-plugin/widgets/PostgreSQL-connector.json
index 9a7a02e14..88a1714fa 100644
--- a/postgresql-plugin/widgets/PostgreSQL-connector.json
+++ b/postgresql-plugin/widgets/PostgreSQL-connector.json
@@ -82,6 +82,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": []
diff --git a/postgresql-plugin/widgets/Postgres-action.json b/postgresql-plugin/widgets/Postgres-action.json
index 351c023f1..afed87295 100644
--- a/postgresql-plugin/widgets/Postgres-action.json
+++ b/postgresql-plugin/widgets/Postgres-action.json
@@ -82,6 +82,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/postgresql-plugin/widgets/Postgres-batchsink.json b/postgresql-plugin/widgets/Postgres-batchsink.json
index 14e6f8154..f58cb1995 100644
--- a/postgresql-plugin/widgets/Postgres-batchsink.json
+++ b/postgresql-plugin/widgets/Postgres-batchsink.json
@@ -169,6 +169,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/postgresql-plugin/widgets/Postgres-batchsource.json b/postgresql-plugin/widgets/Postgres-batchsource.json
index 60de4725f..6d0656e39 100644
--- a/postgresql-plugin/widgets/Postgres-batchsource.json
+++ b/postgresql-plugin/widgets/Postgres-batchsource.json
@@ -172,6 +172,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/postgresql-plugin/widgets/Postgres-postaction.json b/postgresql-plugin/widgets/Postgres-postaction.json
index 5a0daf595..6b3ebe1f3 100644
--- a/postgresql-plugin/widgets/Postgres-postaction.json
+++ b/postgresql-plugin/widgets/Postgres-postaction.json
@@ -97,6 +97,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/saphana-plugin/widgets/SapHana-action.json b/saphana-plugin/widgets/SapHana-action.json
index 7e60ac35d..5fb59faab 100644
--- a/saphana-plugin/widgets/SapHana-action.json
+++ b/saphana-plugin/widgets/SapHana-action.json
@@ -82,6 +82,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/saphana-plugin/widgets/SapHana-batchsink.json b/saphana-plugin/widgets/SapHana-batchsink.json
index a9d8c6343..56958358f 100644
--- a/saphana-plugin/widgets/SapHana-batchsink.json
+++ b/saphana-plugin/widgets/SapHana-batchsink.json
@@ -103,6 +103,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/saphana-plugin/widgets/SapHana-batchsource.json b/saphana-plugin/widgets/SapHana-batchsource.json
index 9352b02f7..7df341a7e 100644
--- a/saphana-plugin/widgets/SapHana-batchsource.json
+++ b/saphana-plugin/widgets/SapHana-batchsource.json
@@ -127,6 +127,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/saphana-plugin/widgets/SapHana-postaction.json b/saphana-plugin/widgets/SapHana-postaction.json
index ad2c8b938..e260ebf0e 100644
--- a/saphana-plugin/widgets/SapHana-postaction.json
+++ b/saphana-plugin/widgets/SapHana-postaction.json
@@ -97,6 +97,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/teradata-plugin/widgets/Teradata-action.json b/teradata-plugin/widgets/Teradata-action.json
index 2ffba361c..0662ed778 100644
--- a/teradata-plugin/widgets/Teradata-action.json
+++ b/teradata-plugin/widgets/Teradata-action.json
@@ -74,6 +74,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/teradata-plugin/widgets/Teradata-batchsink.json b/teradata-plugin/widgets/Teradata-batchsink.json
index f455991d4..861bbbaa2 100644
--- a/teradata-plugin/widgets/Teradata-batchsink.json
+++ b/teradata-plugin/widgets/Teradata-batchsink.json
@@ -95,6 +95,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/teradata-plugin/widgets/Teradata-batchsource.json b/teradata-plugin/widgets/Teradata-batchsource.json
index 94f5314e5..2d10020a5 100644
--- a/teradata-plugin/widgets/Teradata-batchsource.json
+++ b/teradata-plugin/widgets/Teradata-batchsource.json
@@ -115,6 +115,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/teradata-plugin/widgets/Teradata-postaction.json b/teradata-plugin/widgets/Teradata-postaction.json
index 35ead0013..deeccbb69 100644
--- a/teradata-plugin/widgets/Teradata-postaction.json
+++ b/teradata-plugin/widgets/Teradata-postaction.json
@@ -90,6 +90,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}