Skip to content

Commit

Permalink
Merge branch 'main' into k-diger/kafbat#675
Browse files Browse the repository at this point in the history
  • Loading branch information
K-Diger authored Dec 13, 2024
2 parents cbec569 + d507a9f commit 0b1498b
Show file tree
Hide file tree
Showing 29 changed files with 149 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kafbat.ui.client;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.ApiClient;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
Expand All @@ -14,9 +15,11 @@
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.util.WebClientConfigurator;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
Expand Down Expand Up @@ -58,10 +61,24 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {

private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
Mono.error(new ValidationException("Invalid configuration")))
.onErrorResume(WebClientResponseException.InternalServerError.class, e ->
Mono.error(new ValidationException("Invalid configuration")));
.onErrorResume(WebClientResponseException.BadRequest.class,
RetryingKafkaConnectClient::parseConnectErrorMessage)
.onErrorResume(WebClientResponseException.InternalServerError.class,
RetryingKafkaConnectClient::parseConnectErrorMessage);
}

// Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
// Adding the connect runtime dependency for this single class seems excessive
private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}

private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
return Mono.error(new ValidationException(
Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message()));
}

@Override
Expand Down
24 changes: 17 additions & 7 deletions api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.kafbat.ui.api.model.ErrorResponse;
import io.kafbat.ui.model.ConnectorDTO;
import io.kafbat.ui.model.ConnectorPluginConfigDTO;
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
Expand Down Expand Up @@ -166,7 +167,7 @@ public void shouldReturnNotFoundForNonExistingConnectName() {

@Test
public void shouldRetrieveConnector() {
ConnectorDTO expected = (ConnectorDTO) new ConnectorDTO()
ConnectorDTO expected = new ConnectorDTO()
.connect(connectName)
.status(new ConnectorStatusDTO()
.state(ConnectorStateDTO.RUNNING)
Expand Down Expand Up @@ -268,19 +269,28 @@ public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {


@Test
@SuppressWarnings("checkstyle:LineLength")
public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
webTestClient.put()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
LOCAL, connectName, connectorName)
.bodyValue(Map.of(
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max", "invalid number",
"topics", "another-topic",
"file", "/tmp/test"
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max", "invalid number",
"topics", "another-topic",
"file", "/tmp/test"
)
)
.exchange()
.expectStatus().isBadRequest();
.expectStatus().isBadRequest()
.expectBody(ErrorResponse.class)
.value(response -> assertThat(response.getMessage()).isEqualTo(
"""
Connector configuration is invalid and contains the following 2 error(s):
Invalid value invalid number for configuration tasks.max: Not a number of type INT
Invalid value null for configuration tasks.max: Value must be non-null
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"""
));

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
Expand Down Expand Up @@ -383,7 +393,7 @@ public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() {
.map(ConnectorPluginConfigDTO::getValue)
.map(ConnectorPluginConfigValueDTO::getErrors)
.filter(not(List::isEmpty))
.findFirst().get();
.findFirst().orElseThrow();
assertEquals(
"Invalid value 0 for configuration tasks.max: Value must be at least 1",
error.get(0)
Expand Down
49 changes: 26 additions & 23 deletions api/src/test/java/io/kafbat/ui/KafkaConsumerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kafbat.ui.producer.KafkaTestProducer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -55,15 +56,17 @@ public void shouldDeleteRecords() {
throw new RuntimeException(e);
}

long count = webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
.accept(TEXT_EVENT_STREAM)
.exchange()
.expectStatus()
.isOk()
.expectBodyList(TopicMessageEventDTO.class)
.returnResult()
.getResponseBody()
long count = Objects.requireNonNull(
webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
.accept(TEXT_EVENT_STREAM)
.exchange()
.expectStatus()
.isOk()
.expectBodyList(TopicMessageEventDTO.class)
.returnResult()
.getResponseBody()
)
.stream()
.filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
.count();
Expand All @@ -76,14 +79,16 @@ public void shouldDeleteRecords() {
.expectStatus()
.isOk();

count = webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
.exchange()
.expectStatus()
.isOk()
.expectBodyList(TopicMessageEventDTO.class)
.returnResult()
.getResponseBody()
count = Objects.requireNonNull(
webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
.exchange()
.expectStatus()
.isOk()
.expectBodyList(TopicMessageEventDTO.class)
.returnResult()
.getResponseBody()
)
.stream()
.filter(e -> e.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE))
.count();
Expand Down Expand Up @@ -120,7 +125,7 @@ public void shouldIncreasePartitionsUpTo10() {
.returnResult()
.getResponseBody();

assert response != null;
Assertions.assertNotNull(response);
Assertions.assertEquals(10, response.getTotalPartitionsCount());

TopicDetailsDTO topicDetails = webTestClient.get()
Expand All @@ -134,7 +139,7 @@ public void shouldIncreasePartitionsUpTo10() {
.returnResult()
.getResponseBody();

assert topicDetails != null;
Assertions.assertNotNull(topicDetails);
Assertions.assertEquals(10, topicDetails.getPartitionCount());
}

Expand All @@ -157,8 +162,6 @@ public void shouldReturn404ForNonExistingTopic() {

@Test
public void shouldReturnConfigsForBroker() {
var topicName = UUID.randomUUID().toString();

List<BrokerConfigDTO> configs = webTestClient.get()
.uri("/api/clusters/{clusterName}/brokers/{id}/configs",
LOCAL,
Expand All @@ -171,7 +174,7 @@ public void shouldReturnConfigsForBroker() {
.getResponseBody();

Assertions.assertNotNull(configs);
assert !configs.isEmpty();
Assertions.assertFalse(configs.isEmpty());
Assertions.assertNotNull(configs.get(0).getName());
Assertions.assertNotNull(configs.get(0).getIsReadOnly());
Assertions.assertNotNull(configs.get(0).getIsSensitive());
Expand Down Expand Up @@ -216,7 +219,7 @@ public void shouldRetrieveTopicConfig() {
.getResponseBody();

Assertions.assertNotNull(configs);
assert !configs.isEmpty();
Assertions.assertFalse(configs.isEmpty());
Assertions.assertNotNull(configs.get(0).getName());
Assertions.assertNotNull(configs.get(0).getIsReadOnly());
Assertions.assertNotNull(configs.get(0).getIsSensitive());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
Expand Down Expand Up @@ -59,9 +58,6 @@ public class AzureEntraLoginCallbackHandlerTest {
@Mock
private OAuthBearerTokenCallback oauthBearerTokenCallBack;

@Mock
private OAuthBearerToken oauthBearerToken;

@Mock
private TokenCredential tokenCredential;

Expand All @@ -77,12 +73,8 @@ public void beforeEach() {
}

@Test
public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest()
throws UnsupportedCallbackException {
final Map<String, Object> configs = new HashMap<>();
configs.put(
"bootstrap.servers",
List.of("test-eh.servicebus.windows.net:9093"));
public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest() throws UnsupportedCallbackException {
Map<String, Object> configs = Map.of("bootstrap.servers", List.of("test-eh.servicebus.windows.net:9093"));

when(tokenCredential.getToken(any(TokenRequestContext.class))).thenReturn(Mono.just(accessToken));
when(accessToken.getToken()).thenReturn(VALID_SAMPLE_TOKEN);
Expand Down Expand Up @@ -114,10 +106,7 @@ public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest()

@Test
public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallbackException {
final Map<String, Object> configs = new HashMap<>();
configs.put(
"bootstrap.servers",
List.of("test-eh.servicebus.windows.net:9093"));
Map<String, Object> configs = Map.of("bootstrap.servers", List.of("test-eh.servicebus.windows.net:9093"));

when(tokenCredential.getToken(any(TokenRequestContext.class)))
.thenThrow(new RuntimeException("failed to acquire token"));
Expand All @@ -136,16 +125,13 @@ public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallb

@Test
public void shouldThrowExceptionWithNullBootstrapServers() {
final Map<String, Object> configs = new HashMap<>();

assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
configs, null, null));
Map.of(), null, null));
}

@Test
public void shouldThrowExceptionWithMultipleBootstrapServers() {
final Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", List.of("server1", "server2"));
Map<String, Object> configs = Map.of("bootstrap.servers", List.of("server1", "server2"));

assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure(
configs, null, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public class AzureEntraOAuthBearerTokenTest {
void constructorShouldParseToken() {
final AccessToken accessToken = new AccessToken(VALID_SAMPLE_TOKEN, OffsetDateTime.MIN);

final AzureEntraOAuthBearerToken azureOAuthBearerToken =
new AzureEntraOAuthBearerToken(accessToken);
final AzureEntraOAuthBearerToken azureOAuthBearerToken = new AzureEntraOAuthBearerToken(accessToken);

assertThat(azureOAuthBearerToken, is(notNullValue()));
assertThat(azureOAuthBearerToken.value(), is(VALID_SAMPLE_TOKEN));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ public KafkaConnectContainer withKafka(Network network, String bootstrapServers)
}

public String getTarget() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(CONNECT_PORT);
return "http://" + getHost() + ":" + getMappedPort(CONNECT_PORT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,6 @@ private KsqlDbContainer withKafka(Network network, String bootstrapServers) {
}

public String url() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(PORT);
return "http://" + getHost() + ":" + getMappedPort(PORT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public SchemaRegistryContainer withKafka(Network network, String bootstrapServer
}

public String getUrl() {
return "http://" + getContainerIpAddress() + ":" + getMappedPort(SCHEMA_PORT);
return "http://" + getHost() + ":" + getMappedPort(SCHEMA_PORT);
}

public SchemaRegistryClient schemaRegistryClient() {
Expand Down
2 changes: 2 additions & 0 deletions api/src/test/java/io/kafbat/ui/emitter/CursorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private Cursor assertCursor(PollingModeDTO expectedMode,

var cursorPosition = registeredCursor.consumerPosition();
assertThat(cursorPosition).isNotNull();
assertThat(cursorPosition.offsets()).isNotNull();
assertThat(cursorPosition.topic()).isEqualTo(TOPIC);
assertThat(cursorPosition.partitions()).isEqualTo(List.of());
assertThat(cursorPosition.pollingMode()).isEqualTo(expectedMode);
Expand All @@ -111,6 +112,7 @@ private void waitMgsgEmitted(AbstractEmitter emitter, int expectedMsgsCnt) {
List<TopicMessageEventDTO> events = Flux.create(emitter)
.collectList()
.block();
assertThat(events).isNotNull();
assertThat(events.stream().filter(m -> m.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE).count())
.isEqualTo(expectedMsgsCnt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private Flux<TopicMessageEventDTO> createTailingFlux(
String query) {
var cluster = applicationContext.getBean(ClustersStorage.class)
.getClusterByName(LOCAL)
.get();
.orElseThrow();

return applicationContext.getBean(MessagesService.class)
.loadMessages(cluster, topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -145,6 +144,8 @@ private void verifyAutoConfigured(SerdeInstance serde) {
}

private void verifyPatternsMatch(ClustersProperties.SerdeConfig config, SerdeInstance serde) {
assertThat(serde.topicKeyPattern).isNotNull();
assertThat(serde.topicValuePattern).isNotNull();
assertThat(serde.topicKeyPattern.pattern()).isEqualTo(config.getTopicKeysPattern());
assertThat(serde.topicValuePattern.pattern()).isEqualTo(config.getTopicValuesPattern());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private boolean isGroupMetadataMessage(DeserializeResult key, DeserializeResult
}

@SneakyThrows
@SuppressWarnings("unchecked")
private Map<String, Object> toMapFromJsom(DeserializeResult result) {
return new JsonMapper().readValue(result.getResult(), Map.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ void createConfigureFillsDescriptorMappingsWhenProtoFilesListProvided() throws E
}

@Test
void createConfigureFillsDescriptorMappingsWhenProtoFileDirProvided() throws Exception {
void createConfigureFillsDescriptorMappingsWhenProtoFileDirProvided() {
PropertyResolver resolver = mock(PropertyResolver.class);
when(resolver.getProperty("protobufFilesDir", String.class))
.thenReturn(Optional.of(protoFilesDir()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class BrokerServiceTest extends AbstractIntegrationTest {

@Test
void getBrokersReturnsFilledBrokerDto() {
var localCluster = clustersStorage.getClusterByName(LOCAL).get();
var localCluster = clustersStorage.getClusterByName(LOCAL).orElseThrow();
StepVerifier.create(brokerService.getBrokers(localCluster))
.expectNextMatches(b -> b.getId().equals(1)
&& b.getHost().equals(kafka.getHost())
Expand Down
2 changes: 2 additions & 0 deletions api/src/test/java/io/kafbat/ui/service/ConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ private Optional<BrokerConfigDTO> getConfig(String name) {
.returnResult()
.getResponseBody();

assertThat(configs).isNotNull();

return configs.stream()
.filter(c -> c.getName().equals(name))
.findAny();
Expand Down
Loading

0 comments on commit 0b1498b

Please sign in to comment.