diff --git a/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md b/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
index 709c56b..cc6e1d1 100644
--- a/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
+++ b/docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
@@ -37,6 +37,31 @@
 * `flink` (description file doesn't exist)
 
+## testFRocksDbStateBackend
+
+**Description:** Test verifies that a user can use FRocksDB as the state backend
+
+**Steps:**
+
+| Step | Action | Result |
+| - | - | - |
+| 1. | Create namespace, serviceaccount and roles for Flink | Resources created |
+| 2. | Deploy Apicurio registry | Apicurio registry is up and running |
+| 3. | Deploy Kafka my-cluster with scram-sha auth | Kafka is up and running |
+| 4. | Create KafkaUser with scram-sha secret | KafkaUser created |
+| 5. | Deploy strimzi-kafka-clients producer with payment data generator | Client job is created and data are sent to flink.payment.data topic |
+| 6. | Create PVC for FlinkDeployment for FRocksDB | PVC is created |
+| 7. | Deploy FlinkDeployment with SQL which reads data from the flink.payment.data topic, filters payments of type paypal and sends them to the flink.payment.paypal topic; the secret created by the KafkaUser is used for authentication and is passed in via secret interpolation. Flink is configured to use FRocksDB as the state backend | FlinkDeployment is up, its tasks are deployed and it sends the filtered data into the flink.payment.paypal topic; the task manager deployed by the FlinkDeployment uses FRocksDB |
+| 8. | Deploy strimzi-kafka-clients consumer as job and consume messages from kafka topic flink.payment.paypal | Consumer is deployed and it consumes messages |
+| 9. | Verify that messages are present | Messages are present |
+| 10. | Verify that the taskmanager log contains 'State backend loader loads the state backend as EmbeddedRocksDBStateBackend' | Log message is present |
+
+**Labels:**
+
+* `flink-sql-runner` (description file doesn't exist)
+* `flink` (description file doesn't exist)
+
+
 ## testSimpleFilter
 
 **Description:** Test verifies sql-runner.jar works integrated with kafka, apicurio and uses scram-sha for kafka authentication
diff --git a/src/main/java/io/streams/operands/apicurio/templates/ApicurioRegistryTemplate.java b/src/main/java/io/streams/operands/apicurio/templates/ApicurioRegistryTemplate.java
index 2b88fd6..ac8577f 100644
--- a/src/main/java/io/streams/operands/apicurio/templates/ApicurioRegistryTemplate.java
+++ b/src/main/java/io/streams/operands/apicurio/templates/ApicurioRegistryTemplate.java
@@ -32,7 +32,7 @@ public static ApicurioRegistryBuilder defaultApicurioRegistry(String name, Strin
             .endSpec();
     }
 
-    public static KafkaTopic apicurioKsqlTopic(String namespace, String kafkaClusterName) {
+    public static KafkaTopic apicurioKsqlTopic(String namespace, String kafkaClusterName, int replicas) {
         return new KafkaTopicBuilder()
             .withNewMetadata()
             .withNamespace(namespace)
@@ -40,8 +40,8 @@ public static KafkaTopic apicurioKsqlTopic(String namespace, String kafkaCluster
             .addToLabels(ResourceLabels.STRIMZI_CLUSTER_LABEL, kafkaClusterName)
             .endMetadata()
             .withNewSpec()
-            .withPartitions(1)
-            .withReplicas(1)
+            .withPartitions(replicas)
+            .withReplicas(replicas)
             .withConfig(Map.of(
                 "cleanup.policy", "compact"
             ))
diff --git a/src/main/java/io/streams/operators/manifests/CertManagerManifestInstaller.java b/src/main/java/io/streams/operators/manifests/CertManagerManifestInstaller.java
index 0e71e37..e5ac42b 100644
--- a/src/main/java/io/streams/operators/manifests/CertManagerManifestInstaller.java
+++ b/src/main/java/io/streams/operators/manifests/CertManagerManifestInstaller.java
@@ -92,7 +92,7 @@ private static boolean isReady() {
                 .deployments().inNamespace(OPERATOR_NS).withName(WEBHOOK_DEPLOYMENT_NAME).isReady()
             && KubeResourceManager.getKubeClient().getClient().apps()
                 .deployments().inNamespace(OPERATOR_NS).withName(CA_INJECTION_DEPLOYMENT_NAME).isReady()) {
-            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(60));
+            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(120));
             LOGGER.info("Cert-manager {}/{} is ready", OPERATOR_NS, DEPLOYMENT_NAME);
             return true;
         } else {
diff --git a/src/main/java/io/streams/operators/olm/catalog/CertManagerOlmCatalogInstaller.java b/src/main/java/io/streams/operators/olm/catalog/CertManagerOlmCatalogInstaller.java
index 64f3817..aad41d1 100644
--- a/src/main/java/io/streams/operators/olm/catalog/CertManagerOlmCatalogInstaller.java
+++ b/src/main/java/io/streams/operators/olm/catalog/CertManagerOlmCatalogInstaller.java
@@ -107,7 +107,7 @@ private static boolean isOperatorReady(String ns) {
         try {
             PodUtils.waitForPodsReadyWithRestart(ns, new LabelSelectorBuilder()
                 .withMatchLabels(Map.of("app.kubernetes.io/instance", "cert-manager")).build(), 3, true);
-            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(60));
+            LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(120));
             LOGGER.info("Cert-manager operator in namespace {} is ready", ns);
             return true;
         } catch (Exception ex) {
diff --git a/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java b/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java
index bbcdea9..105b3e5 100644
--- a/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java
+++ b/src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java
@@ -127,7 +127,7 @@ void testRecommendationApp() throws IOException {
 
         // Create topic for ksql apicurio
         KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-            ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, "my-cluster"));
+            ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, "my-cluster", 1));
 
         String bootstrapServer = KafkaType.kafkaClient().inNamespace(namespace).withName("my-cluster").get()
             .getStatus().getListeners().stream().filter(l -> l.getName().equals("plain"))
diff --git a/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java b/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
index 44010ff..2e29bcd 100644
--- a/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
+++ b/src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
@@ -7,6 +7,10 @@
 import io.apicurio.registry.serde.avro.AvroKafkaSerializer;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.NamespaceBuilder;
+import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
+import io.fabric8.kubernetes.api.model.PersistentVolumeClaimBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.Quantity;
 import io.skodjob.annotations.Desc;
 import io.skodjob.annotations.Label;
 import io.skodjob.annotations.Step;
@@ -42,7 +46,9 @@
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static io.streams.constants.TestTags.FLINK;
 import static io.streams.constants.TestTags.FLINK_SQL_RUNNER;
@@ -66,6 +72,7 @@
     }
 )
 public class SqlJobRunnerST extends Abstract {
+    final String kafkaClusterName = "my-cluster";
 
     @BeforeAll
     void prepareOperators() throws Exception {
@@ -114,10 +121,10 @@ void testSimpleFilter() {
 
         // Create kafka
         KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
             KafkaNodePoolTemplate.defaultKafkaNodePoolJbod(namespace, "dual-role",
-                3, "my-cluster", List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)).build());
+                3, kafkaClusterName, List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)).build());
 
         KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-            KafkaTemplate.defaultKafka(namespace, "my-cluster")
+            KafkaTemplate.defaultKafka(namespace, kafkaClusterName)
                 .editSpec()
                 .editKafka()
                 .withListeners(
@@ -141,20 +148,20 @@ void testSimpleFilter() {
 
         // Create topic for ksql apicurio
         KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-            ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, "my-cluster"));
+            ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, kafkaClusterName, 3));
 
         // Create kafka scram sha user
         KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-            KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, "my-cluster")
+            KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, kafkaClusterName)
                 .editSpec()
                 .withAuthentication(new KafkaUserScramSha512ClientAuthentication())
                 .endSpec()
                 .build());
 
-        String bootstrapServerAuth = KafkaType.kafkaClient().inNamespace(namespace).withName("my-cluster").get()
+        String bootstrapServerAuth = KafkaType.kafkaClient().inNamespace(namespace).withName(kafkaClusterName).get()
             .getStatus().getListeners().stream().filter(l -> l.getName().equals("plain"))
             .findFirst().get().getBootstrapServers();
-        String bootstrapServerUnsecure = KafkaType.kafkaClient().inNamespace(namespace).withName("my-cluster").get()
+        String bootstrapServerUnsecure = KafkaType.kafkaClient().inNamespace(namespace).withName(kafkaClusterName).get()
             .getStatus().getListeners().stream().filter(l -> l.getName().equals("unsecure"))
             .findFirst().get().getBootstrapServers();
 
@@ -181,7 +188,7 @@ void testSimpleFilter() {
             .withMessageTemplate("payment_fiat")
             .withAdditionalConfig(
                 StrimziClientUtils.getApicurioAdditionalProperties(AvroKafkaSerializer.class.getName(),
-                    "http://apicurio-registry-service.flink-filter.svc:8080/apis/registry/v2") + "\n"
+                    "http://apicurio-registry-service." + namespace + ".svc:8080/apis/registry/v2") + "\n"
                     + "sasl.mechanism=SCRAM-SHA-512\n"
                     + "security.protocol=SASL_PLAINTEXT\n"
                     + "sasl.jaas.config=" + saslJaasConfigDecrypted
@@ -192,7 +199,7 @@ void testSimpleFilter() {
             kafkaProducerClient.producerStrimzi()
         );
 
-        String registryUrl = "http://apicurio-registry-service.flink-filter.svc:8080/apis/ccompat/v6";
+        String registryUrl = "http://apicurio-registry-service." + namespace + ".svc:8080/apis/ccompat/v6";
+ namespace + ".svc:8080/apis/ccompat/v6"; // Deploy flink with test filter sql statement which filter to specific topic only payment type paypal FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace, @@ -328,4 +335,225 @@ void testWrongConnectionInfo() { .getLogsFromPod(namespace, podName) .contains("No resolvable bootstrap urls given in bootstrap.servers")); } + + @TestDoc( + description = @Desc("Test verifies that user can use FRocksDB as state backend"), + steps = { + @Step(value = "Create namespace, serviceaccount and roles for Flink", expected = "Resources created"), + @Step(value = "Deploy Apicurio registry", expected = "Apicurio registry is up and running"), + @Step(value = "Deploy Kafka my-cluster with scram-sha auth", expected = "Kafka is up and running"), + @Step(value = "Create KafkaUser with scram-sha secret", expected = "KafkaUser created"), + @Step(value = "Deploy strimzi-kafka-clients producer with payment data generator", + expected = "Client job is created and data are sent to flink.payment.data topic"), + @Step(value = "Create PVC for FlinkDeployment for FRocksDB", expected = "PVC is created"), + @Step(value = "Deploy FlinkDeployment with sql which gets data from flink.payment.data topic filter " + + "payment of type paypal and send data to flink.payment.paypal topic, for authentication is used " + + "secret created by KafkaUser and this secret is passed into by secret interpolation. Flink is " + + "configured to use FRocksDB as a state backend", + expected = "FlinkDeployment is up and tasks are deployed and it sends filtered " + + "data into flink.payment.paypal topic, task manager deployed by FlinkDeployment uses " + + "FRockDB"), + @Step(value = "Deploy strimzi-kafka-clients consumer as job and consume messages from" + + "kafka topic flink.payment.paypal", + expected = "Consumer is deployed and it consumes messages"), + @Step(value = "Verify that messages are present", expected = "Messages are present"), + @Step(value = "Verify that taskmanager logs contains 'State backend loader loads the state " + + "backend as EmbeddedRocksDBStateBackend'", expected = "Log message is present"), + }, + labels = { + @Label(value = FLINK_SQL_RUNNER), + @Label(value = FLINK), + } + ) + @Test + void testFRocksDbStateBackend() { + String namespace = "flink-state-backend"; + String kafkaUser = "test-user"; + // Create namespace + KubeResourceManager.getInstance().createOrUpdateResourceWithWait( + new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build()); + + // Add flink RBAC + KubeResourceManager.getInstance().createOrUpdateResourceWithWait( + FlinkRBAC.getFlinkRbacResources(namespace).toArray(new HasMetadata[0])); + + // Create kafka + KubeResourceManager.getInstance().createOrUpdateResourceWithWait( + KafkaNodePoolTemplate.defaultKafkaNodePoolJbod(namespace, "dual-role", + 3, kafkaClusterName, List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)).build()); + + KubeResourceManager.getInstance().createOrUpdateResourceWithWait( + KafkaTemplate.defaultKafka(namespace, kafkaClusterName) + .editSpec() + .editKafka() + .withListeners( + new GenericKafkaListenerBuilder() + .withName("plain") + .withTls(false) + .withType(KafkaListenerType.INTERNAL) + .withPort((9092)) + .withAuth(new KafkaListenerAuthenticationScramSha512()) + .build(), + new GenericKafkaListenerBuilder() + .withName("unsecure") + .withTls(false) + .withType(KafkaListenerType.INTERNAL) + .withPort((9094)) + .build() + ) + .endKafka() + .endSpec() + .build()); + + // Create 
+        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
+            ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, kafkaClusterName, 3));
+
+        // Create kafka scram sha user
+        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
+            KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, kafkaClusterName)
+                .editSpec()
+                .withAuthentication(new KafkaUserScramSha512ClientAuthentication())
+                .endSpec()
+                .build());
+
+        String bootstrapServerAuth = KafkaType.kafkaClient().inNamespace(namespace).withName(kafkaClusterName).get()
+            .getStatus().getListeners().stream().filter(l -> l.getName().equals("plain"))
+            .findFirst().get().getBootstrapServers();
+        String bootstrapServerUnsecure = KafkaType.kafkaClient().inNamespace(namespace).withName(kafkaClusterName).get()
+            .getStatus().getListeners().stream().filter(l -> l.getName().equals("unsecure"))
+            .findFirst().get().getBootstrapServers();
+
+        // Add apicurio
+        KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
+            ApicurioRegistryTemplate.defaultApicurioRegistry("apicurio-registry", namespace,
+                bootstrapServerUnsecure).build());
+
+        // Get user secret jaas configuration
+        final String saslJaasConfigEncrypted = KubeResourceManager.getKubeClient().getClient().secrets()
+            .inNamespace(namespace).withName(kafkaUser).get().getData().get("sasl.jaas.config");
+        final String saslJaasConfigDecrypted = TestUtils.decodeFromBase64(saslJaasConfigEncrypted);
+
+        // Run internal producer and produce data
+        String producerName = "kafka-producer";
+        StrimziKafkaClients kafkaProducerClient = new StrimziKafkaClientsBuilder()
+            .withProducerName(producerName)
+            .withNamespaceName(namespace)
+            .withTopicName("flink.payment.data")
+            .withBootstrapAddress(bootstrapServerAuth)
+            .withMessageCount(10000)
+            .withUsername(kafkaUser)
+            .withDelayMs(10)
+            .withMessageTemplate("payment_fiat")
+            .withAdditionalConfig(
+                StrimziClientUtils.getApicurioAdditionalProperties(AvroKafkaSerializer.class.getName(),
+                    "http://apicurio-registry-service." + namespace + ".svc:8080/apis/registry/v2") + "\n"
+                    + "sasl.mechanism=SCRAM-SHA-512\n"
+                    + "security.protocol=SASL_PLAINTEXT\n"
+                    + "sasl.jaas.config=" + saslJaasConfigDecrypted
+            )
+            .build();
+
+        KubeResourceManager.getInstance().createResourceWithWait(
+            kafkaProducerClient.producerStrimzi()
+        );
+
+        String registryUrl = "http://apicurio-registry-service." + namespace + ".svc:8080/apis/ccompat/v6";
+ namespace + ".svc:8080/apis/ccompat/v6"; + + // Create PVC for flink + PersistentVolumeClaim flinkPVC = new PersistentVolumeClaimBuilder() + .withNewMetadata() + .withName("flink-state-backend") + .withNamespace(namespace) + .endMetadata() + .withNewSpec() + .withAccessModes("ReadWriteOnce") + .withNewResources() + .withRequests(Collections.singletonMap( + "storage", + new Quantity("100Gi"))) + .endResources() + .endSpec() + .build(); + KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flinkPVC); + + // Deploy flink with test filter sql statement which filter to specific topic only payment type paypal + // Modify flink default deployment with state backend and pvc configuration + FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace, + "flink-state-backend", List.of(TestStatements.getTestFlinkFilter( + bootstrapServerAuth, registryUrl, kafkaUser, namespace))) + .editSpec() + .addToFlinkConfiguration( + Map.of( + "execution.checkpointing.interval", "60000", + "execution.checkpointing.snapshot-compression", "true", + "kubernetes.operator.job.restart.failed", "true", + "state.backend.rocksdb.compression.per.level_FLINK_JIRA", "SNAPPY_COMPRESSION", + "state.backend.type", "rocksdb", + "state.checkpoints.dir", "file:///flink-state-store/checkpoints", + "state.savepoints.dir", "file:///flink-state-store/savepoints" + ) + ) + .editPodTemplate() + .editOrNewSpec() + .editFirstContainer() + .addNewVolumeMount() + .withMountPath("/flink-state-store") + .withName("flink-state-store") + .endFlinkdeploymentspecVolumeMount() + .endFlinkdeploymentspecContainer() + .addNewVolume() + .withName("flink-state-store") + .withNewPersistentVolumeClaim() + .withClaimName(flinkPVC.getMetadata().getName()) + .endFlinkdeploymentspecPersistentVolumeClaim() + .endFlinkdeploymentspecVolume() + .endFlinkdeploymentspecSpec() + .endPodTemplate() + .endSpec() + .build(); + KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flink); + + JobUtils.waitForJobSuccess(namespace, kafkaProducerClient.getProducerName(), + TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM); + + //Check task manager log for presence rocksbd configuration + Wait.until("Task manager contains info about rocksdb", TestFrameConstants.GLOBAL_POLL_INTERVAL_LONG, + TestFrameConstants.GLOBAL_TIMEOUT, () -> { + List taskManagerPods = KubeResourceManager.getKubeClient() + .listPodsByPrefixInName(namespace, namespace + "-taskmanager"); + for (Pod p : taskManagerPods) { + return KubeResourceManager.getKubeClient().getLogsFromPod(namespace, p.getMetadata().getName()) + .contains("State backend loader loads the state backend as EmbeddedRocksDBStateBackend"); + } + return false; + }); + + // Run consumer and check if data are filtered + String consumerName = "kafka-consumer"; + StrimziKafkaClients kafkaConsumerClient = new StrimziKafkaClientsBuilder() + .withConsumerName(consumerName) + .withNamespaceName(namespace) + .withTopicName("flink.payment.paypal") + .withBootstrapAddress(bootstrapServerAuth) + .withMessageCount(10) + .withAdditionalConfig( + "sasl.mechanism=SCRAM-SHA-512\n" + + "security.protocol=SASL_PLAINTEXT\n" + + "sasl.jaas.config=" + saslJaasConfigDecrypted + ) + .withConsumerGroup("flink-filter-test-group").build(); + + KubeResourceManager.getInstance().createResourceWithWait( + kafkaConsumerClient.consumerStrimzi() + ); + + JobUtils.waitForJobSuccess(namespace, kafkaConsumerClient.getConsumerName(), + TestFrameConstants.GLOBAL_TIMEOUT_MEDIUM); + String consumerPodName = 
+            .get(0).getMetadata().getName();
+        String log = KubeResourceManager.getKubeClient().getLogsFromPod(namespace, consumerPodName);
+        assertTrue(log.contains("\"type\":\"paypal\""));
+        assertFalse(log.contains("\"type\":\"creditCard\""));
+    }
 }
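
For reviewers unfamiliar with the option keys passed to `addToFlinkConfiguration` above, the sketch below restates the same settings against Flink's plain `Configuration` API. This is a minimal illustration and not part of the change: the class and method names are hypothetical, the nonstandard `state.backend.rocksdb.compression.per.level_FLINK_JIRA` key is omitted, and the `file:///flink-state-store/...` paths assume the PVC mount used by the test.

```java
import org.apache.flink.configuration.Configuration;

// Hypothetical helper mirroring the flinkConfiguration map from the test above.
public class StateBackendConfigSketch {

    public static Configuration rocksDbStateBackendConfig() {
        Configuration conf = new Configuration();
        // Select the embedded RocksDB (FRocksDB) state backend; this is what makes
        // the task manager log "EmbeddedRocksDBStateBackend" on startup.
        conf.setString("state.backend.type", "rocksdb");
        // Checkpoint and savepoint locations on the PVC-backed volume mounted
        // at /flink-state-store in the pod template.
        conf.setString("state.checkpoints.dir", "file:///flink-state-store/checkpoints");
        conf.setString("state.savepoints.dir", "file:///flink-state-store/savepoints");
        // Checkpoint every 60 s and compress state snapshots.
        conf.setString("execution.checkpointing.interval", "60000");
        conf.setString("execution.checkpointing.snapshot-compression", "true");
        return conf;
    }
}
```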