Add test for using FRocksDB as a state backend (#71)
Signed-off-by: David Kornel <[email protected]>
kornys authored Oct 15, 2024
1 parent e93c3b8 commit 69d6dd1
Showing 6 changed files with 267 additions and 14 deletions.
25 changes: 25 additions & 0 deletions docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
@@ -37,6 +37,31 @@
* `flink` (description file doesn't exist)


## testFRocksDbStateBackend

**Description:** Test verifies that a user can use FRocksDB as a state backend

**Steps:**

| Step | Action | Result |
| - | - | - |
| 1. | Create namespace, serviceaccount and roles for Flink | Resources created |
| 2. | Deploy Apicurio registry | Apicurio registry is up and running |
| 3. | Deploy Kafka my-cluster with scram-sha auth | Kafka is up and running |
| 4. | Create KafkaUser with scram-sha secret | KafkaUser created |
| 5. | Deploy strimzi-kafka-clients producer with payment data generator | Client job is created and data are sent to flink.payment.data topic |
| 6. | Create PVC for FlinkDeployment for FRocksDB | PVC is created |
| 7. | Deploy FlinkDeployment with SQL which reads data from the flink.payment.data topic, filters payments of type paypal and sends them to the flink.payment.paypal topic; the secret created by the KafkaUser is used for authentication and is passed in via secret interpolation. Flink is configured to use FRocksDB as a state backend (see the configuration sketch after this table) | FlinkDeployment is up, its tasks are deployed and it sends filtered data into the flink.payment.paypal topic; the task manager deployed by the FlinkDeployment uses FRocksDB |
| 8. | Deploy strimzi-kafka-clients consumer as a job and consume messages from the Kafka topic flink.payment.paypal | Consumer is deployed and it consumes messages |
| 9. | Verify that messages are present | Messages are present |
| 10. | Verify that the task manager logs contain 'State backend loader loads the state backend as EmbeddedRocksDBStateBackend' | Log message is present |
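
A minimal sketch of the equivalent programmatic Flink configuration (the test itself sets the state backend through the FlinkDeployment custom resource; the config keys, checkpoint path and job body below are illustrative assumptions based on upstream Flink APIs, not taken from this commit):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbStateBackendSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Declarative equivalent: state.backend=rocksdb (newer Flink releases use
        // the key state.backend.type); the checkpoint dir stands in for the PVC
        // created in step 6.
        config.setString("state.backend", "rocksdb");
        config.setString("state.checkpoints.dir", "file:///opt/flink/state/checkpoints");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // With this backend the task manager log reports:
        // "State backend loader loads the state backend as EmbeddedRocksDBStateBackend"
        env.setStateBackend(new EmbeddedRocksDBStateBackend());
        env.enableCheckpointing(10_000L);

        // Stand-in for the SQL job that filters paypal payments (step 7).
        env.fromElements("paypal", "card", "paypal")
                .filter("paypal"::equals)
                .print();
        env.execute("frocksdb-state-backend-sketch");
    }
}
```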

**Labels:**

* `flink-sql-runner` (description file doesn't exist)
* `flink` (description file doesn't exist)


## testSimpleFilter

**Description:** Test verifies that sql-runner.jar works integrated with Kafka and Apicurio and uses scram-sha for Kafka authentication

@@ -32,16 +32,16 @@ public static ApicurioRegistryBuilder defaultApicurioRegistry(String name, Strin
.endSpec();
}

-public static KafkaTopic apicurioKsqlTopic(String namespace, String kafkaClusterName) {
+public static KafkaTopic apicurioKsqlTopic(String namespace, String kafkaClusterName, int replicas) {
return new KafkaTopicBuilder()
.withNewMetadata()
.withNamespace(namespace)
.withName("kafkasql-journal")
.addToLabels(ResourceLabels.STRIMZI_CLUSTER_LABEL, kafkaClusterName)
.endMetadata()
.withNewSpec()
-.withPartitions(1)
-.withReplicas(1)
+.withPartitions(replicas)
+.withReplicas(replicas)
.withConfig(Map.of(
"cleanup.policy", "compact"
))
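
For context, a hypothetical standalone usage of the updated template with an explicit replica count; the import paths and resource names are assumptions for illustration, only the new three-argument signature comes from this change:

```java
import io.strimzi.api.kafka.model.topic.KafkaTopic; // assumed Strimzi API package
// ApicurioRegistryTemplate is assumed to be imported from this repository's templates package.

public class ApicurioKsqlTopicSketch {
    public static void main(String[] args) {
        // The single replicas argument drives both withPartitions(...) and
        // withReplicas(...), so this builds the kafkasql-journal topic with
        // 3 partitions and replication factor 3.
        KafkaTopic journalTopic =
                ApicurioRegistryTemplate.apicurioKsqlTopic("flink-tests", "my-cluster", 3);
        System.out.println(journalTopic.getMetadata().getName()); // kafkasql-journal
    }
}
```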

@@ -92,7 +92,7 @@ private static boolean isReady() {
.deployments().inNamespace(OPERATOR_NS).withName(WEBHOOK_DEPLOYMENT_NAME).isReady() &&
KubeResourceManager.getKubeClient().getClient().apps()
.deployments().inNamespace(OPERATOR_NS).withName(CA_INJECTION_DEPLOYMENT_NAME).isReady()) {
-LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(60));
+LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(120));
LOGGER.info("Cert-manager {}/{} is ready", OPERATOR_NS, DEPLOYMENT_NAME);
return true;
} else {

@@ -107,7 +107,7 @@ private static boolean isOperatorReady(String ns) {
try {
PodUtils.waitForPodsReadyWithRestart(ns, new LabelSelectorBuilder()
.withMatchLabels(Map.of("app.kubernetes.io/instance", "cert-manager")).build(), 3, true);
-LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(60));
+LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(120));
LOGGER.info("Cert-manager operator in namespace {} is ready", ns);
return true;
} catch (Exception ex) {
2 changes: 1 addition & 1 deletion src/test/java/io/streams/e2e/flink/sql/SqlExampleST.java
@@ -127,7 +127,7 @@ void testRecommendationApp() throws IOException {

// Create topic for ksql apicurio
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
-ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, "my-cluster"));
+ApicurioRegistryTemplate.apicurioKsqlTopic(namespace, "my-cluster", 1));

String bootstrapServer = KafkaType.kafkaClient().inNamespace(namespace).withName("my-cluster").get()
.getStatus().getListeners().stream().filter(l -> l.getName().equals("plain"))