
Commit

Add builder for SQL statements for our tests (#43)
* Add builder for SQL statements for our tests

Signed-off-by: Jakub Stejskal <[email protected]>

* Build testing SQL to make it easier to debug

Signed-off-by: Jakub Stejskal <[email protected]>

* Rebase

Signed-off-by: Jakub Stejskal <[email protected]>

* Fix spotbugs

Signed-off-by: Jakub Stejskal <[email protected]>

* Final fix

Signed-off-by: Jakub Stejskal <[email protected]>

---------

Signed-off-by: Jakub Stejskal <[email protected]>
Frawless authored Sep 12, 2024
1 parent 3f6dcbe commit 64518f2
Showing 9 changed files with 352 additions and 54 deletions.
3 changes: 3 additions & 0 deletions .spotbugs/spotbugs-exclude.xml
@@ -18,4 +18,7 @@
<Match>
<Class name="~io\.streams\.clients\.(kafka)\..+(Builder|Fluent)(\$.*)?" />
</Match>
<Match>
<Class name="~io\.streams\.sql\..+(Builder|Fluent)(\$.*)?" />
</Match>
</FindBugsFilter>
1 change: 0 additions & 1 deletion pom.xml
@@ -263,7 +263,6 @@
<version>${kafka.version}</version>
<scope>compile</scope>
</dependency>

</dependencies>

<build>
43 changes: 0 additions & 43 deletions src/main/java/io/streams/constants/FlinkConstants.java

This file was deleted.

@@ -4,7 +4,6 @@
*/
package io.streams.operands.flink.templates;

import io.streams.constants.FlinkConstants;
import org.apache.flink.v1beta1.FlinkDeploymentBuilder;
import org.apache.flink.v1beta1.FlinkDeploymentSpec;
import org.apache.flink.v1beta1.flinkdeploymentspec.Job;
@@ -93,12 +92,13 @@ public static FlinkDeploymentBuilder defaultFlinkDeployment(String namespace, St
/**
* Return default flink deployment for sql runner
*
* @param namespace namespace of flink deployment
* @param name name of deployment
* @param namespace namespace of flink deployment
* @param name name of deployment
* @param sqlStatements list of SQL statements that will be executed
* @return flink deployment builder
*/
public static FlinkDeploymentBuilder flinkExampleDeployment(String namespace, String name) {
return defaultFlinkDeployment(namespace, name, List.of(FlinkConstants.TEST_SQL_EXAMPLE_STATEMENT))
public static FlinkDeploymentBuilder flinkExampleDeployment(String namespace, String name, List<String> sqlStatements) {
return defaultFlinkDeployment(namespace, name, sqlStatements)
.editSpec()
.editPodTemplate()
.editFlinkdeploymentspecSpec()
88 changes: 88 additions & 0 deletions src/main/java/io/streams/sql/SqlWith.java
@@ -0,0 +1,88 @@
/*
* Copyright streamshub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streams.sql;

import io.sundr.builder.annotations.Buildable;

import java.security.InvalidParameterException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;

@Buildable(editableEnabled = false, builderPackage = "io.fabric8.kubernetes.api.builder")
public class SqlWith {
private String sqlStatement;
private String connector;
private String bootstrapServer;
private String topic;
private Map<String, String> additionalProperties = new HashMap<>();

public String getSqlStatement() {
return sqlStatement;
}

public void setSqlStatement(String sqlStatement) {
if (sqlStatement == null || sqlStatement.isEmpty()) {
throw new InvalidParameterException("sqlStatement cannot be empty!");
}
this.sqlStatement = sqlStatement;
}

public String getConnector() {
return connector;
}

public void setConnector(String connector) {
if (connector == null || connector.isEmpty()) {
throw new InvalidParameterException("Connector cannot be empty!");
}
this.connector = connector;
}

public String getBootstrapServer() {
return bootstrapServer;
}

public void setBootstrapServer(String bootstrapServer) {
this.bootstrapServer = bootstrapServer;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public Map<String, String> getAdditionalProperties() {
return Collections.unmodifiableMap(additionalProperties);
}

public void setAdditionalProperties(Map<String, String> additionalProperties) {
this.additionalProperties = (additionalProperties == null || additionalProperties.isEmpty())
? new HashMap<>() : additionalProperties;
}

public String generateSql() {
StringJoiner withClause = new StringJoiner(", ", "WITH (", ")");
// Add connector
withClause.add("'connector' = '" + connector + "'");
// Add Kafka specific info if set
if (bootstrapServer != null && !bootstrapServer.isEmpty()) {
withClause.add("'properties.bootstrap.servers' = '" + bootstrapServer + "'");
}
if (topic != null && !topic.isEmpty()) {
withClause.add("'topic' = '" + topic + "'");
}
// Add additional properties
if (additionalProperties != null) {
additionalProperties.forEach((key, value) -> withClause.add("'" + key + "' = '" + value + "'"));
}

return sqlStatement + " " + withClause + ";";
}
}
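For orientation, here is a minimal sketch (not part of this commit) of how the Sundrio-generated SqlWithBuilder is used and what generateSql emits. The builder method names are taken from the usage in TestStatements below; the output is shown wrapped for readability even though generateSql returns a single line, and WITH-clause entries coming from additionalProperties may appear in any order because the backing map is a HashMap.

import io.streams.sql.SqlWith;
import io.streams.sql.SqlWithBuilder;

import java.util.Map;

public class SqlWithExample {
    public static void main(String[] args) {
        // Illustrative usage only; not committed code.
        SqlWith example = new SqlWithBuilder()
            .withSqlStatement("CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING )")
            .withConnector("filesystem")
            .withAdditionalProperties(Map.of(
                "path", "/opt/flink/data/productInventory.csv",
                "format", "csv",
                "csv.ignore-parse-errors", "true"))
            .build();

        // generateSql appends the WITH clause and a trailing semicolon, producing roughly:
        // CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING )
        //   WITH ('connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv',
        //         'format' = 'csv', 'csv.ignore-parse-errors' = 'true');
        System.out.println(example.generateSql());
    }
}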
166 changes: 166 additions & 0 deletions src/main/java/io/streams/sql/TestStatements.java
@@ -0,0 +1,166 @@
/*
* Copyright streamshub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streams.sql;

import java.util.HashMap;
import java.util.Map;

public class TestStatements {
// Private constructor
private TestStatements() {

}

public static String getTestSqlExample(String bootstrap, String registryUrl) {
// CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING ) " +
// "WITH ( 'connector' = 'filesystem', 'path' = '/opt/flink/data/productInventory.csv', " +
// "'format' = 'csv', 'csv.ignore-parse-errors' = 'true' )
StringBuilder builder = new StringBuilder();
builder.append("CREATE TABLE ProductInventoryTable ( product_id STRING, category STRING, stock STRING, rating STRING )");

Map<String, String> additionalProperties = new HashMap<>();
additionalProperties.put("path", "/opt/flink/data/productInventory.csv");
additionalProperties.put("format", "csv");
additionalProperties.put("csv.ignore-parse-errors", "true");

SqlWith sqlWith = new SqlWithBuilder()
.withSqlStatement(builder.toString())
.withConnector("filesystem")
.withAdditionalProperties(additionalProperties)
.build();

String part1 = sqlWith.generateSql();

// CREATE TABLE ClickStreamTable " +
// "( user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
// "WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
// "'topic' = 'flink.click.streams', 'properties.bootstrap.servers' = " +
// "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'click-stream-group', " +
// "'value.format' = 'avro-confluent', 'value.avro-confluent.url' = " +
// "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
// "'latest-offset' )
builder = new StringBuilder();
builder.append("CREATE TABLE ClickStreamTable (");
builder.append("user_id STRING, product_id STRING, `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', ");
builder.append("WATERMARK FOR event_time AS event_time - INTERVAL '1' SECOND )");

additionalProperties = new HashMap<>();
additionalProperties.put("properties.group.id", "click-stream-group");
additionalProperties.put("value.format", "avro-confluent");
additionalProperties.put("value.avro-confluent.url", registryUrl);
additionalProperties.put("scan.startup.mode", "latest-offset");

sqlWith = new SqlWithBuilder()
.withSqlStatement(builder.toString())
.withConnector("kafka")
.withTopic("flink.click.streams")
.withBootstrapServer(bootstrap)
.withAdditionalProperties(additionalProperties)
.build();

String part2 = sqlWith.generateSql();

// CREATE TABLE SalesRecordTable ( invoice_id STRING, user_id STRING, product_id STRING, " +
// "quantity STRING, unit_cost STRING, `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', " +
// "WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND ) WITH ( 'connector' = 'kafka', " +
// "'topic' = 'flink.sales.records', 'properties.bootstrap.servers' = " +
// "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.group.id' = 'sales-record-group', " +
// "'value.format' = 'avro-confluent', 'value.avro-confluent.url' = " +
// "'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', 'scan.startup.mode' = " +
// "'latest-offset' )
builder = new StringBuilder();
builder.append("CREATE TABLE SalesRecordTable (");
builder.append("invoice_id STRING, user_id STRING, product_id STRING, quantity STRING, unit_cost STRING, ");
builder.append("`purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', ");
builder.append("WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND )");

additionalProperties = new HashMap<>();
additionalProperties.put("properties.group.id", "sales-record-group");
additionalProperties.put("value.format", "avro-confluent");
additionalProperties.put("value.avro-confluent.url", registryUrl);
additionalProperties.put("scan.startup.mode", "latest-offset");

sqlWith = new SqlWithBuilder()
.withSqlStatement(builder.toString())
.withConnector("kafka")
.withTopic("flink.sales.records")
.withBootstrapServer(bootstrap)
.withAdditionalProperties(additionalProperties)
.build();

String part3 = sqlWith.generateSql();

// CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, " +
// "`event_time` TIMESTAMP(3), PRIMARY KEY(`user_id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', " +
// "'topic' = 'flink.recommended.products', 'properties.bootstrap.servers' = " +
// "'my-cluster-kafka-bootstrap.flink.svc:9092', 'properties.client.id' = " +
// "'recommended-products-producer-client', 'properties.transaction.timeout.ms' = '800000', " +
// "'key.format' = 'csv', 'value.format' = 'csv', 'value.fields-include' = 'ALL' )
builder = new StringBuilder();
builder.append("CREATE TABLE CsvSinkTable ( user_id STRING, top_product_ids STRING, `event_time` TIMESTAMP(3), ");
builder.append("PRIMARY KEY(`user_id`) NOT ENFORCED )");

additionalProperties = new HashMap<>();
additionalProperties.put("properties.client.id", "recommended-products-producer-client");
additionalProperties.put("properties.transaction.timeout.ms", "800000");
additionalProperties.put("key.format", "csv");
additionalProperties.put("value.format", "csv");
additionalProperties.put("value.fields-include", "ALL");

sqlWith = new SqlWithBuilder()
.withSqlStatement(builder.toString())
.withConnector("upsert-kafka")
.withBootstrapServer(bootstrap)
.withTopic("flink.recommended.products")
.withAdditionalProperties(additionalProperties)
.build();

String part4 = sqlWith.generateSql();

// CREATE TEMPORARY VIEW clicked_products AS SELECT DISTINCT c.user_id, c.event_time, p.product_id, p.category " +
// "FROM ClickStreamTable AS c JOIN ProductInventoryTable AS p ON c.product_id = p.product_id
builder = new StringBuilder();
builder.append("CREATE TEMPORARY VIEW clicked_products AS SELECT DISTINCT c.user_id, c.event_time, p.product_id, p.category ");
builder.append("FROM ClickStreamTable AS c JOIN ProductInventoryTable AS p ON c.product_id = p.product_id;");
String part5 = builder.toString();

// CREATE TEMPORARY VIEW category_products AS SELECT cp.user_id, cp.event_time, p.product_id, " +
// "p.category, p.stock, p.rating, sr.user_id as purchased FROM clicked_products cp JOIN " +
// "ProductInventoryTable AS p ON cp.category = p.category LEFT JOIN SalesRecordTable sr ON " +
// "cp.user_id = sr.user_id AND p.product_id = sr.product_id WHERE p.stock > 0 GROUP BY p.product_id, " +
// "p.category, p.stock, cp.user_id, cp.event_time, sr.user_id, p.rating
builder = new StringBuilder();
builder.append("CREATE TEMPORARY VIEW category_products AS SELECT cp.user_id, cp.event_time, p.product_id, ");
builder.append("p.category, p.stock, p.rating, sr.user_id as purchased ");
builder.append("FROM clicked_products cp ");
builder.append("JOIN ProductInventoryTable AS p ON cp.category = p.category ");
builder.append("LEFT JOIN SalesRecordTable sr ON cp.user_id = sr.user_id ");
builder.append("AND p.product_id = sr.product_id WHERE p.stock > 0 ");
builder.append("GROUP BY p.product_id, p.category, p.stock, cp.user_id, cp.event_time, sr.user_id, p.rating;");
String part6 = builder.toString();

// CREATE TEMPORARY VIEW " +
// "top_products AS SELECT cp.user_id, cp.event_time, cp.product_id, cp.category, cp.stock, cp.rating, " +
// "cp.purchased, ROW_NUMBER() OVER (PARTITION BY cp.user_id ORDER BY cp.purchased DESC, cp.rating DESC) " +
// "AS rn FROM category_products cp
builder = new StringBuilder();
builder.append("CREATE TEMPORARY VIEW top_products AS SELECT cp.user_id, cp.event_time, cp.product_id, cp.category, ");
builder.append("cp.stock, cp.rating, cp.purchased, ROW_NUMBER() OVER (PARTITION BY cp.user_id ");
builder.append("ORDER BY cp.purchased DESC, cp.rating DESC) AS rn ");
builder.append("FROM category_products cp;");
String part7 = builder.toString();

// INSERT INTO CsvSinkTable SELECT user_id, LISTAGG(product_id, ',') " +
// "AS top_product_ids, TUMBLE_END(event_time, INTERVAL '5' SECOND) FROM top_products WHERE rn <= 6 GROUP " +
// "BY user_id, TUMBLE(event_time, INTERVAL '5' SECOND);
builder = new StringBuilder();
builder.append("INSERT INTO CsvSinkTable SELECT user_id, LISTAGG(product_id, ',') AS top_product_ids, ");
builder.append("TUMBLE_END(event_time, INTERVAL '5' SECOND) FROM top_products WHERE rn <= 6 GROUP BY user_id, ");
builder.append("TUMBLE(event_time, INTERVAL '5' SECOND);");
String part8 = builder.toString();

return part1 + part2 + part3 + part4 + part5 + part6 + part7 + part8;
}
}
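To show how the new pieces fit together, a hedged sketch of a possible test call site (not part of this commit): it assumes the template class changed in the earlier hunk is named FlinkDeploymentTemplate, and the namespace, deployment name, bootstrap address, and registry URL below are illustrative placeholders (the last two mirror the values in the reference comments above).

import io.streams.operands.flink.templates.FlinkDeploymentTemplate; // assumed class name
import io.streams.sql.TestStatements;
import org.apache.flink.v1beta1.FlinkDeploymentBuilder;

import java.util.List;

public class SqlRunnerExample {
    public static void main(String[] args) {
        // Placeholder endpoints taken from the reference comments in TestStatements.
        String bootstrap = "my-cluster-kafka-bootstrap.flink.svc:9092";
        String registryUrl = "http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6";

        // getTestSqlExample returns one concatenated String of statements,
        // so it is wrapped in a single-element list for the List<String> parameter.
        FlinkDeploymentBuilder deployment = FlinkDeploymentTemplate.flinkExampleDeployment(
            "flink", "recommendation-app", // namespace and name are illustrative
            List.of(TestStatements.getTestSqlExample(bootstrap, registryUrl)));
    }
}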
1 change: 1 addition & 0 deletions src/main/resources/log4j2.properties
@@ -23,3 +23,4 @@ rootLogger.appenderRef.console.level = ${env:TEST_LOG_LEVEL:-INFO}
rootLogger.appenderRef.rolling.ref = RollingFile
rootLogger.appenderRef.rolling.level = DEBUG
rootLogger.additivity = false
