From 64518f2cdd28d7ab5b3434b59337df642b058ed6 Mon Sep 17 00:00:00 2001
From: Jakub Stejskal
Date: Thu, 12 Sep 2024 13:38:25 +0200
Subject: [PATCH] Add builder for SQL statements for our tests (#43)

* Add builder for SQL statements for our tests

Signed-off-by: Jakub Stejskal

* Build testing SQL to make it easier to debug

Signed-off-by: Jakub Stejskal

* Rebase

Signed-off-by: Jakub Stejskal

* Fix spotbugs

Signed-off-by: Jakub Stejskal

* Final fix

Signed-off-by: Jakub Stejskal

---------

Signed-off-by: Jakub Stejskal
---
 .spotbugs/spotbugs-exclude.xml                |   3 +
 pom.xml                                       |   1 -
 .../io/streams/constants/FlinkConstants.java  |  43 -----
 .../templates/FlinkDeploymentTemplate.java    |  10 +-
 src/main/java/io/streams/sql/SqlWith.java     |  88 ++++++++++
 .../java/io/streams/sql/TestStatements.java   | 166 ++++++++++++++++++
 src/main/resources/log4j2.properties          |   1 +
 .../streams/e2e/flink/sql/SqlExampleST.java   |  12 +-
 .../java/io/streams/unit/SqlWithTest.java     |  82 +++++++++
 9 files changed, 352 insertions(+), 54 deletions(-)
 delete mode 100644 src/main/java/io/streams/constants/FlinkConstants.java
 create mode 100644 src/main/java/io/streams/sql/SqlWith.java
 create mode 100644 src/main/java/io/streams/sql/TestStatements.java
 create mode 100644 src/test/java/io/streams/unit/SqlWithTest.java

diff --git a/.spotbugs/spotbugs-exclude.xml b/.spotbugs/spotbugs-exclude.xml
index d2cfd4f..29b6c77 100644
--- a/.spotbugs/spotbugs-exclude.xml
+++ b/.spotbugs/spotbugs-exclude.xml
@@ -18,4 +18,7 @@
+
+
+
diff --git a/pom.xml b/pom.xml
index e09e671..9a4ad5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,7 +263,6 @@
             <version>${kafka.version}</version>
             <scope>compile</scope>
-
diff --git a/src/main/java/io/streams/constants/FlinkConstants.java b/src/main/java/io/streams/constants/FlinkConstants.java
deleted file mode 100644
index 76f3eae..0000000
--- a/src/main/java/io/streams/constants/FlinkConstants.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright streamshub authors.
- * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
- */
-package io.streams.constants;
-
-public interface FlinkConstants {
-    String TEST_SQL_EXAMPLE_STATEMENT =
-        "CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING ) " +
-        "WITH ( 'connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv', " +
-        "'format' = 'csv', 'csv.ignore-parse-errors' = 'true' ); CREATE TABLE ClickStreamTable " +
-        "( user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
-        "WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
-        "'topic' = 'flink.click.streams', 'properties.bootstrap.servers' = " +
-        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group', " +
-        "'value.format' = 'avro-confluent', 'value.avro-confluent.url' = " +
-        "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
-        "'latest-offset' ); CREATE TABLE SalesRecordTable ( invoice_id STRING, user_id STRING, product_id STRING, " +
-        "quantity STRING, unit_cost STRING, `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
-        "WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
-        "'topic' = 'flink.sales.records', 'properties.bootstrap.servers' = " +
-        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'sales-record-group', " +
-        "'value.format' = 'avro-confluent', 'value.avro-confluent.url' = " +
-        "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
-        "'latest-offset' ); CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, " +
-        "`event_time` TIMESTAMP(3), PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', " +
-        "'topic' = 'flink.recommended.products', 'properties.bootstrap.servers' = " +
-        "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.client.id' = " +
-        "'recommended-products-producer-client', 'properties.transaction.timeout.ms' = '800000', " +
-        "'key.format' = 'csv', 'value.format' = 'csv', 'value.fields-include' = 'ALL' ); CREATE TEMPORARY " +
-        "VIEW clicked_products AS SELECT DISTINCT c.user_id, c.event_time, p.product_id, p.category " +
-        "FROM ClickStreamTable AS c JOIN ProductInventoryTable AS p ON c.product_id = p.product_id; " +
-        "CREATE TEMPORARY VIEW category_products AS SELECT cp.user_id, cp.event_time, p.product_id, " +
-        "p.category, p.stock, p.rating, sr.user_id as purchased FROM clicked_products cp JOIN " +
-        "ProductInventoryTable AS p ON cp.category = p.category LEFT JOIN SalesRecordTable sr ON " +
-        "cp.user_id = sr.user_id AND p.product_id = sr.product_id WHERE p.stock > 0 GROUP BY p.product_id, " +
-        "p.category, p.stock, cp.user_id, cp.event_time, sr.user_id, p.rating; CREATE TEMPORARY VIEW " +
-        "top_products AS SELECT cp.user_id, cp.event_time, cp.product_id, cp.category, cp.stock, cp.rating, " +
-        "cp.purchased, ROW_NUMBER() OVER (PARTITION BY cp.user_id ORDER BY cp.purchased DESC, cp.rating DESC) " +
-        "AS rn FROM category_products cp; INSERT INTO CsvSinkTable SELECT user_id, LISTAGG(product_id, ',') " +
-        "AS top_product_ids, TUMBLE_END(event_time, INTERVAL '5' SECOND) FROM top_products WHERE rn <= 6 GROUP " +
-        "BY user_id, TUMBLE(event_time, INTERVAL '5' SECOND);";
-}
diff --git a/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java b/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java
index 5151db5..a3bfcd8 100644
--- a/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java
+++ b/src/main/java/io/streams/operands/flink/templates/FlinkDeploymentTemplate.java
@@ -4,7 +4,6 @@
  */
 package io.streams.operands.flink.templates;
 
-import io.streams.constants.FlinkConstants;
 import org.apache.flink.v1beta1.FlinkDeploymentBuilder;
 import org.apache.flink.v1beta1.FlinkDeploymentSpec;
 import org.apache.flink.v1beta1.flinkdeploymentspec.Job;
@@ -93,12 +92,13 @@ public static FlinkDeploymentBuilder defaultFlinkDeployment(String namespace, St
     /**
      * Return default flink deployment for sql runner
      *
-     * @param namespace namespace of flink deployment
-     * @param name      name of deployment
+     * @param namespace     namespace of flink deployment
+     * @param name          name of deployment
+     * @param sqlStatements list of SQL statements that will be executed
      * @return flink deployment builder
      */
-    public static FlinkDeploymentBuilder flinkExampleDeployment(String namespace, String name) {
-        return defaultFlinkDeployment(namespace, name, List.of(FlinkConstants.TEST_SQL_EXAMPLE_STATEMENT))
+    public static FlinkDeploymentBuilder flinkExampleDeployment(String namespace, String name, List<String> sqlStatements) {
+        return defaultFlinkDeployment(namespace, name, sqlStatements)
             .editSpec()
             .editPodTemplate()
             .editFlinkdeploymentspecSpec()
diff --git a/src/main/java/io/streams/sql/SqlWith.java b/src/main/java/io/streams/sql/SqlWith.java
new file mode 100644
index 0000000..8e87151
--- /dev/null
+++ b/src/main/java/io/streams/sql/SqlWith.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright streamshub authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.streams.sql;
+
+import io.sundr.builder.annotations.Buildable;
+
+import java.security.InvalidParameterException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringJoiner;
+
+@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder")
+public class SqlWith {
+    private String sqlStatement;
+    private String connector;
+    private String bootstrapServer;
+    private String topic;
+    private Map<String, String> additionalProperties = new HashMap<>();
+
+    public String getSqlStatement() {
+        return sqlStatement;
+    }
+
+    public void setSqlStatement(String sqlStatement) {
+        if (sqlStatement == null || sqlStatement.isEmpty()) {
+            throw new InvalidParameterException("sqlStatement cannot be empty!");
+        }
+        this.sqlStatement = sqlStatement;
+    }
+
+    public String getConnector() {
+        return connector;
+    }
+
+    public void setConnector(String connector) {
+        if (connector == null || connector.isEmpty()) {
+            throw new InvalidParameterException("Connector cannot be empty!");
+        }
+        this.connector = connector;
+    }
+
+    public String getBootstrapServer() {
+        return bootstrapServer;
+    }
+
+    public void setBootstrapServer(String bootstrapServer) {
+        this.bootstrapServer = bootstrapServer;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public Map<String, String> getAdditionalProperties() {
+        return Collections.unmodifiableMap(additionalProperties);
+    }
+
+    public void setAdditionalProperties(Map<String, String> additionalProperties) {
+        this.additionalProperties = (additionalProperties == null || additionalProperties.isEmpty())
+            ? new HashMap<>() : additionalProperties;
+    }
+
+    public String generateSql() {
+        StringJoiner withClause = new StringJoiner(", ", "WITH (", ")");
+        // Add connector
+        withClause.add("'connector' = '" + connector + "'");
+        // Add Kafka specific info if set
+        if (bootstrapServer != null && !bootstrapServer.isEmpty()) {
+            withClause.add("'properties.bootstrap.servers' = '" + bootstrapServer + "'");
+        }
+        if (topic != null && !topic.isEmpty()) {
+            withClause.add("'topic' = '" + topic + "'");
+        }
+        // Add additional properties
+        if (additionalProperties != null) {
+            additionalProperties.forEach((key, value) -> withClause.add("'" + key + "' = '" + value + "'"));
+        }
+
+        return sqlStatement + " " + withClause + ";";
+    }
+}
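Note: SqlWithBuilder is generated at build time by Sundrio from the @Buildable annotation above, which is why it does not appear in this diff. A minimal sketch of the intended flow (the table and topic names here are illustrative, not from this patch):

    // Hypothetical usage; SqlWithBuilder is generated from the @Buildable annotation.
    SqlWith sqlWith = new SqlWithBuilder()
        .withSqlStatement("CREATE TABLE SourceTable ( id STRING )")
        .withConnector("kafka")
        .withBootstrapServer("my-cluster-kafka-bootstrap:9092")
        .withTopic("example.topic")
        .build();

    // generateSql() appends the WITH clause and a trailing semicolon:
    // CREATE TABLE SourceTable ( id STRING ) WITH ('connector' = 'kafka',
    // 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092', 'topic' = 'example.topic');
    String sql = sqlWith.generateSql();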
diff --git a/src/main/java/io/streams/sql/TestStatements.java b/src/main/java/io/streams/sql/TestStatements.java
new file mode 100644
index 0000000..0a3e7fd
--- /dev/null
+++ b/src/main/java/io/streams/sql/TestStatements.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright streamshub authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.streams.sql;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestStatements {
+    // Private constructor
+    private TestStatements() {
+
+    }
+
+    public static String getTestSqlExample(String bootstrap, String registryUrl) {
+        // CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING )
+        // WITH ( 'connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv',
+        // 'format' = 'csv', 'csv.ignore-parse-errors' = 'true' )
+        StringBuilder builder = new StringBuilder();
+        builder.append("CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING )");
+
+        Map<String, String> additionalProperties = new HashMap<>();
+        additionalProperties.put("path", "/opt/flink/data/productInventory.csv");
+        additionalProperties.put("format", "csv");
+        additionalProperties.put("csv.ignore-parse-errors", "true");
+
+        SqlWith sqlWith = new SqlWithBuilder()
+            .withSqlStatement(builder.toString())
+            .withConnector("filesystem")
+            .withAdditionalProperties(additionalProperties)
+            .build();
+
+        String part1 = sqlWith.generateSql();
+
+        // CREATE TABLE ClickStreamTable
+        // ( user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
+        // WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka',
+        // 'topic' = 'flink.click.streams', 'properties.bootstrap.servers' =
+        // 'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group',
+        // 'value.format' = 'avro-confluent', 'value.avro-confluent.url' =
+        // 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' =
+        // 'latest-offset' )
+        builder = new StringBuilder();
+        builder.append("CREATE TABLE ClickStreamTable (");
+        builder.append("user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', ");
+        builder.append("WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND )");
+
+        additionalProperties = new HashMap<>();
+        additionalProperties.put("properties.group.id", "click-stream-group");
+        additionalProperties.put("value.format", "avro-confluent");
+        additionalProperties.put("value.avro-confluent.url", registryUrl);
+        additionalProperties.put("scan.startup.mode", "latest-offset");
+
+        sqlWith = new SqlWithBuilder()
+            .withSqlStatement(builder.toString())
+            .withConnector("kafka")
+            .withTopic("flink.click.streams")
+            .withBootstrapServer(bootstrap)
+            .withAdditionalProperties(additionalProperties)
+            .build();
+
+        String part2 = sqlWith.generateSql();
+
+        // CREATE TABLE SalesRecordTable ( invoice_id STRING, user_id STRING, product_id STRING,
+        // quantity STRING, unit_cost STRING, `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp',
+        // WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka',
+        // 'topic' = 'flink.sales.records', 'properties.bootstrap.servers' =
+        // 'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'sales-record-group',
+        // 'value.format' = 'avro-confluent', 'value.avro-confluent.url' =
+        // 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' =
+        // 'latest-offset' )
+        builder = new StringBuilder();
+        builder.append("CREATE TABLE SalesRecordTable (");
+        builder.append("invoice_id STRING, user_id STRING, product_id STRING, quantity STRING, unit_cost STRING, ");
+        builder.append("`purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', ");
+        builder.append("WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND )");
+
+        additionalProperties = new HashMap<>();
+        additionalProperties.put("properties.group.id", "sales-record-group");
+        additionalProperties.put("value.format", "avro-confluent");
+        additionalProperties.put("value.avro-confluent.url", registryUrl);
+        additionalProperties.put("scan.startup.mode", "latest-offset");
+
+        sqlWith = new SqlWithBuilder()
+            .withSqlStatement(builder.toString())
+            .withConnector("kafka")
+            .withTopic("flink.sales.records")
+            .withBootstrapServer(bootstrap)
+            .withAdditionalProperties(additionalProperties)
+            .build();
+
+        String part3 = sqlWith.generateSql();
+
+        // CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING,
+        // `event_time` TIMESTAMP(3), PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka',
+        // 'topic' = 'flink.recommended.products', 'properties.bootstrap.servers' =
+        // 'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.client.id' =
+        // 'recommended-products-producer-client', 'properties.transaction.timeout.ms' = '800000',
+        // 'key.format' = 'csv', 'value.format' = 'csv', 'value.fields-include' = 'ALL' )
+        builder = new StringBuilder();
+        builder.append("CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, `event_time` TIMESTAMP(3), ");
+        builder.append("PRIMARY KEY(`user_id`) NOT ENFORCED )");
+
+        additionalProperties = new HashMap<>();
+        additionalProperties.put("properties.client.id", "recommended-products-producer-client");
+        additionalProperties.put("properties.transaction.timeout.ms", "800000");
+        additionalProperties.put("key.format", "csv");
+        additionalProperties.put("value.format", "csv");
+        additionalProperties.put("value.fields-include", "ALL");
+
+        sqlWith = new SqlWithBuilder()
+            .withSqlStatement(builder.toString())
+            .withConnector("upsert-kafka")
+            .withBootstrapServer(bootstrap)
+            .withTopic("flink.recommended.products")
+            .withAdditionalProperties(additionalProperties)
+            .build();
+
+        String part4 = sqlWith.generateSql();
+
+        // CREATE TEMPORARY VIEW clicked_products AS SELECT DISTINCT c.user_id, c.event_time, p.product_id, p.category
+        // FROM ClickStreamTable AS c JOIN ProductInventoryTable AS p ON c.product_id = p.product_id
+        builder = new StringBuilder();
+        builder.append("CREATE TEMPORARY VIEW clicked_products AS SELECT DISTINCT c.user_id, c.event_time, p.product_id, p.category ");
+        builder.append("FROM ClickStreamTable AS c JOIN ProductInventoryTable AS p ON c.product_id = p.product_id;");
+        String part5 = builder.toString();
+
+        // CREATE TEMPORARY VIEW category_products AS SELECT cp.user_id, cp.event_time, p.product_id,
+        // p.category, p.stock, p.rating, sr.user_id as purchased FROM clicked_products cp JOIN
+        // ProductInventoryTable AS p ON cp.category = p.category LEFT JOIN SalesRecordTable sr ON
+        // cp.user_id = sr.user_id AND p.product_id = sr.product_id WHERE p.stock > 0 GROUP BY p.product_id,
+        // p.category, p.stock, cp.user_id, cp.event_time, sr.user_id, p.rating
+        builder = new StringBuilder();
+        builder.append("CREATE TEMPORARY VIEW category_products AS SELECT cp.user_id, cp.event_time, p.product_id, ");
+        builder.append("p.category, p.stock, p.rating, sr.user_id as purchased ");
+        builder.append("FROM clicked_products cp ");
+        builder.append("JOIN ProductInventoryTable AS p ON cp.category = p.category ");
+        builder.append("LEFT JOIN SalesRecordTable sr ON cp.user_id = sr.user_id ");
+        builder.append("AND p.product_id = sr.product_id WHERE p.stock > 0 ");
+        builder.append("GROUP BY p.product_id, p.category, p.stock, cp.user_id, cp.event_time, sr.user_id, p.rating;");
+        String part6 = builder.toString();
+
+        // CREATE TEMPORARY VIEW top_products AS SELECT cp.user_id, cp.event_time, cp.product_id, cp.category,
+        // cp.stock, cp.rating, cp.purchased, ROW_NUMBER() OVER (PARTITION BY cp.user_id
+        // ORDER BY cp.purchased DESC, cp.rating DESC) AS rn FROM category_products cp
+        builder = new StringBuilder();
+        builder.append("CREATE TEMPORARY VIEW top_products AS SELECT cp.user_id, cp.event_time, cp.product_id, cp.category, ");
+        builder.append("cp.stock, cp.rating, cp.purchased, ROW_NUMBER() OVER (PARTITION BY cp.user_id ");
+        builder.append("ORDER BY cp.purchased DESC, cp.rating DESC) AS rn ");
+        builder.append("FROM category_products cp;");
+        String part7 = builder.toString();
+
+        // INSERT INTO CsvSinkTable SELECT user_id, LISTAGG(product_id, ',') AS top_product_ids,
+        // TUMBLE_END(event_time, INTERVAL '5' SECOND) FROM top_products WHERE rn <= 6
+        // GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' SECOND)
+        builder = new StringBuilder();
+        builder.append("INSERT INTO CsvSinkTable SELECT user_id, LISTAGG(product_id, ',') AS top_product_ids, ");
+        builder.append("TUMBLE_END(event_time, INTERVAL '5' SECOND) FROM top_products WHERE rn <= 6 GROUP BY user_id, ");
+        builder.append("TUMBLE(event_time, INTERVAL '5' SECOND);");
+        String part8 = builder.toString();
+
+        return part1 + part2 + part3 + part4 + part5 + part6 + part7 + part8;
+    }
+}
diff --git a/src/main/resources/log4j2.properties b/src/main/resources/log4j2.properties
index b7de80f..f1e59ea 100644
--- a/src/main/resources/log4j2.properties
+++ b/src/main/resources/log4j2.properties
@@ -23,3 +23,4 @@ rootLogger.appenderRef.console.level = ${env:TEST_LOG_LEVEL:-INFO}
 rootLogger.appenderRef.rolling.ref = RollingFile
 rootLogger.appenderRef.rolling.level = DEBUG
 rootLogger.additivity = false
+
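Note: because the statement chain is now assembled in Java instead of living in one opaque string constant, it can be produced and inspected outside the cluster, which is what the "easier to debug" commit above refers to. A sketch (the endpoint values are illustrative):

    // Print the full CREATE TABLE / CREATE TEMPORARY VIEW / INSERT pipeline for inspection.
    String sql = TestStatements.getTestSqlExample(
        "my-cluster-kafka-bootstrap.flink.svc:9092",
        "http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6");
    System.out.println(sql);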
diff --git a/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java b/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java
index 30c991d..34d82d3 100644
--- a/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java
+++ b/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java
@@ -23,6 +23,7 @@ import io.streams.operators.manifests.CertManagerManifestInstaller;
 import io.streams.operators.manifests.FlinkManifestInstaller;
 import io.streams.operators.manifests.StrimziManifestInstaller;
+import io.streams.sql.TestStatements;
 import io.streams.utils.kube.JobUtils;
 import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
 import org.apache.flink.v1beta1.FlinkDeployment;
@@ -115,14 +116,15 @@ void testFlinkSqlExample() throws IOException {
         KubeResourceManager.getInstance().createOrUpdateResourceWithWait(dataApp.toArray(new HasMetadata[0]));
 
         // Deploy flink
+        String bootstrapServer = KafkaType.kafkaClient().inNamespace(namespace).withName("my-cluster").get()
+            .getStatus().getListeners().get(0).getBootstrapServers();
+        String registryUrl = "http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6";
+
         FlinkDeployment flinkApp = FlinkDeploymentTemplate.flinkExampleDeployment(namespace,
-            "recommendation-app").build();
+            "recommendation-app", List.of(TestStatements.getTestSqlExample(bootstrapServer, registryUrl))).build();
         KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flinkApp);
 
         // Run internal consumer and check if topic contains messages
-        String bootstrapServer = KafkaType.kafkaClient().inNamespace(namespace).withName("my-cluster").get()
-            .getStatus().getListeners().get(0).getBootstrapServers();
-
         String consumerName = "kafka-consumer";
         StrimziKafkaClients strimziKafkaClients = new StrimziKafkaClientsBuilder()
             .withConsumerName(consumerName)
@@ -141,6 +143,6 @@
             .get(0).getMetadata().getName();
 
         String log = KubeResourceManager.getKubeClient().getLogsFromPod(namespace, consumerPodName);
-        assertTrue(log.contains("user-9"));
+        assertTrue(log.contains("user-"));
     }
 }
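Note: the final assertion is loosened from a specific user id ("user-9") to the "user-" prefix, presumably because which users end up in flink.recommended.products depends on the generated click data. If a stricter check were ever wanted, a pattern match is one option (hypothetical, not part of this patch):

    // Accept any recommended user id while still requiring the expected shape.
    assertTrue(java.util.regex.Pattern.compile("user-\\d+").matcher(log).find());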
diff --git a/src/test/java/io/streams/unit/SqlWithTest.java b/src/test/java/io/streams/unit/SqlWithTest.java
new file mode 100644
index 0000000..2a0a6ff
--- /dev/null
+++ b/src/test/java/io/streams/unit/SqlWithTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright streamshub authors.
+ * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
+ */
+package io.streams.unit;
+
+import io.streams.sql.SqlWith;
+import io.streams.sql.SqlWithBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SqlWithTest {
+    @Test
+    void testCreateTableWithFilesystemConnector() {
+        String expectedSql = "CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING ) " +
+            "WITH ('connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv', " +
+            "'csv.ignore-parse-errors' = 'true', 'format' = 'csv');";
+
+        // CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING )
+        // WITH ( 'connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv',
+        // 'format' = 'csv', 'csv.ignore-parse-errors' = 'true' )
+        StringBuilder builder = new StringBuilder();
+        builder.append("CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING )");
+
+        Map<String, String> additionalProperties = new HashMap<>();
+        additionalProperties.put("path", "/opt/flink/data/productInventory.csv");
+        additionalProperties.put("format", "csv");
+        additionalProperties.put("csv.ignore-parse-errors", "true");
+
+        SqlWith sqlWith = new SqlWithBuilder()
+            .withSqlStatement(builder.toString())
+            .withConnector("filesystem")
+            .withAdditionalProperties(additionalProperties)
+            .build();
+
+        assertTrue(Objects.equals(sqlWith.generateSql(), expectedSql));
+    }
+
+    @Test
+    void testCreateTableWithKafkaConnector() {
+        String expectedSql = "CREATE TABLE ClickStreamTable (user_id STRING, product_id STRING, " +
+            "`event_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
+            "WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) " +
+            "WITH ('connector' = 'kafka', 'properties.bootstrap.servers' = 'my-kafka.bootstrap', 'topic' = 'flink.click.streams', " +
+            "'value.format' = 'avro-confluent', 'properties.group.id' = 'click-stream-group', " +
+            "'value.avro-confluent.url' = 'https://apicurio.registry', 'scan.startup.mode' = 'latest-offset');";
+
+        // CREATE TABLE ClickStreamTable
+        // ( user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
+        // WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka',
+        // 'topic' = 'flink.click.streams', 'properties.bootstrap.servers' =
+        // 'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group',
+        // 'value.format' = 'avro-confluent', 'value.avro-confluent.url' =
+        // 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' =
+        // 'latest-offset' )
+        StringBuilder builder = new StringBuilder();
+        builder.append("CREATE TABLE ClickStreamTable (");
+        builder.append("user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', ");
+        builder.append("WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND )");
+
+        Map<String, String> additionalProperties = new HashMap<>();
+        additionalProperties.put("properties.group.id", "click-stream-group");
+        additionalProperties.put("value.format", "avro-confluent");
+        additionalProperties.put("value.avro-confluent.url", "https://apicurio.registry");
+        additionalProperties.put("scan.startup.mode", "latest-offset");
+
+        SqlWith sqlWith = new SqlWithBuilder()
+            .withSqlStatement(builder.toString())
+            .withConnector("kafka")
+            .withTopic("flink.click.streams")
+            .withBootstrapServer("my-kafka.bootstrap")
+            .withAdditionalProperties(additionalProperties)
+            .build();
+
+        assertTrue(Objects.equals(sqlWith.generateSql(), expectedSql));
+    }
+}
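Note: the expected strings in both tests depend on the iteration order of the HashMap passed to withAdditionalProperties(). That order is stable in practice on a given JDK, but it is not guaranteed by the Map contract. If these tests ever turn flaky, one option (hypothetical, not part of this patch) is to iterate the properties in sorted key order inside generateSql():

    // Hypothetical hardening: deterministic property order via a sorted copy.
    new java.util.TreeMap<>(additionalProperties)
        .forEach((key, value) -> withClause.add("'" + key + "' = '" + value + "'"));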