Skip to content

Commit

Permalink
Add test for bad sql statement (#53)
Browse files Browse the repository at this point in the history
Signed-off-by: David Kornel <[email protected]>
  • Loading branch information
kornys authored Oct 1, 2024
1 parent 2547421 commit eab352d
Showing 1 changed file with 38 additions and 2 deletions.
40 changes: 38 additions & 2 deletions src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import io.fabric8.kubernetes.api.model.NamespaceBuilder;
import io.skodjob.testframe.TestFrameConstants;
import io.skodjob.testframe.resources.KubeResourceManager;
import io.skodjob.testframe.wait.Wait;
import io.streams.clients.kafka.StrimziKafkaClients;
import io.streams.clients.kafka.StrimziKafkaClientsBuilder;
import io.streams.e2e.Abstract;
import io.streams.operands.apicurio.templates.ApicurioRegistryTemplate;
import io.streams.operands.flink.resoruces.FlinkDeploymentType;
import io.streams.operands.flink.templates.FlinkDeploymentTemplate;
import io.streams.operands.flink.templates.FlinkRBAC;
import io.streams.operands.strimzi.resources.KafkaType;
Expand Down Expand Up @@ -45,8 +47,6 @@
@Tag(FLINK_SQL_RUNNER)
public class SqlJobRunnerST extends Abstract {

String namespace = "flink-filter";

@BeforeAll
void prepareOperators() throws IOException {
CompletableFuture.allOf(
Expand All @@ -61,6 +61,7 @@ void prepareOperators() throws IOException {
@Test
@Tag(SMOKE)
void testFlinkSqlRunnerSimpleFilter() {
String namespace = "flink-filter";
// Create namespace
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build());
Expand Down Expand Up @@ -141,4 +142,39 @@ void testFlinkSqlRunnerSimpleFilter() {
assertTrue(log.contains("\"type\":\"paypal\""));
assertFalse(log.contains("\"type\":\"creditCard\""));
}

@Test
@Tag(SMOKE)
void testBadSqlStatement() {
String namespace = "flink-bad-sql";
String flinkDeploymentName = namespace;

// Create namespace
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build());

// Add flink RBAC
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
FlinkRBAC.getFlinkRbacResources(namespace).toArray(new HasMetadata[0]));

// Deploy flink with not valid sql
FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace,
flinkDeploymentName, List.of("blah blah"))
.build();
KubeResourceManager.getInstance().createOrUpdateResourceWithoutWait(flink);

// Check if no task is deployed and error is proper in flink deployment
Wait.until("Flink deployment fail", TestFrameConstants.GLOBAL_POLL_INTERVAL_1_SEC,
TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM, () ->
new FlinkDeploymentType().getClient().inNamespace(namespace).withName(flinkDeploymentName)
.get().getStatus().getError().contains("DeploymentFailedException"));

String podName = KubeResourceManager.getKubeClient().listPodsByPrefixInName(namespace, flinkDeploymentName)
.get(0).getMetadata().getName();

Wait.until("Flink deployment contains error message", TestFrameConstants.GLOBAL_POLL_INTERVAL_1_SEC,
TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM, () ->
KubeResourceManager.getKubeClient()
.getLogsFromPod(namespace, podName).contains("SQL parse failed"));
}
}

0 comments on commit eab352d

Please sign in to comment.