diff --git a/jethro-plugin/docs/JethroAction-action.md b/jethro-plugin/docs/JethroAction-action.md new file mode 100644 index 000000000..299517c97 --- /dev/null +++ b/jethro-plugin/docs/JethroAction-action.md @@ -0,0 +1,42 @@ +# Jethro Action + + +Description +----------- +Action that runs a Jethro command. + + +Use Case +-------- +The action can be used whenever you want to run a Jethro command before a data pipeline. +For example, you may want to run a SQL command on a database before the pipeline source pulls data from tables. + + +Properties +---------- +**Driver Name:** Name of the JDBC driver to use. + +**Host:** Host that Jethro is running on. + +**Port:** Port that Jethro is running on. + +**Instance:** Jethro database name. + +**Database Command:** Database command to execute. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +Example +------- +Suppose you want to execute a query against a Jethro database instance named "prod" that is running on "localhost" +port 9112, then configure the plugin with: + +``` +Driver Name: "jethro" +Host: "localhost" +Port: 9112 +Database: "prod" +Database Command: "TRUNCATE TABLE testTable" +``` \ No newline at end of file diff --git a/jethro-plugin/docs/JethroPostAction-postaction.md b/jethro-plugin/docs/JethroPostAction-postaction.md new file mode 100644 index 000000000..b42891e24 --- /dev/null +++ b/jethro-plugin/docs/JethroPostAction-postaction.md @@ -0,0 +1,51 @@ +# Jethro Post Action + + +Description +----------- +Runs a Jethro query at the end of the pipeline run. +Can be configured to run only on success, only on failure, or always at the end of the run. + + +Use Case +-------- +The action is used whenever you need to run a query at the end of a pipeline run. +For example, you may have a pipeline that imports data from a database table to +HDFS files. 
At the end of the run, you may want to run a query that deletes the data +that was read from the table. + + +Properties +---------- +**Run Condition:** When to run the action. Must be 'completion', 'success', or 'failure'. Defaults to 'success'. +If set to 'completion', the action will be executed regardless of whether the pipeline run succeeded or failed. +If set to 'success', the action will only be executed if the pipeline run succeeded. +If set to 'failure', the action will only be executed if the pipeline run failed. + +**Driver Name:** Name of the JDBC driver to use. + +**Host:** Host that Jethro is running on. + +**Port:** Port that Jethro is running on. + +**Database:** Jethro database name. + +**Query:** Query to run. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +Example +------- +Suppose you want to execute a query against a Jethro database instance named "prod" that is running on "localhost" +port 9112, then configure the plugin with: + +``` +Run Condition: "success" +Driver Name: "jethro" +Host: "localhost" +Port: 9112 +Database: "prod" +Query: "TRUNCATE TABLE testTable" +``` \ No newline at end of file diff --git a/jethro-plugin/docs/JethroSource-batchsource.md b/jethro-plugin/docs/JethroSource-batchsource.md new file mode 100644 index 000000000..c6e568892 --- /dev/null +++ b/jethro-plugin/docs/JethroSource-batchsource.md @@ -0,0 +1,60 @@ +# Jethro Batch Source + + +Description +----------- +Reads from a Jethro Data instance using a configurable SQL query. +Outputs one record for each row returned by the query. + + +Use Case +-------- +The source is used whenever you need to read from a Jethro Data instance. For example, you may want +to create daily snapshots of a database table by using this source and writing to +a TimePartitionedFileSet. 
+ + +Properties +---------- +**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc. + +**Driver Name:** Name of the JDBC driver to use. + +**Host:** Host that Jethro Data is running on. + +**Port:** Port that Jethro Data is running on. + +**Instance:** Jethro Data instance name. + +**Import Query:** The SELECT query to use to import data from the specified table. +You can specify an arbitrary number of columns to import, or import all columns using \*. The Query should +contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDITIONS'. +The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query. +The '$CONDITIONS' string is not required if numSplits is set to one. + +**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field. +For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one. + +**Username:** User identity for connecting to the specified database. + +**Password:** Password to use to connect to the specified database. + +**Split-By Field Name:** Field Name which will be used to generate splits. Not required if numSplits is set to one. + +**Number of Splits to Generate:** Number of splits to generate. + +**Schema:** The schema of records output by the source. This will be used in place of whatever schema comes +back from the query. However, it must match the schema that comes back from the query, +except it can mark fields as nullable and can contain a subset of the fields. 
+ +Data Types Mapping +---------- + + | Jethro Data Type | CDAP Schema Data Type | Comment | + | ------------------------------ | --------------------- | -------------------------------------------------- | + | INTEGER | int | | + | BIGINT | long | | + | FLOAT | float | | + | DOUBLE | double | | + | STRING | string | | + | TIMESTAMP | timestamp | | diff --git a/jethro-plugin/icons/JethroAction-action.png b/jethro-plugin/icons/JethroAction-action.png new file mode 100644 index 000000000..7bacfa712 Binary files /dev/null and b/jethro-plugin/icons/JethroAction-action.png differ diff --git a/jethro-plugin/icons/JethroPostAction-postaction.png b/jethro-plugin/icons/JethroPostAction-postaction.png new file mode 100644 index 000000000..7bacfa712 Binary files /dev/null and b/jethro-plugin/icons/JethroPostAction-postaction.png differ diff --git a/jethro-plugin/icons/JethroSource-batchsource.png b/jethro-plugin/icons/JethroSource-batchsource.png new file mode 100644 index 000000000..7bacfa712 Binary files /dev/null and b/jethro-plugin/icons/JethroSource-batchsource.png differ diff --git a/jethro-plugin/libs/jethro-jdbc-3.9.jar b/jethro-plugin/libs/jethro-jdbc-3.9.jar new file mode 100644 index 000000000..b49c5b97b Binary files /dev/null and b/jethro-plugin/libs/jethro-jdbc-3.9.jar differ diff --git a/jethro-plugin/pom.xml b/jethro-plugin/pom.xml new file mode 100644 index 000000000..fb14d8a30 --- /dev/null +++ b/jethro-plugin/pom.xml @@ -0,0 +1,117 @@ + + + + + database-plugins + io.cdap.plugin + 1.3.0-SNAPSHOT + + + Jethro Data plugin + jethro-plugin + 1.3.0-SNAPSHOT + 4.0.0 + + + + io.cdap.cdap + cdap-etl-api + + + io.cdap.plugin + database-commons + ${project.version} + + + io.cdap.plugin + hydrator-common + + + io.cdap.cdap + cdap-api + provided + + + org.jetbrains + annotations + RELEASE + compile + + + org.jethro + jethro-jdbc-driver + 3.9 + system + ${project.basedir}/libs/jethro-jdbc-3.9.jar + + + + io.cdap.plugin + database-commons + ${project.version} + test-jar + 
test + + + io.cdap.cdap + hydrator-test + + + io.cdap.cdap + cdap-data-pipeline + + + junit + junit + + + + + + + org.apache.felix + maven-bundle-plugin + 3.3.0 + true + + + <_exportcontents> + io.cdap.plugin.db.*; + + *;inline=false;scope=compile + true + lib + + + + + package + + bundle + + + + + + io.cdap + cdap-maven-plugin + + + + \ No newline at end of file diff --git a/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/JethroUtils.java b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/JethroUtils.java new file mode 100644 index 000000000..53682d650 --- /dev/null +++ b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/JethroUtils.java @@ -0,0 +1,28 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package io.cdap.plugin.db.jethro; + +/** + * Jethro utils + */ +public class JethroUtils { + + private static final String JETHRO_CONNECTION_STRING_FORMAT = "jdbc:JethroData://%s:%d/%s"; + + public static String getConnectionString(String host, int port, String database) { + return String.format(JETHRO_CONNECTION_STRING_FORMAT, host, port, database); + } +} diff --git a/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/action/JethroAction.java b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/action/JethroAction.java new file mode 100644 index 000000000..b95e401c5 --- /dev/null +++ b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/action/JethroAction.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db.jethro.action; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.action.Action; +import io.cdap.plugin.db.batch.action.AbstractDBAction; + +/** + * Action that runs Jethro command. 
+ */ +@Plugin(type = Action.PLUGIN_TYPE) +@Name(JethroAction.NAME) +@Description("Action that runs a Jethro command") +public class JethroAction extends AbstractDBAction { + public static final String NAME = "JethroAction"; + + private final JethroActionConfig jethroActionConfig; + + public JethroAction(JethroActionConfig jethroActionConfig) { + super(jethroActionConfig, false); + this.jethroActionConfig = jethroActionConfig; + } +} diff --git a/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/action/JethroActionConfig.java b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/action/JethroActionConfig.java new file mode 100644 index 000000000..3580c96fd --- /dev/null +++ b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/action/JethroActionConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.db.jethro.action; + +import io.cdap.plugin.db.batch.config.DBSpecificQueryConfig; +import io.cdap.plugin.db.jethro.JethroUtils; + +/** + * Jethro Action config + */ +public class JethroActionConfig extends DBSpecificQueryConfig { + + @Override + public String getConnectionString() { + return JethroUtils.getConnectionString(host, port, database); + } +} diff --git a/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/postaction/JethroPostAction.java b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/postaction/JethroPostAction.java new file mode 100644 index 000000000..5493a79d6 --- /dev/null +++ b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/postaction/JethroPostAction.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db.jethro.postaction; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.batch.PostAction; +import io.cdap.plugin.db.batch.action.AbstractQueryAction; + +/** + * Represents Jethro post action. 
+ */ +@Plugin(type = PostAction.PLUGIN_TYPE) +@Name(JethroPostAction.NAME) +@Description("Runs a Jethro query after a pipeline run.") +public class JethroPostAction extends AbstractQueryAction { + public static final String NAME = "JethroPostAction"; + + private final JethroPostActionConfig jethroPostActionConfig; + + public JethroPostAction(JethroPostActionConfig jethroPostActionConfig) { + super(jethroPostActionConfig, false); + this.jethroPostActionConfig = jethroPostActionConfig; + } +} diff --git a/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/postaction/JethroPostActionConfig.java b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/postaction/JethroPostActionConfig.java new file mode 100644 index 000000000..27069ee45 --- /dev/null +++ b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/postaction/JethroPostActionConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.db.jethro.postaction; + +import io.cdap.plugin.db.batch.config.DBSpecificQueryActionConfig; +import io.cdap.plugin.db.jethro.JethroUtils; + +/** + * Jethro Post Action config + */ +public class JethroPostActionConfig extends DBSpecificQueryActionConfig { + + @Override + public String getConnectionString() { + return JethroUtils.getConnectionString(host, port, database); + } +} diff --git a/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/source/JethroSource.java b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/source/JethroSource.java new file mode 100644 index 000000000..f1e2d22dd --- /dev/null +++ b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/source/JethroSource.java @@ -0,0 +1,46 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db.jethro.source; + +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.etl.api.batch.BatchSource; +import io.cdap.plugin.db.batch.source.AbstractDBSource; + +/** + * Batch source to read from Jethro Data. + */ +@Plugin(type = BatchSource.PLUGIN_TYPE) +@Name(JethroSource.NAME) +@Description("Reads from a database table(s) using a configurable SQL query." 
+ + " Outputs one record for each row returned by the query.") +public class JethroSource extends AbstractDBSource { + public static final String NAME = "JethroSource"; + + private JethroSourceConfig config; + + public JethroSource(JethroSourceConfig config) { + super(config); + this.config = config; + } + + @Override + protected String createConnectionString() { + return config.getConnectionString(); + } +} diff --git a/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/source/JethroSourceConfig.java b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/source/JethroSourceConfig.java new file mode 100644 index 000000000..af5f7e113 --- /dev/null +++ b/jethro-plugin/src/main/java/io/cdap/plugin/db/jethro/source/JethroSourceConfig.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package io.cdap.plugin.db.jethro.source; + +import io.cdap.plugin.db.batch.config.DBSpecificSourceConfig; +import io.cdap.plugin.db.jethro.JethroUtils; + +/** + * Jethro Source config + */ +public class JethroSourceConfig extends DBSpecificSourceConfig { + + @Override + public String getConnectionString() { + return JethroUtils.getConnectionString(host, port, database); + } + +} diff --git a/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/JethroPluginTestBase.java b/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/JethroPluginTestBase.java new file mode 100644 index 000000000..81dc9170f --- /dev/null +++ b/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/JethroPluginTestBase.java @@ -0,0 +1,172 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.db.jethro; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.cdap.cdap.api.artifact.ArtifactSummary; +import io.cdap.cdap.api.plugin.PluginClass; +import io.cdap.cdap.datapipeline.DataPipelineApp; +import io.cdap.cdap.proto.id.ArtifactId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.plugin.db.ConnectionConfig; +import io.cdap.plugin.db.DBRecord; +import io.cdap.plugin.db.batch.DatabasePluginTestBase; +import io.cdap.plugin.db.batch.source.DataDrivenETLDBInputFormat; +import io.cdap.plugin.db.jethro.action.JethroAction; +import io.cdap.plugin.db.jethro.action.JethroActionConfig; +import io.cdap.plugin.db.jethro.postaction.JethroPostAction; +import io.cdap.plugin.db.jethro.postaction.JethroPostActionConfig; +import io.cdap.plugin.db.jethro.source.JethroSource; +import io.cdap.plugin.db.jethro.source.JethroSourceConfig; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.Collections; +import java.util.Map; + +public class JethroPluginTestBase extends DatabasePluginTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(JethroPluginTestBase.class); + + protected static final ArtifactId DATAPIPELINE_ARTIFACT_ID = NamespaceId.DEFAULT.artifact("data-pipeline", "3.2.0"); + protected static final ArtifactSummary DATAPIPELINE_ARTIFACT = new ArtifactSummary("data-pipeline", "3.2.0"); + protected static final long CURRENT_TS = System.currentTimeMillis(); + protected static final String DRIVER_CLASS = "com.jethrodata.JethroDriver"; + protected static final String JDBC_DRIVER_NAME = "jethro"; + + 
protected static String connectionUrl; + protected static final ZoneId UTC_ZONE = ZoneId.ofOffset("UTC", ZoneOffset.UTC); + protected static boolean tearDown = true; + private static int startCount; + + protected static final Map BASE_PROPS = ImmutableMap.builder() + .put(ConnectionConfig.JDBC_PLUGIN_NAME, JDBC_DRIVER_NAME) + .put(ConnectionConfig.HOST, System.getProperty("jethro.host", "localhost")) + .put(ConnectionConfig.PORT, System.getProperty("jethro.port", "9112")) + .put(ConnectionConfig.DATABASE, System.getProperty("jethro.instance", "demo")) + .put(ConnectionConfig.USER, System.getProperty("jethro.username", "jethro")) + .put(ConnectionConfig.PASSWORD, System.getProperty("jethro.password", "jethro")) + .build(); + + @BeforeClass + public static void setupTest() throws Exception { + if (startCount++ > 0) { + return; + } + + setupBatchArtifacts(DATAPIPELINE_ARTIFACT_ID, DataPipelineApp.class); + + addPluginArtifact(NamespaceId.DEFAULT.artifact(JDBC_DRIVER_NAME, "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, JethroSource.class, JethroSourceConfig.class, + DataDrivenETLDBInputFormat.class, DBRecord.class, JethroAction.class, JethroActionConfig.class, + JethroPostAction.class, JethroPostActionConfig.class); + + Class driverClass = Class.forName(DRIVER_CLASS); + + // add jethro 3rd party plugin + PluginClass jethroDriver = new PluginClass(ConnectionConfig.JDBC_PLUGIN_TYPE, JDBC_DRIVER_NAME, + "jethro driver class", + driverClass.getName(), + null, Collections.emptyMap()); + addPluginArtifact(NamespaceId.DEFAULT.artifact("jethro-jdbc-connector", "1.0.0"), + DATAPIPELINE_ARTIFACT_ID, + Sets.newHashSet(jethroDriver), driverClass); + + connectionUrl = "jdbc:JethroData://" + BASE_PROPS.get(ConnectionConfig.HOST) + ":" + + BASE_PROPS.get(ConnectionConfig.PORT) + "/" + BASE_PROPS.get(ConnectionConfig.DATABASE); + try (Connection conn = createConnection()) { // close the setup connection once the fixtures exist + createTestTables(conn); + populateData(conn); + } + } + + public static Connection createConnection() { + try { + 
Class.forName(DRIVER_CLASS); + return DriverManager.getConnection(connectionUrl, BASE_PROPS.get(ConnectionConfig.USER), + BASE_PROPS.get(ConnectionConfig.PASSWORD)); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + protected static void createTestTables(Connection conn) throws SQLException { + try (Statement stmt = conn.createStatement()) { + stmt.execute("CREATE TABLE dbActionTest (x INT, name STRING)"); + stmt.execute("CREATE TABLE postActionTest (x int, name string)"); + + stmt.execute("CREATE TABLE test_table (int_value int, long_value bigint, float_value float, " + + "double_value double, timestamp_value timestamp, string_value string)"); + } + } + + private static void populateData(Connection conn) throws SQLException { + + try (Statement stmt = conn.createStatement()) { // INSERT returns no ResultSet: use execute() + stmt.execute("INSERT INTO dbActionTest VALUES (1, 'test')"); + stmt.execute("INSERT INTO postActionTest VALUES (1, 'test')"); + } + + try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO test_table VALUES (?,?,?,?,?,?)")) { + for (int i = 0; i < 4; i++) { + stmt.setInt(1, 1 + i); + stmt.setLong(2, (long) 100 + i); + stmt.setFloat(3, (float) 0.1 + i); + stmt.setDouble(4, 0.03 + i); + stmt.setTimestamp(5, new Timestamp(CURRENT_TS)); + stmt.setString(6, "Test_" + i); + stmt.execute(); + } + } + + try (PreparedStatement stmt = + conn.prepareStatement( + "INSERT INTO test_table (int_value, long_value, float_value, double_value) VALUES (?,?,?,?)")) { + stmt.setInt(1, 5); + stmt.setLong(2, (long) 104); + stmt.setFloat(3, (float) 4.1); + stmt.setDouble(4, 4.03); + stmt.execute(); + } + } + + @AfterClass + public static void tearDownDB() throws SQLException { + if (!tearDown) { + return; + } + + try (Connection conn = createConnection(); + Statement stmt = conn.createStatement()) { + stmt.execute("DROP TABLE test_table"); + stmt.execute("DROP TABLE dbActionTest"); + stmt.execute("DROP TABLE postActionTest"); + } + } +} diff --git 
a/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/action/JethroActionTestRun.java b/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/action/JethroActionTestRun.java new file mode 100644 index 000000000..610d203cc --- /dev/null +++ b/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/action/JethroActionTestRun.java @@ -0,0 +1,73 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.db.jethro.action; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.etl.api.action.Action; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.plugin.db.batch.action.QueryConfig; +import io.cdap.plugin.db.jethro.JethroPluginTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +public class JethroActionTestRun extends JethroPluginTestBase { + + @Test + public void testAction() throws Exception { + ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput")); + ETLStage sink = new ETLStage("sink", 
MockSink.getPlugin("actionOutput")); + ETLStage action = new ETLStage("action", new ETLPlugin( + JethroAction.NAME, + Action.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(QueryConfig.QUERY, "TRUNCATE TABLE dbActionTest") + .build(), + null)); + + ETLBatchConfig config = ETLBatchConfig.builder() + .addStage(source) + .addStage(sink) + .addStage(action) + .addConnection(sink.getName(), action.getName()) + .addConnection(source.getName(), sink.getName()) + .build(); + + AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config); + ApplicationId appId = NamespaceId.DEFAULT.app("actionTest"); + ApplicationManager appManager = deployApplication(appId, appRequest); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0")); + + try (Connection connection = createConnection(); + Statement statement = connection.createStatement(); + ResultSet results = statement.executeQuery("select * from dbActionTest")) { + Assert.assertFalse(results.next()); + } + } +} diff --git a/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/postaction/JethroPostActionTestRun.java b/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/postaction/JethroPostActionTestRun.java new file mode 100644 index 000000000..d45950b81 --- /dev/null +++ b/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/postaction/JethroPostActionTestRun.java @@ -0,0 +1,75 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.plugin.db.jethro.postaction; + +import com.google.common.collect.ImmutableMap; +import io.cdap.cdap.etl.api.batch.PostAction; +import io.cdap.cdap.etl.mock.batch.MockSink; +import io.cdap.cdap.etl.mock.batch.MockSource; +import io.cdap.cdap.etl.proto.v2.ETLBatchConfig; +import io.cdap.cdap.etl.proto.v2.ETLPlugin; +import io.cdap.cdap.etl.proto.v2.ETLStage; +import io.cdap.cdap.proto.artifact.AppRequest; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.test.ApplicationManager; +import io.cdap.plugin.common.batch.action.Condition; +import io.cdap.plugin.db.batch.action.QueryActionConfig; +import io.cdap.plugin.db.batch.action.QueryConfig; +import io.cdap.plugin.db.jethro.JethroPluginTestBase; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; + +public class JethroPostActionTestRun extends JethroPluginTestBase { + + @Test + public void testPostAction() throws Exception { + ETLStage source = new ETLStage("source", MockSource.getPlugin("actionInput")); + ETLStage sink = new ETLStage("sink", MockSink.getPlugin("actionOutput")); + ETLStage action = new ETLStage("action", new ETLPlugin( + JethroPostAction.NAME, + PostAction.PLUGIN_TYPE, + ImmutableMap.builder() + .putAll(BASE_PROPS) + .put(QueryConfig.QUERY, "TRUNCATE TABLE postActionTest") + .put(QueryActionConfig.RUN_CONDITION, Condition.SUCCESS.name()) + .build(), + null)); + + ETLBatchConfig config = ETLBatchConfig.builder() + .addStage(source) + .addStage(sink) + .addPostAction(action) + .addConnection(source.getName(), sink.getName()) + .build(); + + AppRequest appRequest = new AppRequest<>(DATAPIPELINE_ARTIFACT, config); + ApplicationId appId = NamespaceId.DEFAULT.app("postActionTest"); + ApplicationManager appManager = deployApplication(appId, appRequest); + runETLOnce(appManager, ImmutableMap.of("logical.start.time", "0")); + + try (Connection 
connection = createConnection(); + Statement statement = connection.createStatement(); + ResultSet results = statement.executeQuery("select * from postActionTest")) { + Assert.assertFalse(results.next()); + } + } +} diff --git a/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/source/JethroSourceTestRun.java b/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/source/JethroSourceTestRun.java new file mode 100644 index 000000000..0e3ef630e --- /dev/null +++ b/jethro-plugin/src/test/java/io/cdap/plugin/db/jethro/source/JethroSourceTestRun.java @@ -0,0 +1,124 @@ +/* + * Copyright © 2019 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+
+package io.cdap.plugin.db.jethro.source;
+
+import com.google.common.collect.ImmutableMap;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.dataset.table.Table;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.mock.batch.MockSink;
+import io.cdap.cdap.etl.proto.v2.ETLPlugin;
+import io.cdap.cdap.test.ApplicationManager;
+import io.cdap.cdap.test.DataSetManager;
+import io.cdap.plugin.common.Constants;
+import io.cdap.plugin.db.batch.source.AbstractDBSource;
+import io.cdap.plugin.db.jethro.JethroPluginTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Runs the JethroSource batch source in a mock pipeline and checks the records written to the mock sink.
+ */
+public class JethroSourceTestRun extends JethroPluginTestBase {
+
+  /**
+   * Reads all of test_table in one split and checks the numeric and string columns of the five rows.
+   */
+  @Test
+  public void testCheckAllTypes() throws Exception {
+    String importQuery = "SELECT * FROM test_table";
+
+    ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+      .put(Constants.Reference.REFERENCE_NAME, "jethroSource")
+      .putAll(BASE_PROPS)
+      .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+      .put(AbstractDBSource.DBSourceConfig.NUM_SPLITS, "1")
+      .build();
+
+    ETLPlugin sourceConfig = new ETLPlugin("JethroSource", BatchSource.PLUGIN_TYPE, sourceProps);
+
+    ETLPlugin sinkConfig = MockSink.getPlugin("sinkOutputTable");
+
+    ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+                                              DATAPIPELINE_ARTIFACT, "testAllTypes");
+    runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+    DataSetManager<Table> outputManager = getDataset("sinkOutputTable");
+    List<StructuredRecord> records = MockSink.readOutput(outputManager);
+    Assert.assertEquals(5, records.size());
+
+    // NOTE(review): expectedTs is computed but never asserted; confirm whether timestamp_value should be checked.
+    java.util.Date date = new java.util.Date(CURRENT_TS);
+    ZonedDateTime expectedTs = date.toInstant().atZone(UTC_ZONE);
+    AtomicInteger i = new AtomicInteger();
+    records.forEach(record -> {
+      Assert.assertEquals(1 + i.get(), (int) record.get("int_value"));
+      Assert.assertEquals(100 + i.get(), (long) record.get("long_value"));
+      Assert.assertEquals(0.1 + i.get(), (float) record.get("float_value"), 0.01);
+      Assert.assertEquals(0.03 + i.get(), (double) record.get("double_value"), 0.01);
+      // Row index 4 is the only one expected to hold null string and timestamp values.
+      if (i.get() != 4) {
+        Assert.assertEquals("Test_" + i, record.get("string_value"));
+      } else {
+        Assert.assertNull(record.get("timestamp_value"));
+        Assert.assertNull(record.get("string_value"));
+      }
+      i.getAndIncrement();
+    });
+  }
+
+  /**
+   * Configures two splits on int_value with a bounding query and expects only the int_value = 3 row.
+   */
+  @Test
+  public void testCheckBoundingQuery() throws Exception {
+    String importQuery = "SELECT * FROM test_table WHERE int_value = 3 AND $CONDITIONS";
+    String conditions = "SELECT MIN(int_value), MAX(int_value) FROM test_table";
+
+    ImmutableMap<String, String> sourceProps = ImmutableMap.<String, String>builder()
+      .put(Constants.Reference.REFERENCE_NAME, "jethroSource")
+      .putAll(BASE_PROPS)
+      .put(AbstractDBSource.DBSourceConfig.IMPORT_QUERY, importQuery)
+      .put(AbstractDBSource.DBSourceConfig.BOUNDING_QUERY, conditions)
+      .put(AbstractDBSource.DBSourceConfig.NUM_SPLITS, "2")
+      .put(AbstractDBSource.DBSourceConfig.SPLIT_BY, "int_value")
+      .build();
+
+    ETLPlugin sourceConfig = new ETLPlugin("JethroSource", BatchSource.PLUGIN_TYPE, sourceProps);
+
+    ETLPlugin sinkConfig = MockSink.getPlugin("sinkBoundingQuery");
+
+    ApplicationManager appManager = deployETL(sourceConfig, sinkConfig,
+                                              DATAPIPELINE_ARTIFACT, "testBoundingQuery");
+    runETLOnce(appManager, ImmutableMap.of("logical.start.time", String.valueOf(CURRENT_TS)));
+
+    DataSetManager<Table> outputManager = getDataset("sinkBoundingQuery");
+    List<StructuredRecord> records = MockSink.readOutput(outputManager);
+    Assert.assertEquals(1, records.size());
+
+    StructuredRecord record = records.get(0);
+    Assert.assertEquals(3, (int) record.get("int_value"));
+    Assert.assertEquals(102, (long) record.get("long_value"));
+    Assert.assertEquals(2.1, (float) record.get("float_value"), 0.01);
+    Assert.assertEquals(2.03, (double) record.get("double_value"), 0.01);
+    Assert.assertEquals("Test_2", record.get("string_value"));
+  }
+}
diff --git a/jethro-plugin/widgets/JethroAction-action.json b/jethro-plugin/widgets/JethroAction-action.json
new file mode 100644
index 000000000..a40301001
--- /dev/null
+++ b/jethro-plugin/widgets/JethroAction-action.json
@@ -0,0 +1,66 @@
+{
+  "metadata": {
+    "spec-version": "1.5"
+  },
+  "display-name": "Jethro Action",
+  "configuration-groups": [
+    {
+      "label": "Basic",
+      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Driver Name",
+          "name": "jdbcPluginName",
+          "widget-attributes": {
+            "default": "jethro"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Host",
+          "name": "host",
+          "widget-attributes": {
+            "default": "localhost"
+          }
+        },
+        {
+          "widget-type": "number",
+          "label": "Port",
+          "name": "port",
+          "widget-attributes": {
+            "default": "9111"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Instance",
+          "name": "database"
+        },
+        {
+          "widget-type": "textarea",
+          "label": "Database Command",
+          "name": "query"
+        },
+        {
+          "widget-type": "hidden",
+          "name": "connectionArguments"
+        }
+      ]
+    },
+    {
+      "label": "Credentials",
+      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Username",
+          "name": "user"
+        },
+        {
+          "widget-type": "password",
+          "label": "Password",
+          "name": "password"
+        }
+      ]
+    }
+  ]
+}
diff --git a/jethro-plugin/widgets/JethroPostAction-postaction.json b/jethro-plugin/widgets/JethroPostAction-postaction.json
new file mode 100644
index 000000000..3ade0880c
--- /dev/null
+++ 
b/jethro-plugin/widgets/JethroPostAction-postaction.json
@@ -0,0 +1,82 @@
+{
+  "metadata": {
+    "spec-version": "1.5"
+  },
+  "display-name": "Jethro Post Action",
+  "configuration-groups": [
+    {
+      "label": "Basic",
+      "properties": [
+        {
+          "widget-type": "select",
+          "label": "Run Condition",
+          "name": "runCondition",
+          "widget-attributes": {
+            "values": [
+              "completion",
+              "success",
+              "failure"
+            ],
+            "default": "success"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Driver Name",
+          "name": "jdbcPluginName",
+          "widget-attributes": {
+            "default": "jethro"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Host",
+          "name": "host",
+          "widget-attributes": {
+            "default": "localhost"
+          }
+        },
+        {
+          "widget-type": "number",
+          "label": "Port",
+          "name": "port",
+          "widget-attributes": {
+            "default": "9111"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Instance",
+          "name": "database"
+        },
+        {
+          "widget-type": "textarea",
+          "label": "Query",
+          "name": "query",
+          "widget-attributes": {
+            "rows": "4"
+          }
+        },
+        {
+          "widget-type": "hidden",
+          "name": "connectionArguments"
+        }
+      ]
+    },
+    {
+      "label": "Credentials",
+      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Username",
+          "name": "user"
+        },
+        {
+          "widget-type": "password",
+          "label": "Password",
+          "name": "password"
+        }
+      ]
+    }
+  ]
+}
diff --git a/jethro-plugin/widgets/JethroSource-batchsource.json b/jethro-plugin/widgets/JethroSource-batchsource.json
new file mode 100644
index 000000000..a9a263523
--- /dev/null
+++ b/jethro-plugin/widgets/JethroSource-batchsource.json
@@ -0,0 +1,137 @@
+{
+  "metadata": {
+    "spec-version": "1.5"
+  },
+  "display-name": "Jethro Source",
+  "configuration-groups": [
+    {
+      "label": "General",
+      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Reference Name",
+          "name": "referenceName",
+          "widget-attributes": {
+            "placeholder": "Name used to identify this source for lineage"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Driver Name",
+          "name": "jdbcPluginName",
+          "widget-attributes": {
+            "default": "jethro"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Host",
+          "name": "host",
+          "widget-attributes": {
+            "default": "localhost"
+          }
+        },
+        {
+          "widget-type": "number",
+          "label": "Port",
+          "name": "port",
+          "widget-attributes": {
+            "default": "9111",
+            "placeholder": "Port."
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Instance",
+          "name": "database"
+        },
+        {
+          "widget-type": "textarea",
+          "label": "Import Query",
+          "name": "importQuery",
+          "widget-attributes": {
+            "rows": "4"
+          },
+          "plugin-function": {
+            "widget": "outputSchema",
+            "output-property": "schema",
+            "omit-properties": [
+              {
+                "name": "schema"
+              }
+            ]
+          }
+        },
+        {
+          "widget-type": "textarea",
+          "label": "Bounding Query",
+          "name": "boundingQuery",
+          "widget-attributes": {
+            "rows": "4"
+          }
+        }
+      ]
+    },
+    {
+      "label": "Credentials",
+      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Username",
+          "name": "user"
+        },
+        {
+          "widget-type": "password",
+          "label": "Password",
+          "name": "password"
+        }
+      ]
+    },
+    {
+      "label": "Advanced",
+      "properties": [
+        {
+          "widget-type": "textbox",
+          "label": "Split-By Field Name",
+          "name": "splitBy"
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Number of Splits to Generate",
+          "name": "numSplits",
+          "widget-attributes": {
+            "default": "1"
+          }
+        },
+        {
+          "widget-type": "hidden",
+          "name": "connectionArguments"
+        }
+      ]
+    }
+  ],
+  "outputs": [
+    {
+      "name": "schema",
+      "widget-type": "schema",
+      "widget-attributes": {
+        "schema-types": [
+          "int",
+          "long",
+          "float",
+          "double",
+          "string",
+          "timestamp"
+        ],
+        "schema-default-type": "string"
+      }
+    }
+  ],
+  "jump-config": {
+    "datasets": [
+      {
+        "ref-property-name": "referenceName"
+      }
+    ]
+  }
+}
diff --git a/pom.xml b/pom.xml
index cbb1fe5fe..0cca00aca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@ aurora-mysql-plugin aurora-postgresql-plugin memsql-plugin + jethro-plugin