Cover basic secret interpolation in flink sql #54

Merged: 4 commits, Oct 3, 2024
Changes from 3 commits
@@ -55,6 +55,13 @@ public static List<HasMetadata> getFlinkRbacResources(String namespace) {
"deployments/finalizers"
)
.withVerbs("*")
.build(),
new PolicyRuleBuilder()
.withApiGroups("")
.withResources(
"secrets"
)
.withVerbs("get", "list")
.build()
)
.build(),
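The new rule grants the Flink job's service account read-only access to Secrets in its namespace, which the SQL runner presumably needs to resolve {{secret:...}} placeholders at job startup. As a minimal standalone sketch, the same rule could be packaged in its own Role with the fabric8 builders already used above (the role name is illustrative):

import io.fabric8.kubernetes.api.model.rbac.PolicyRuleBuilder;
import io.fabric8.kubernetes.api.model.rbac.Role;
import io.fabric8.kubernetes.api.model.rbac.RoleBuilder;

public class SecretReaderRoleSketch {
    public static Role secretReaderRole(String namespace) {
        return new RoleBuilder()
            .withNewMetadata()
                .withName("flink-secret-reader") // illustrative name
                .withNamespace(namespace)
            .endMetadata()
            .addToRules(new PolicyRuleBuilder()
                .withApiGroups("")               // core API group owns Secrets
                .withResources("secrets")
                .withVerbs("get", "list")        // read-only: no watch, no modification
                .build())
            .build();
    }
}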
12 changes: 11 additions & 1 deletion src/main/java/io/streams/sql/TestStatements.java
@@ -164,7 +164,7 @@ public static String getTestSqlExample(String bootstrap, String registryUrl) {
return part1 + part2 + part3 + part4 + part5 + part6 + part7 + part8;
}

public static String getTestFlinkFilter(String bootstrap, String registryUrl) {
public static String getTestFlinkFilter(String bootstrap, String registryUrl, String kafkaUser, String namespace) {
StringBuilder builder = new StringBuilder();
builder.append("CREATE TABLE payment_fiat (paymentDetails ROW<transactionId STRING, type STRING, " +
"amount DOUBLE, currency STRING, `date` STRING, status STRING>, payer ROW<name STRING, payerType STRING, " +
@@ -178,6 +178,11 @@ public static String getTestFlinkFilter(String bootstrap, String registryUrl) {
additionalProperties.put("value.format", "avro-confluent");
additionalProperties.put("value.avro-confluent.url", registryUrl);
additionalProperties.put("scan.startup.mode", "latest-offset");
additionalProperties.put("properties.security.protocol", "SASL_PLAINTEXT");
additionalProperties.put("properties.sasl.mechanism", "SCRAM-SHA-512");
additionalProperties.put("properties.sasl.jaas.config",
"org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=" + kafkaUser + " password={{secret:" + namespace + "/" + kafkaUser + "/password}}%;");

SqlWith sqlWith = new SqlWithBuilder()
.withSqlStatement(builder.toString())
@@ -199,6 +204,11 @@ public static String getTestFlinkFilter(String bootstrap, String registryUrl) {
additionalProperties.put("key.fields", "transactionId");
additionalProperties.put("value.format", "json");
additionalProperties.put("value.fields-include", "ALL");
additionalProperties.put("properties.security.protocol", "SASL_PLAINTEXT");
additionalProperties.put("properties.sasl.mechanism", "SCRAM-SHA-512");
additionalProperties.put("properties.sasl.jaas.config",
"org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=" + kafkaUser + " password={{secret:" + namespace + "/" + kafkaUser + "/password}}%;");

sqlWith = new SqlWithBuilder()
.withSqlStatement(builder.toString())
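The {{secret:<namespace>/<secret-name>/<key>}} token is not valid Kafka configuration by itself; it is substituted with the referenced Secret key's value when the job is submitted, which is what the new Secrets RBAC rule above enables. A minimal sketch of the connector options as they render before substitution, with illustrative namespace and user names:

import java.util.HashMap;
import java.util.Map;

public class SecretInterpolationSketch {
    public static void main(String[] args) {
        // Sketch only: options as they appear in the generated SQL, before the
        // SQL runner resolves the {{secret:...}} placeholder. Namespace "flink-filter"
        // and user "test-user" are illustrative.
        Map<String, String> props = new HashMap<>();
        props.put("properties.security.protocol", "SASL_PLAINTEXT");
        props.put("properties.sasl.mechanism", "SCRAM-SHA-512");
        props.put("properties.sasl.jaas.config",
            "org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=test-user "
                + "password={{secret:flink-filter/test-user/password}};");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}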
48 changes: 48 additions & 0 deletions src/main/java/io/streams/utils/TestUtils.java
@@ -8,8 +8,11 @@
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtensionContext;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Base64;

public class TestUtils {
public static Path getLogPath(String folderName, ExtensionContext context) {
@@ -31,4 +34,49 @@ public static Path getLogPath(String folderName, String testClassName, String te
}
return path;
}

/**
* Decodes a byte[] from Base64.
*
* @param data String that should be decoded.
*
* @return Plain data in byte[].
*/
public static byte[] decodeBytesFromBase64(String data) {
return Base64.getDecoder().decode(data);
}

/**
* Decodes a byte[] from Base64.
*
* @param data byte[] that should be decoded.
*
* @return Plain data in byte[].
*/
public static byte[] decodeBytesFromBase64(byte[] data) {
return Base64.getDecoder().decode(data);
}

/**
* Decodes a String from Base64.
*
* @param data String that should be decoded.
*
* @return Plain data using US ASCII charset.
*/
public static String decodeFromBase64(String data) {
return decodeFromBase64(data, StandardCharsets.US_ASCII);
}

/**
* Decodes a String from Base64.
*
* @param data String that should be decoded.
* @param charset The charset used for the returned String.
*
* @return Plain data using specified charset.
*/
public static String decodeFromBase64(String data, Charset charset) {
return new String(decodeBytesFromBase64(data), charset);
}
}
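A quick usage sketch of the new helpers, with illustrative values (the round-trip mirrors how Kubernetes stores Secret data as Base64):

import io.streams.utils.TestUtils;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DecodeSketch {
    public static void main(String[] args) {
        // Encode a value the way a Secret stores it, then decode it back.
        String encoded = Base64.getEncoder()
            .encodeToString("my-password".getBytes(StandardCharsets.US_ASCII));

        String plain = TestUtils.decodeFromBase64(encoded);                         // "my-password", US-ASCII
        String utf8  = TestUtils.decodeFromBase64(encoded, StandardCharsets.UTF_8); // explicit charset
        byte[] raw   = TestUtils.decodeBytesFromBase64(encoded);                    // raw decoded bytes

        System.out.println(plain.equals(utf8) && raw.length == plain.length());    // true for ASCII input
    }
}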
54 changes: 50 additions & 4 deletions src/test/java/io/streams/e2e/flink/sql/SqlJobRunnerST.java
@@ -20,14 +20,20 @@
import io.streams.operands.strimzi.resources.KafkaType;
import io.streams.operands.strimzi.templates.KafkaNodePoolTemplate;
import io.streams.operands.strimzi.templates.KafkaTemplate;
import io.streams.operands.strimzi.templates.KafkaUserTemplate;
import io.streams.operators.manifests.ApicurioRegistryManifestInstaller;
import io.streams.operators.manifests.CertManagerManifestInstaller;
import io.streams.operators.manifests.FlinkManifestInstaller;
import io.streams.operators.manifests.StrimziManifestInstaller;
import io.streams.sql.TestStatements;
import io.streams.utils.StrimziClientUtils;
import io.streams.utils.TestUtils;
import io.streams.utils.kube.JobUtils;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerAuthenticationScramSha512;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.api.kafka.model.user.KafkaUserScramSha512ClientAuthentication;
import org.apache.flink.v1beta1.FlinkDeployment;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
@@ -62,6 +68,7 @@ void prepareOperators() throws IOException {
@Tag(SMOKE)
void testFlinkSqlRunnerSimpleFilter() {
String namespace = "flink-filter";
String kafkaUser = "test-user";
// Create namespace
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
new NamespaceBuilder().withNewMetadata().withName(namespace).endMetadata().build());
@@ -82,11 +89,39 @@ void testFlinkSqlRunnerSimpleFilter() {
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
KafkaTemplate.defaultKafka(namespace, "my-cluster")
.editSpec()
.withCruiseControl(null)
.withKafkaExporter(null)
.editKafka()
.withListeners(
new GenericKafkaListenerBuilder()
.withName("plain")
.withTls(false)
.withType(KafkaListenerType.INTERNAL)
.withPort(9092)
.withAuth(new KafkaListenerAuthenticationScramSha512())
.build(),
new GenericKafkaListenerBuilder()
.withName("tls")
.withTls(true)
.withType(KafkaListenerType.INTERNAL)
.withPort(9093)
.build()
)
.endKafka()
.endSpec()
.build());

// Create Kafka SCRAM-SHA-512 user
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(
KafkaUserTemplate.defaultKafkaUser(namespace, kafkaUser, "my-cluster")
.editSpec()
.withAuthentication(new KafkaUserScramSha512ClientAuthentication())
.endSpec()
.build());

// Get the user's sasl.jaas.config value from its secret (Base64-encoded, not encrypted)
final String saslJaasConfigEncoded = KubeResourceManager.getKubeClient().getClient().secrets()
.inNamespace(namespace).withName(kafkaUser).get().getData().get("sasl.jaas.config");
final String saslJaasConfigDecoded = TestUtils.decodeFromBase64(saslJaasConfigEncoded);

// Run internal producer and produce data
String bootstrapServer = KafkaType.kafkaClient().inNamespace(namespace).withName("my-cluster").get()
.getStatus().getListeners().get(0).getBootstrapServers();
@@ -98,11 +133,15 @@ void testFlinkSqlRunnerSimpleFilter() {
.withTopicName("flink.payment.data")
.withBootstrapAddress(bootstrapServer)
.withMessageCount(10000)
.withUsername(kafkaUser)
.withDelayMs(10)
.withMessageTemplate("payment_fiat")
.withAdditionalConfig(
StrimziClientUtils.getApicurioAdditionalProperties(AvroKafkaSerializer.class.getName(),
"http://apicurio-registry-service.flink-filter.svc:8080/apis/registry/v2")
"http://apicurio-registry-service.flink-filter.svc:8080/apis/registry/v2") + "\n"
+ "sasl.mechanism=SCRAM-SHA-512\n"
+ "security.protocol=SASL_PLAINTEXT\n"
+ "sasl.jaas.config=" + saslJaasConfigDecrypted
)
.build();

@@ -112,8 +151,10 @@

String registryUrl = "http://apicurio-registry-service.flink-filter.svc:8080/apis/ccompat/v6";

// Deploy Flink with the test filter SQL statement, which routes only payments of type paypal to a dedicated topic
FlinkDeployment flink = FlinkDeploymentTemplate.defaultFlinkDeployment(namespace,
"flink-filter", List.of(TestStatements.getTestFlinkFilter(bootstrapServer, registryUrl)))
"flink-filter", List.of(TestStatements.getTestFlinkFilter(
bootstrapServer, registryUrl, kafkaUser, namespace)))
.build();
KubeResourceManager.getInstance().createOrUpdateResourceWithWait(flink);

Expand All @@ -128,6 +169,11 @@ void testFlinkSqlRunnerSimpleFilter() {
.withTopicName("flink.payment.paypal")
.withBootstrapAddress(bootstrapServer)
.withMessageCount(10)
.withAdditionalConfig(
"sasl.mechanism=SCRAM-SHA-512\n" +
"security.protocol=SASL_PLAINTEXT\n" +
"sasl.jaas.config=" + saslJaasConfigDecrypted
)
.withConsumerGroup("flink-filter-test-group").build();

KubeResourceManager.getInstance().createResourceWithWait(
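For reference, the sasl.jaas.config entry Strimzi typically stores in a SCRAM-SHA-512 KafkaUser secret is a single line of the form below (username and password illustrative), which is why the decoded value can be passed straight through to the client properties:

org.apache.kafka.common.security.scram.ScramLoginModule required username="test-user" password="changeit";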