Skip to content

Commit

Permalink
Add test that uses s3 (Minio) as a state backend (#88)
Browse files Browse the repository at this point in the history
* Add test that uses s3 (Minio) as a state backend

Signed-off-by: Jakub Stejskal <[email protected]>

* Adjust config to make it more resilient

Signed-off-by: Jakub Stejskal <[email protected]>

* Some tries to fix TF

Signed-off-by: Jakub Stejskal <[email protected]>

* Debug v1

Signed-off-by: Jakub Stejskal <[email protected]>

* Use earliest-offset for Flink-kafka client

Signed-off-by: Jakub Stejskal <[email protected]>

* Revert formating

Signed-off-by: Jakub Stejskal <[email protected]>

* Rename SetupMinio to MinioInstaller

Signed-off-by: Jakub Stejskal <[email protected]>

---------

Signed-off-by: Jakub Stejskal <[email protected]>
  • Loading branch information
Frawless authored Nov 5, 2024
1 parent a607e02 commit 03921a6
Show file tree
Hide file tree
Showing 8 changed files with 605 additions and 8 deletions.
26 changes: 26 additions & 0 deletions docs/io/streams/e2e/flink/sql/SqlJobRunnerST.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,32 @@
* `flink` (description file doesn't exist)


## testS3StateBackend

**Description:** Test verifies that user can use S3 as state backend

**Steps:**

| Step | Action | Result |
| - | - | - |
| 1. | Create namespace, serviceaccount and roles for Flink | Resources created |
| 2. | Deploy Apicurio registry | Apicurio registry is up and running |
| 3. | Deploy Kafka my-cluster with scram-sha auth | Kafka is up and running |
| 4. | Create KafkaUser with scram-sha secret | KafkaUser created |
| 5. | Deploy strimzi-kafka-clients producer with payment data generator | Client job is created and data are sent to flink.payment.data topic |
| 6. | Deploy Minio for S3 service | Minio is up and running |
| 7. | Deploy FlinkDeployment with sql which gets data from flink.payment.data topic filter payment of type paypal and send data to flink.payment.paypal topic, for authentication is used secret created by KafkaUser and this secret is passed into by secret interpolation. Flink is configured to use S3 as a state backend | FlinkDeployment is up and tasks are deployed and it sends filtered data into flink.payment.paypal topic, task manager deployed by FlinkDeployment uses S3 |
| 8. | Deploy strimzi-kafka-clients consumer as job and consume messages fromkafka topic flink.payment.paypal | Consumer is deployed and it consumes messages |
| 9. | Verify that messages are present | Messages are present |
| 10. | Verify that taskmanager logs contains 'State backend loader loads the state backend as HashMapStateBackend' | Log message is present |
| 11. | Verify that Minio contains some data from Flink | Flink bucket is not empty |

**Labels:**

* `flink-sql-runner` (description file doesn't exist)
* `flink` (description file doesn't exist)


## testSimpleFilter

**Description:** Test verifies sql-runner.jar works integrated with kafka, apicurio and uses scram-sha for kafka authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public JobBuilder defaultProducerStrimzi() {
.endEnv()
.addNewEnv()
.withName("LOG_LEVEL")
.withValue("DEBUG")
.withValue("INFO")
.endEnv()
.addNewEnv()
.withName("MESSAGE_COUNT")
Expand Down Expand Up @@ -329,7 +329,7 @@ public JobBuilder defaultConsumerStrimzi() {
.endEnv()
.addNewEnv()
.withName("LOG_LEVEL")
.withValue("DEBUG")
.withValue("INFO")
.endEnv()
.addNewEnv()
.withName("MESSAGE_COUNT")
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/streams/constants/TestConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ public interface TestConstants {

String STRIMZI_TEST_CLIENTS_LABEL_KEY = "strimzi-test-clients";
String STRIMZI_TEST_CLIENTS_LABEL_VALUE = "true";

// Labels
String APP_POD_LABEL = "app";
String DEPLOYMENT_TYPE = "deployment-type";
}
142 changes: 142 additions & 0 deletions src/main/java/io/streams/operands/minio/MinioInstaller.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright streamshub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streams.operands.minio;

import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServiceBuilder;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.skodjob.testframe.resources.KubeResourceManager;
import io.streams.constants.TestConstants;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

public class MinioInstaller {
private static final Logger LOGGER = LogManager.getLogger(MinioInstaller.class);

public static final String MINIO = "minio";
public static final String ADMIN_CREDS = "minioadminLongerThan16BytesForFIPS";
public static final String MINIO_STORAGE_ALIAS = "local";
public static final int MINIO_PORT = 9000;
public static final int MINIO_CONSOLE_PORT = 9090;
private static final String MINIO_IMAGE = "quay.io/minio/minio:latest";

/**
* Deploy minio to a specific namespace, creates service for it and init client inside the Minio pod
* @param namespace where Minio will be installed to
*/
public static void deployMinio(String namespace) {
// Create a Minio deployment
Deployment minioDeployment = new DeploymentBuilder()
.withNewMetadata()
.withName(MINIO)
.withNamespace(namespace)
.withLabels(Map.of(TestConstants.DEPLOYMENT_TYPE, MINIO))
.endMetadata()
.withNewSpec()
.withReplicas(1)
.withNewSelector()
.withMatchLabels(Map.of(TestConstants.APP_POD_LABEL, MINIO))
.endSelector()
.withNewTemplate()
.withNewMetadata()
.withLabels(Map.of(TestConstants.APP_POD_LABEL, MINIO))
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName(MINIO)
.withImage(MINIO_IMAGE)
.withArgs("server", "/data", "--console-address", ":" + MINIO_CONSOLE_PORT)
.addToEnv(new EnvVar("MINIO_ROOT_USER", ADMIN_CREDS, null))
.addToEnv(new EnvVar("MINIO_ROOT_PASSWORD", ADMIN_CREDS, null))
.addNewPort()
.withContainerPort(MINIO_PORT)
.endPort()
.withVolumeMounts(new VolumeMountBuilder()
.withName("minio-storage")
.withMountPath("/data")
.build())
.endContainer()
.withVolumes(new VolumeBuilder()
.withName("minio-storage")
.withNewEmptyDir()
.endEmptyDir()
.build())
.endSpec()
.endTemplate()
.endSpec()
.build();

// Create the deployment
KubeResourceManager.getInstance().createResourceWithWait(minioDeployment);

// Create a service to expose Minio
Service minioService = new ServiceBuilder()
.withNewMetadata()
.withName(MINIO)
.withNamespace(namespace)
.endMetadata()
.withNewSpec()
.withSelector(Map.of(TestConstants.APP_POD_LABEL, MINIO))
.addNewPort()
.withName("api")
.withPort(MINIO_PORT)
.withTargetPort(new IntOrString(MINIO_PORT))
.endPort()
.addNewPort()
.withName("console")
.withPort(MINIO_CONSOLE_PORT)
.withTargetPort(new IntOrString(MINIO_CONSOLE_PORT))
.endPort()
.endSpec()
.build();

KubeResourceManager.getInstance().createResourceWithoutWait(minioService);
// NetworkPolicyResource.allowNetworkPolicyAllIngressForMatchingLabel(namespace, MINIO, Map.of(TestConstants.APP_POD_LABEL, MINIO));

initMinioClient(namespace);
}

/**
* Init client inside the Minio pod. This allows other commands to be executed during the tests.
* @param namespace where Minio is installed
*/
private static void initMinioClient(String namespace) {
final LabelSelector labelSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(TestConstants.APP_POD_LABEL, MINIO)).build();
final String minioPod = KubeResourceManager.getKubeClient().listPods(namespace, labelSelector).get(0).getMetadata().getName();

KubeResourceManager.getKubeCmdClient().inNamespace(namespace).execInPod(minioPod,
"mc",
"config",
"host",
"add",
MINIO_STORAGE_ALIAS,
"http://localhost:" + MINIO_PORT,
ADMIN_CREDS, ADMIN_CREDS);
}

/**
* Create bucket in Minio instance in specific namespace.
* @param namespace Minio location
* @param bucketName name of the bucket that will be created and used within the tests
*/
public static void createBucket(String namespace, String bucketName) {
final LabelSelector labelSelector = new LabelSelectorBuilder().withMatchLabels(Map.of(TestConstants.APP_POD_LABEL, MINIO)).build();
final String minioPod = KubeResourceManager.getKubeClient().listPods(namespace, labelSelector).get(0).getMetadata().getName();

KubeResourceManager.getKubeCmdClient().inNamespace(namespace).execInPod(minioPod,
"mc",
"mb",
MINIO_STORAGE_ALIAS + "/" + bucketName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static KafkaNodePoolBuilder defaultKafkaNodePoolPvc(String namespace, Str
.withReplicas(replicas)
.addAllToRoles(roles)
.withNewPersistentClaimStorage()
.withSize("1Gi")
.withSize("5Gi")
.withDeleteClaim(true)
.endPersistentClaimStorage()
.endSpec();
Expand All @@ -46,7 +46,7 @@ public static KafkaNodePoolBuilder defaultKafkaNodePoolJbod(String namespace, St
.addAllToRoles(roles)
.withStorage(
new JbodStorageBuilder().addToVolumes(
new PersistentClaimStorageBuilder().withId(0).withSize("1Gi").withDeleteClaim(true)
new PersistentClaimStorageBuilder().withId(0).withSize("5Gi").withDeleteClaim(true)
.withKraftMetadata(KRaftMetadataStorage.SHARED).build())
.build())
.endSpec();
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/io/streams/sql/TestStatements.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public static String getTestFlinkFilter(String bootstrap, String registryUrl, St
additionalProperties.put("properties.group.id", "flink-filter-group");
additionalProperties.put("value.format", "avro-confluent");
additionalProperties.put("value.avro-confluent.url", registryUrl);
additionalProperties.put("scan.startup.mode", "latest-offset");
// Startup mode for Kafka consumer, we set earliest to catch even previously sent messages
additionalProperties.put("scan.startup.mode", "earliest-offset");
additionalProperties.put("properties.security.protocol", "SASL_PLAINTEXT");
additionalProperties.put("properties.sasl.mechanism", "SCRAM-SHA-512");
additionalProperties.put("properties.sasl.jaas.config",
Expand Down Expand Up @@ -239,7 +240,8 @@ public static String getWrongConnectionSql() {
additionalProperties.put("properties.group.id", "flink-filter-group");
additionalProperties.put("value.format", "avro-confluent");
additionalProperties.put("value.avro-confluent.url", "not-exists-sr.cluster.local:5001");
additionalProperties.put("scan.startup.mode", "latest-offset");
// Startup mode for Kafka consumer, we set earliest to catch even previously sent messages
additionalProperties.put("scan.startup.mode", "earliest-offset");

SqlWith sqlWith = new SqlWithBuilder()
.withSqlStatement(builder.toString())
Expand Down
147 changes: 147 additions & 0 deletions src/main/java/io/streams/utils/MinioUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright streamshub authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.streams.utils;

import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.skodjob.testframe.TestFrameConstants;
import io.skodjob.testframe.resources.KubeResourceManager;
import io.skodjob.testframe.wait.Wait;
import io.streams.constants.TestConstants;
import io.streams.operands.minio.MinioInstaller;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MinioUtils {
private static final Logger LOGGER = LogManager.getLogger(MinioUtils.class);

private MinioUtils() {

}

/**
* Collect data from Minio about usage of a specific bucket
*
* @param namespace Name of the Namespace where the Minio Pod is running
* @param bucketName Name of the bucket for which we want to get info about its size
* @return Overall statistics about the bucket in String format
*/
public static String getBucketSizeInfo(String namespace, String bucketName) {
final LabelSelector labelSelector = new LabelSelectorBuilder()
.withMatchLabels(Map.of(TestConstants.APP_POD_LABEL, MinioInstaller.MINIO))
.build();
final String minioPod = KubeResourceManager.getKubeClient()
.listPods(namespace, labelSelector)
.get(0)
.getMetadata()
.getName();

return KubeResourceManager.getKubeCmdClient()
.inNamespace(namespace)
.execInPod(minioPod,
"mc",
"stat",
"local/" + bucketName)
.out();
}

/**
* Parse out total size of bucket from the information about usage.
*
* @param bucketInfo String containing all stat info about bucket
* @return Map consists of parsed size and it's unit
*/
private static Map<String, Object> parseTotalSize(String bucketInfo) {
Pattern pattern = Pattern.compile("Total size:\\s*(?<size>[\\d.]+)\\s*(?<unit>.*)");
Matcher matcher = pattern.matcher(bucketInfo);

if (matcher.find()) {
return Map.of("size", Double.parseDouble(matcher.group("size")), "unit", matcher.group("unit"));
} else {
throw new IllegalArgumentException("Total size not found in the provided string");
}
}

/**
* Parse out total size of bucket from the information about usage.
*
* @param bucketInfo String containing all stat info about bucket
* @return Object counts in the bucket
*/
private static int parseObjectCount(String bucketInfo) {
Pattern pattern = Pattern.compile("Objects count:\\s*(?<count>[\\d.]+)");
Matcher matcher = pattern.matcher(bucketInfo);

if (matcher.find()) {
return Integer.parseInt(matcher.group("count"));
} else {
throw new IllegalArgumentException("Objects count not found in the provided string");
}
}

/**
* Wait until size of the bucket is not 0 B.
*
* @param namespace Minio location
* @param bucketName bucket name
*/
public static void waitForDataInMinio(String namespace, String bucketName) {
Wait.until("data sync from Kafka to Minio",
TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
TestFrameConstants.GLOBAL_TIMEOUT,
() -> {
String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
Map<String, Object> parsedSize = parseTotalSize(bucketSizeInfo);
double bucketSize = (Double) parsedSize.get("size");
LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit"));
LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);

return bucketSize > 0;
});
}

/**
* Wait until size of the bucket is not 0 B.
*
* @param namespace Minio location
* @param bucketName bucket name
*/
public static void waitForObjectsInMinio(String namespace, String bucketName) {
Wait.until("data sync from Kafka to Minio",
TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM,
TestFrameConstants.GLOBAL_TIMEOUT,
() -> {
String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
int objectCount = parseObjectCount(bucketSizeInfo);
LOGGER.info("Collected object count: {}", objectCount);
LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);

return objectCount > 0;
});
}


/**
* Wait until bucket is empty.
*
* @param namespace Minio location
* @param bucketName bucket name
*/
public static void waitForNoDataInMinio(String namespace, String bucketName) {
Wait.until("data deletion in Minio", TestFrameConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestFrameConstants.GLOBAL_TIMEOUT, () -> {
String bucketSizeInfo = getBucketSizeInfo(namespace, bucketName);
Map<String, Object> parsedSize = parseTotalSize(bucketSizeInfo);
double bucketSize = (Double) parsedSize.get("size");
LOGGER.info("Collected bucket size: {} {}", bucketSize, parsedSize.get("unit"));
LOGGER.debug("Collected bucket info:\n{}", bucketSizeInfo);

return bucketSize == 0;
});
}
}
Loading

0 comments on commit 03921a6

Please sign in to comment.