Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1644] FlinkEngine configures Kafka producer pointing to the l… #438

Merged
merged 2 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions java/flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
<properties>
<flink.version>1.17.1.0</flink.version>
<fasterxml.version>2.13.4.2</fasterxml.version>
<bouncycastle.version>1.79</bouncycastle.version>
<guava.version>14.0.1</guava.version>
<httpclient.version>4.5.6</httpclient.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -88,5 +91,24 @@
<version>${fasterxml.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
<scope>test</scope>
<version>${bouncycastle.version}</version>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.logicalclocks.hsfs.flink.engine;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.logicalclocks.hsfs.FeatureGroupBase;
import com.logicalclocks.hsfs.FeatureStoreException;
Expand All @@ -25,6 +26,8 @@
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;

import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.twitter.chill.Base64;
import lombok.Getter;

import org.apache.avro.generic.GenericRecord;
Expand All @@ -36,16 +39,24 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.FileUtils;
import org.apache.kafka.common.config.SslConfigs;

import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class FlinkEngine extends EngineBase {
private static FlinkEngine INSTANCE = null;

public static synchronized FlinkEngine getInstance() throws FeatureStoreException {
public static synchronized FlinkEngine getInstance() {
if (INSTANCE == null) {
INSTANCE = new FlinkEngine();
}
Expand All @@ -55,38 +66,38 @@ public static synchronized FlinkEngine getInstance() throws FeatureStoreExceptio
@Getter
private StreamExecutionEnvironment streamExecutionEnvironment;

private FlinkEngine() throws FeatureStoreException {
private FlinkEngine() {
streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure the streamExecutionEnvironment
streamExecutionEnvironment.getConfig().enableObjectReuse();
}

public DataStreamSink<?> writeDataStream(StreamFeatureGroup streamFeatureGroup, DataStream<?> dataStream,
Map<String, String> writeOptions) throws FeatureStoreException, IOException {
Map<String, String> writeOptions) throws FeatureStoreException, IOException {

DataStream<Object> genericDataStream = (DataStream<Object>) dataStream;
Properties properties = new Properties();
properties.putAll(getKafkaConfig(streamFeatureGroup, writeOptions));

KafkaSink<GenericRecord> sink = KafkaSink.<GenericRecord>builder()
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setKafkaProducerConfig(properties)
.setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup))
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
.setBootstrapServers(properties.getProperty("bootstrap.servers"))
.setKafkaProducerConfig(properties)
.setRecordSerializer(new KafkaRecordSerializer(streamFeatureGroup))
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
Map<String, String> complexFeatureSchemas = new HashMap<>();
for (String featureName: streamFeatureGroup.getComplexFeatures()) {
for (String featureName : streamFeatureGroup.getComplexFeatures()) {
complexFeatureSchemas.put(featureName, streamFeatureGroup.getFeatureAvroSchema(featureName));
}

DataStream<GenericRecord> avroRecordDataStream =
genericDataStream.map(new PojoToAvroRecord(
streamFeatureGroup.getDeserializedAvroSchema(),
streamFeatureGroup.getDeserializedEncodedAvroSchema(),
complexFeatureSchemas))
.returns(
new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema())
);
genericDataStream.map(new PojoToAvroRecord(
streamFeatureGroup.getDeserializedAvroSchema(),
streamFeatureGroup.getDeserializedEncodedAvroSchema(),
complexFeatureSchemas))
.returns(
new GenericRecordAvroTypeInfo(streamFeatureGroup.getDeserializedEncodedAvroSchema())
);

return avroRecordDataStream.sinkTo(sink);
}
Expand All @@ -96,34 +107,98 @@ public String addFile(String filePath) throws IOException {
if (Strings.isNullOrEmpty(filePath)) {
return filePath;
}
// this is used for unit testing
if (!filePath.startsWith("file://")) {
filePath = "hdfs://" + filePath;

if (filePath.startsWith("hdfs://")) {
String targetPath = FileUtils.getCurrentWorkingDirectory().toString()
+ filePath.substring(filePath.lastIndexOf("/"));
FileUtils.copy(new Path(filePath), new Path(targetPath), false);

return targetPath;
}
String targetPath = FileUtils.getCurrentWorkingDirectory().toString()
+ filePath.substring(filePath.lastIndexOf("/"));
FileUtils.copy(new Path(filePath), new Path(targetPath), false);
return targetPath;

return filePath;
}

@Override
public Map<String, String> getKafkaConfig(FeatureGroupBase featureGroup, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {
throws FeatureStoreException, IOException {
boolean external = !(System.getProperties().containsKey(HopsworksInternalClient.REST_ENDPOINT_SYS)
|| (writeOptions != null
&& Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false"))));
|| (writeOptions != null
&& Boolean.parseBoolean(writeOptions.getOrDefault("internal_kafka", "false"))));

StorageConnector.KafkaConnector storageConnector =
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnectorApi.getKafkaStorageConnector(featureGroup.getFeatureStore(), external);
storageConnector.setSslTruststoreLocation(addFile(storageConnector.getSslTruststoreLocation()));
storageConnector.setSslKeystoreLocation(addFile(storageConnector.getSslKeystoreLocation()));

Map<String, String> config = storageConnector.kafkaOptions();

// To avoid distribution issues of the certificates across multiple pods/nodes
// here we are extracting the key/certificates from the JKS keyStore/trustStore and
// pass them in the configuration as PEM content
try {
KeyStore keyStore = KeyStore.getInstance("JKS");
keyStore.load(new FileInputStream(storageConnector.getSslKeystoreLocation()),
storageConnector.getSslKeystorePassword().toCharArray());
config.put(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, getKey(keyStore, storageConnector.getSslKeystorePassword()));
config.put(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, getCertificateChain(keyStore));
config.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "PEM");

KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(storageConnector.getSslTruststoreLocation()),
storageConnector.getSslTruststorePassword().toCharArray());
config.put(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, getRootCA(trustStore));
config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PEM");
} catch (Exception ex) {
throw new IOException(ex);
}

// Remove the keystore and truststore location from the properties otherwise
// the SSL engine will try to use them first.
config.remove(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
config.remove(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
config.remove(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
config.remove(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
config.remove(SslConfigs.SSL_KEY_PASSWORD_CONFIG);

if (writeOptions != null) {
config.putAll(writeOptions);
}
config.put("enable.idempotence", "false");
return config;
}

/**
 * Extracts the private key stored under the keystore's first alias and renders it
 * as a PKCS#8 PEM block, so it can be passed to Kafka via ssl.keystore.key.
 *
 * @param keyStore loaded JKS keystore containing the client key
 * @param password password protecting the key entry
 * @return the key material wrapped in BEGIN/END PRIVATE KEY markers
 * @throws KeyStoreException if the keystore has not been initialized
 * @throws UnrecoverableKeyException if the key cannot be recovered with the password
 * @throws NoSuchAlgorithmException if the key algorithm is unavailable
 */
private String getKey(KeyStore keyStore, String password)
    throws KeyStoreException, UnrecoverableKeyException, NoSuchAlgorithmException {
  // NOTE(review): assumes the keystore contains at least one alias — confirm upstream guarantees this.
  String alias = keyStore.aliases().nextElement();
  byte[] encodedKey = keyStore.getKey(alias, password.toCharArray()).getEncoded();

  StringBuilder pem = new StringBuilder();
  pem.append("-----BEGIN PRIVATE KEY-----\n");
  pem.append(Base64.encodeBytes(encodedKey));
  pem.append("\n-----END PRIVATE KEY-----");
  return pem.toString();
}

/**
 * Renders the certificate chain stored under the keystore's first alias as a
 * sequence of PEM blocks (leaf first), suitable for ssl.keystore.certificate.chain.
 *
 * @param keyStore loaded JKS keystore containing the client certificate chain
 * @return concatenated BEGIN/END CERTIFICATE blocks, one per chain element
 * @throws KeyStoreException if the keystore has not been initialized
 * @throws CertificateEncodingException if a certificate cannot be DER-encoded
 */
private String getCertificateChain(KeyStore keyStore) throws KeyStoreException, CertificateEncodingException {
  // NOTE(review): assumes the keystore contains at least one alias — confirm upstream guarantees this.
  String alias = keyStore.aliases().nextElement();

  StringBuilder pem = new StringBuilder();
  for (Certificate certificate : keyStore.getCertificateChain(alias)) {
    pem.append("-----BEGIN CERTIFICATE-----\n");
    pem.append(Base64.encodeBytes(certificate.getEncoded()));
    pem.append("\n-----END CERTIFICATE-----\n");
  }
  return pem.toString();
}

/**
 * Renders the root CA certificate stored under the truststore's first alias as a
 * single PEM block, suitable for ssl.truststore.certificates.
 *
 * @param trustStore loaded JKS truststore containing the root CA
 * @return the CA certificate wrapped in BEGIN/END CERTIFICATE markers
 * @throws KeyStoreException if the truststore has not been initialized
 * @throws CertificateEncodingException if the certificate cannot be DER-encoded
 */
private String getRootCA(KeyStore trustStore) throws KeyStoreException, CertificateEncodingException {
  // NOTE(review): assumes the truststore contains at least one alias — confirm upstream guarantees this.
  String alias = trustStore.aliases().nextElement();
  byte[] encodedCertificate = trustStore.getCertificate(alias).getEncoded();

  StringBuilder pem = new StringBuilder();
  pem.append("-----BEGIN CERTIFICATE-----\n");
  pem.append(Base64.encodeBytes(encodedCertificate));
  pem.append("\n-----END CERTIFICATE-----");
  return pem.toString();
}

/**
 * Replaces the engine's {@link StorageConnectorApi}; exposed only so unit tests
 * can inject a mock instead of the real metadata client.
 *
 * @param storageConnectorApi the (typically mocked) storage connector API to use
 */
@VisibleForTesting
public void setStorageConnectorApi(StorageConnectorApi storageConnectorApi) {
this.storageConnectorApi = storageConnectorApi;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2024. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/
package com.logicalclocks.hsfs.flink.engine;

import com.logicalclocks.hsfs.*;
import com.logicalclocks.hsfs.flink.FeatureStore;
import com.logicalclocks.hsfs.flink.StreamFeatureGroup;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.HopsworksHttpClient;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import org.apache.kafka.common.config.SslConfigs;
import org.bouncycastle.cert.X509CertificateHolder;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.bouncycastle.openssl.PEMParser;

import java.io.IOException;
import java.io.StringReader;
import java.security.cert.CertificateException;
import java.util.HashMap;
import java.util.Map;

public class TestFlinkEngine {

  @Test
  public void testKafkaProperties_Certificates() throws IOException, FeatureStoreException, CertificateException {
    // Arrange: install a mocked Hopsworks client so no backend is contacted.
    HopsworksClient client = Mockito.mock(HopsworksClient.class);
    client.setInstance(new HopsworksClient(Mockito.mock(HopsworksHttpClient.class), "host"));

    // Build a Kafka connector whose stores point at the JKS fixtures on the test classpath.
    StorageConnector.KafkaConnector connector = new StorageConnector.KafkaConnector();
    connector.setSslKeystoreLocation(this.getClass().getResource("/test_kstore.jks").getPath());
    connector.setSslKeystorePassword("O74K016I5UTB7YYPC6K6RXIM9F7LVPFW23FNK8WF3JEOO7Y607VCU7E7691UQ3CA");
    connector.setSslTruststoreLocation(this.getClass().getResource("/test_tstore.jks").getPath());
    connector.setSslTruststorePassword("O74K016I5UTB7YYPC6K6RXIM9F7LVPFW23FNK8WF3JEOO7Y607VCU7E7691UQ3CA");
    connector.setSecurityProtocol(SecurityProtocol.SSL);
    connector.setSslEndpointIdentificationAlgorithm(SslEndpointIdentificationAlgorithm.EMPTY);
    connector.setExternalKafka(true);

    StorageConnectorApi connectorApi = Mockito.mock(StorageConnectorApi.class);
    Mockito.when(connectorApi.getKafkaStorageConnector(Mockito.any(), Mockito.anyBoolean()))
        .thenReturn(connector);
    FlinkEngine engine = FlinkEngine.getInstance();
    engine.setStorageConnectorApi(connectorApi);

    StreamFeatureGroup group = new StreamFeatureGroup();
    group.setFeatureStore(new FeatureStore());

    // Act: derive the Kafka producer configuration from the connector.
    Map<String, String> config = engine.getKafkaConfig(group, new HashMap<>());

    // Assert: both stores must have been converted to inline PEM material.
    Assert.assertEquals("PEM", config.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
    Assert.assertEquals("PEM", config.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));

    String keyChainPem = config.get(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG);
    String trustPem = config.get(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);

    // The keystore chain should contain the client certificate issued by the Hopsworks root CA.
    try (PEMParser parser = new PEMParser(new StringReader(keyChainPem))) {
      X509CertificateHolder clientCert = (X509CertificateHolder) parser.readObject();
      Assert.assertEquals("CN=FraudWorkshop__fabio000", clientCert.getSubject().toString());

      X509CertificateHolder issuerCert = (X509CertificateHolder) parser.readObject();
      Assert.assertEquals("C=SE,O=Hopsworks,OU=core,CN=HopsRootCA", issuerCert.getIssuer().toString());
    }

    // The truststore PEM should contain the Hopsworks root CA itself.
    try (PEMParser parser = new PEMParser(new StringReader(trustPem))) {
      X509CertificateHolder rootCa = (X509CertificateHolder) parser.readObject();
      Assert.assertEquals("C=SE,O=Hopsworks,OU=core,CN=HopsRootCA", rootCa.getSubject().toString());
    }
  }
}
Binary file added java/flink/src/test/resources/test_kstore.jks
Binary file not shown.
Binary file added java/flink/src/test/resources/test_tstore.jks
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.logicalclocks.hsfs.metadata;

import com.logicalclocks.hsfs.FeatureStoreException;
import jdk.nashorn.internal.runtime.regexp.joni.exception.InternalException;
import lombok.Getter;
import lombok.Setter;
import org.apache.http.HttpHeaders;
Expand Down
Loading