diff --git a/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java b/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
index ea4af4e..f66f37b 100644
--- a/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
+++ b/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
@@ -9,10 +9,12 @@
 import io.fabric8.kubernetes.api.model.NamespaceBuilder;
 import io.skodjob.testframe.TestFrameConstants;
 import io.skodjob.testframe.resources.KubeResourceManager;
+import io.skodjob.testframe.wait.Wait;
 import io.streams.clients.kafka.StrimziKafkaClients;
 import io.streams.clients.kafka.StrimziKafkaClientsBuilder;
 import io.streams.e2e.Abstract;
 import io.streams.operands.apicurio.templates.ApicurioRegistryTemplate;
+import io.streams.operands.flink.resoruces.FlinkDeploymentType;
 import io.streams.operands.flink.templates.FlinkDeploymentTemplate;
 import io.streams.operands.flink.templates.FlinkRBAC;
 import io.streams.operands.strimzi.resources.KafkaType;
@@ -45,8 +47,6 @@
 @Tag(FLINK_SQL_RUNNER)
 public class SqlJobRunnerST extends Abstract {
 
-    String namespace = "flink-filter";
-
     @BeforeAll
     void prepareOperators() throws IOException {
         CompletableFuture.allOf(
@@ -61,6 +61,7 @@ void prepareOperators() throws IOException {
     @Test
     @Tag(SMOKE)
     void testFlinkSqlRunnerSimpleFilter() {
+        String namespace = "flink-filter";
         // Create namespace
         KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
             new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build());
@@ -141,4 +142,39 @@ void testFlinkSqlRunnerSimpleFilter() {
         assertTrue(log.contains("\"type\":\"paypal\""));
         assertFalse(log.contains("\"type\":\"creditCard\""));
     }
+
+    @Test
+    @Tag(SMOKE)
+    void testBadSqlStatement() {
+        String namespace = "flink-bad-sql";
+        String flinkDeploymentName = namespace;
+
+        // Create namespace
+        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
+            new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build());
+
+        // Add flink RBAC
+        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
+            FlinkRBAC.getFlinkRbacResources(namespace).toArray(new HasMetadata[0]));
+
+        // Deploy flink with invalid sql
+        FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace,
+            flinkDeploymentName, List.of("blah blah"))
+            .build();
+        KubeResourceManager.getInstance().createOrUpdateResourceWithoutWait(flink);
+
+        // Check that no task is deployed and that the flink deployment reports the expected error
+        Wait.until("Flink deployment fail", TestFrameConstants.GLOBAL_POLL_INTERVAL_1_SEC,
+            TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM, () ->
+                new FlinkDeploymentType().getClient().inNamespace(namespace).withName(flinkDeploymentName)
+                    .get().getStatus().getError().contains("DeploymentFailedException"));
+
+        String podName = KubeResourceManager.getKubeClient().listPodsByPrefixInName(namespace, flinkDeploymentName)
+            .get(0).getMetadata().getName();
+
+        Wait.until("Flink deployment contains error message", TestFrameConstants.GLOBAL_POLL_INTERVAL_1_SEC,
+            TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM, () ->
+                KubeResourceManager.getKubeClient()
+                    .getLogsFromPod(namespace, podName).contains("SQL parse failed"));
+    }
 }