Skip to content

Commit

Permalink
Improve connect-cluster vault error handling & messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasCAI-mlv committed Dec 11, 2024
1 parent 0d0c2b9 commit ca76ea4
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 378 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.michelin.ns4kafka.controller.connect;

import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterDeleteOperation;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNotAllowed;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidOwner;
import static com.michelin.ns4kafka.util.enumation.Kind.CONNECT_CLUSTER;
import static io.micronaut.core.util.StringUtils.EMPTY_STRING;
Expand Down Expand Up @@ -246,7 +245,8 @@ public HttpResponse<List<ConnectCluster>> bulkDelete(String namespace,
public List<ConnectCluster> listVaults(final String namespace) {
return connectClusterService.findAllForNamespaceWithWritePermission(getNamespace(namespace))
.stream()
.filter(connectCluster -> StringUtils.hasText(connectCluster.getSpec().getAes256Key()))
.filter(connectCluster -> StringUtils.hasText(connectCluster.getSpec().getAes256Key())
&& StringUtils.hasText(connectCluster.getSpec().getAes256Salt()))
.toList();
}

Expand All @@ -264,12 +264,7 @@ public List<VaultResponse> vaultPassword(final String namespace,
@Body final List<String> passwords) {
final Namespace ns = getNamespace(namespace);

final var validationErrors = new ArrayList<String>();
if (!connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectCluster)) {
validationErrors.add(invalidConnectClusterNotAllowed(connectCluster));
}

validationErrors.addAll(connectClusterService.validateConnectClusterVault(ns, connectCluster));
List<String> validationErrors = connectClusterService.validateConnectClusterVault(ns, connectCluster);

if (!validationErrors.isEmpty()) {
throw new ResourceValidationException(CONNECT_CLUSTER, connectCluster, validationErrors);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterEncryptionConfig;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterMalformedUrl;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNameAlreadyExistGlobally;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNoEncryptionConfig;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidConnectClusterNotHealthy;
import static com.michelin.ns4kafka.util.FormatErrorUtils.invalidNotFound;

Expand All @@ -17,7 +18,6 @@
import com.michelin.ns4kafka.service.client.connect.KafkaConnectClient;
import com.michelin.ns4kafka.service.client.connect.entities.ServerInfo;
import com.michelin.ns4kafka.util.EncryptionUtils;
import com.michelin.ns4kafka.util.FormatErrorUtils;
import com.michelin.ns4kafka.util.RegexUtils;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpRequest;
Expand All @@ -32,7 +32,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -190,7 +189,8 @@ public Optional<ConnectCluster> findByNameWithOwnerPermission(Namespace namespac
public List<ConnectCluster> findAllForNamespaceWithWritePermission(Namespace namespace) {
return Stream.concat(
findByWildcardNameWithOwnerPermission(namespace, "*").stream(),
findAllForNamespaceByPermissions(namespace, List.of(AccessControlEntry.Permission.WRITE)).stream()
findAllForNamespaceByPermissions(namespace, List.of(AccessControlEntry.Permission.WRITE))
.stream()
.map(connectCluster -> ConnectCluster.builder()
.metadata(connectCluster.getMetadata())
.spec(ConnectCluster.ConnectClusterSpec.builder()
Expand Down Expand Up @@ -281,41 +281,26 @@ public Mono<List<String>> validateConnectClusterCreation(ConnectCluster connectC
}

/**
* Validate the given connect worker has configuration for vaults.
* Validate the given connect worker has configuration for vault.
*
* @param connectCluster The Kafka connect worker to validate
* @return A list of validation errors
*/
public List<String> validateConnectClusterVault(final Namespace namespace, final String connectCluster) {
final var errors = new ArrayList<String>();
Optional<ConnectCluster> kafkaConnect = findAllForNamespaceWithWritePermission(namespace)
.stream()
.filter(cc -> cc.getMetadata().getName().equals(connectCluster))
.findFirst();

final List<ConnectCluster> kafkaConnects = findAllForNamespaceByPermissions(namespace,
List.of(AccessControlEntry.Permission.OWNER, AccessControlEntry.Permission.WRITE));

if (kafkaConnects.isEmpty()) {
if (kafkaConnect.isEmpty()) {
errors.add(invalidNotFound(connectCluster));
return errors;
}

if (kafkaConnects.stream().noneMatch(cc -> StringUtils.hasText(cc.getSpec().getAes256Key())
&& StringUtils.hasText(cc.getSpec().getAes256Salt()))) {
errors.add(invalidConnectClusterEncryptionConfig());
return errors;
}

final Optional<ConnectCluster> kafkaConnect = kafkaConnects.stream()
.filter(cc -> cc.getMetadata().getName().equals(connectCluster)
&& StringUtils.hasText(cc.getSpec().getAes256Key())
&& StringUtils.hasText(cc.getSpec().getAes256Salt()))
.findFirst();

if (kafkaConnect.isEmpty()) {
final String allowedConnectClusters = kafkaConnects.stream()
.filter(cc -> StringUtils.hasText(cc.getSpec().getAes256Key())
&& StringUtils.hasText(cc.getSpec().getAes256Salt()))
.map(cc -> cc.getMetadata().getName())
.collect(Collectors.joining(", "));
errors.add(FormatErrorUtils.invalidConnectClusterMustBeOneOf(connectCluster, allowedConnectClusters));
if (!StringUtils.hasText(kafkaConnect.get().getSpec().getAes256Key())
|| !StringUtils.hasText(kafkaConnect.get().getSpec().getAes256Salt())) {
errors.add(invalidConnectClusterNoEncryptionConfig());
return errors;
}

Expand Down Expand Up @@ -343,19 +328,6 @@ public boolean isNamespaceOwnerOfConnectCluster(Namespace namespace, String conn
AccessControlEntry.ResourceType.CONNECT_CLUSTER, connectCluster);
}

/**
* Is given namespace allowed (Owner or Writer) for the given connect worker.
*
* @param namespace The namespace
* @param connectCluster The Kafka connect cluster
* @return true if it is, false otherwise
*/
public boolean isNamespaceAllowedForConnectCluster(Namespace namespace, String connectCluster) {
return findAllForNamespaceWithWritePermission(namespace)
.stream()
.anyMatch(kafkaConnect -> kafkaConnect.getMetadata().getName().equals(connectCluster));
}

/**
* Vault a password for a specific namespace and a kafka connect cluster.
*
Expand Down
21 changes: 4 additions & 17 deletions src/main/java/com/michelin/ns4kafka/util/FormatErrorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,26 +181,13 @@ public static String invalidConnectClusterNameAlreadyExistGlobally(String invali
}

/**
* Invalid connect cluster namespace not allowed.
* Invalid connect cluster missing encryption config.
*
* @param invalidNameValue the invalid field value
* @return the error message
*/
public static String invalidConnectClusterNotAllowed(String invalidNameValue) {
return String.format(INVALID_FIELD, invalidNameValue, "connect-cluster",
"namespace is not allowed to use this Kafka Connect");
}

/**
* Invalid connect cluster must be one of.
*
* @param invalidNameValue the invalid field value
* @param allowedConnectClusters the allowed connect clusters
* @return the error message
*/
public static String invalidConnectClusterMustBeOneOf(String invalidNameValue,
String allowedConnectClusters) {
return invalidValueMustBeOneOf(FIELD_NAME, invalidNameValue, allowedConnectClusters);
public static String invalidConnectClusterNoEncryptionConfig() {
return String.format(INVALID_EMPTY_FIELDS, String.join(", ", "aes256Key", "aes256Salt"),
"AES key and salt are required to use connect-cluster vault");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ void shouldUpdateConnectClusterWhenChanged() {
}

@Test
void shouldCreateConnectClusterInDryRunMode() {
void shouldNotCreateConnectClusterInDryRunMode() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -666,7 +666,28 @@ void shouldCreateConnectClusterInDryRunMode() {
}

@Test
void shouldListVaultConnectClusterReturnEmptyWhenNoAes256Config() {
void shouldListNoVaultConnectClusterWhenNoConnectCluster() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.build())
.spec(Namespace.NamespaceSpec.builder()
.topicValidator(TopicValidator.makeDefault())
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectClusterService.findAllForNamespaceWithWritePermission(ns))
.thenReturn(List.of());

List<ConnectCluster> actual = connectClusterController.listVaults("test");
assertTrue(actual.isEmpty());
}

@Test
void shouldListNoVaultConnectClusterWhenNoAes256Config() {
Namespace ns = Namespace.builder()
.metadata(Metadata.builder()
.name("test")
Expand Down Expand Up @@ -706,59 +727,48 @@ void shouldListVaultConnectClusterWhenAes256Config() {
.build())
.build();

ConnectCluster connectCluster = ConnectCluster.builder()
ConnectCluster ccNoAes = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("connect-cluster")
.build())
.spec(ConnectCluster.ConnectClusterSpec.builder()
.build())
.build();

ConnectCluster connectClusterAes256 = ConnectCluster.builder()
ConnectCluster ccAes256Key = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("connect-cluster-aes256")
.name("connect-cluster")
.build())
.spec(ConnectCluster.ConnectClusterSpec.builder()
.aes256Key("myKeyEncryption")
.aes256Salt("p8t42EhY9z2eSUdpGeq7HX7RboMrsJAhUnu3EEJJVS")
.build())
.build();

when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(connectClusterService.findAllForNamespaceWithWritePermission(ns)).thenReturn(
List.of(connectCluster, connectClusterAes256));

List<ConnectCluster> actual = connectClusterController.listVaults("test");
assertEquals(1, actual.size());
}
ConnectCluster ccAes256Salt = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("connect-cluster")
.build())
.spec(ConnectCluster.ConnectClusterSpec.builder()
.aes256Salt("p8t42EhY9z2eSUdpGeq7HX7RboMrsJAhUnu3EEJJVS")
.build())
.build();

@Test
void shouldNotVaultOnNotAllowedConnectCluster() {
String connectClusterName = "connect-cluster-na";
Namespace ns = Namespace.builder()
ConnectCluster ccAes256KeySalt = ConnectCluster.builder()
.metadata(Metadata.builder()
.name("test")
.cluster("local")
.name("connect-cluster-aes256")
.build())
.spec(Namespace.NamespaceSpec.builder()
.topicValidator(TopicValidator.makeDefault())
.spec(ConnectCluster.ConnectClusterSpec.builder()
.aes256Key("myKeyEncryption")
.aes256Salt("p8t42EhY9z2eSUdpGeq7HX7RboMrsJAhUnu3EEJJVS")
.build())
.build();

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectClusterName))
.thenReturn(false);
when(connectClusterService.validateConnectClusterVault(ns, connectClusterName))
.thenReturn(List.of());
when(namespaceService.findByName("test")).thenReturn(Optional.of(ns));
when(connectClusterService.findAllForNamespaceWithWritePermission(ns)).thenReturn(
List.of(ccNoAes, ccAes256Key, ccAes256Salt, ccAes256KeySalt));

var secrets = List.of("secret");
ResourceValidationException result = assertThrows(ResourceValidationException.class,
() -> connectClusterController.vaultPassword("test", connectClusterName, secrets));
assertEquals(1, result.getValidationErrors().size());
assertEquals("Invalid value \"connect-cluster-na\" for field \"connect-cluster\": "
+ "namespace is not allowed to use this Kafka Connect.",
result.getValidationErrors().getFirst());
List<ConnectCluster> actual = connectClusterController.listVaults("test");
assertEquals(List.of(ccAes256KeySalt), actual);
}

@Test
Expand All @@ -776,8 +786,6 @@ void shouldNotVaultOnConnectClusterWithInvalidAes256Config() {

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectClusterName))
.thenReturn(true);
when(connectClusterService.validateConnectClusterVault(ns, connectClusterName))
.thenReturn(List.of("Error config."));

Expand All @@ -786,6 +794,7 @@ void shouldNotVaultOnConnectClusterWithInvalidAes256Config() {
() -> connectClusterController.vaultPassword("test", connectClusterName, secrets));
assertEquals(1, result.getValidationErrors().size());
assertEquals("Error config.", result.getValidationErrors().getFirst());
verify(connectClusterService, never()).vaultPassword(any(), any(), any());
}

@Test
Expand All @@ -803,23 +812,10 @@ void shouldVaultOnConnectClusterWithValidAes256Config() {

when(namespaceService.findByName("test"))
.thenReturn(Optional.of(ns));
when(connectClusterService.isNamespaceAllowedForConnectCluster(ns, connectClusterName))
.thenReturn(true);
when(connectClusterService.validateConnectClusterVault(ns, connectClusterName))
.thenReturn(List.of());
when(connectClusterService.vaultPassword(ns, connectClusterName, List.of("secret")))
.thenReturn(List.of(VaultResponse.builder()
.spec(VaultResponse.VaultResponseSpec.builder()
.clearText("secret")
.encrypted("encryptedSecret")
.build())
.build()
));

final List<VaultResponse> actual = connectClusterController
.vaultPassword("test", connectClusterName, List.of("secret"));

assertEquals("secret", actual.getFirst().getSpec().getClearText());
assertEquals("encryptedSecret", actual.getFirst().getSpec().getEncrypted());

connectClusterController.vaultPassword("test", connectClusterName, List.of("secret"));
verify(connectClusterService).vaultPassword(ns, connectClusterName, List.of("secret"));
}
}
Loading

0 comments on commit ca76ea4

Please sign in to comment.