Skip to content

Commit

Permalink
[FSTORE-1011] Validation message during data ingestion to FG when the…
Browse files Browse the repository at this point in the history
…re are no kafka topics configured (#1414)
  • Loading branch information
bubriks authored Oct 16, 2023
1 parent bfb3da3 commit d06ddfe
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ public synchronized ProjectTopics createTopic(Project project, TopicDTO topicDto

protected void checkReplication(TopicDTO topicDto) throws KafkaException {
List<String> brokerEndpoints = kafkaBrokers.getBrokerEndpoints(KafkaBrokers.BrokerProtocol.INTERNAL);
if (brokerEndpoints.isEmpty()) {
throw new KafkaException(RESTCodes.KafkaErrorCode.BROKER_MISSING, Level.FINE);
}
if (brokerEndpoints.size() < topicDto.getNumOfReplicas()) {
throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_REPLICATION_ERROR, Level.FINE,
"maximum: " + brokerEndpoints.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public void setup() {
@Test
public void testCheckReplication() throws KafkaException {
// Arrange
Mockito.doReturn(false).when(kafkaController).externalKafka(Mockito.any());
List<String> brokers = Collections.nCopies(10, "endpoint");
Mockito.doReturn(brokers).when(kafkaController.kafkaBrokers).getBrokerEndpoints(Mockito.any());
TopicDTO topicDTO = new TopicDTO();
Expand All @@ -64,7 +63,6 @@ public void testCheckReplication() throws KafkaException {
@Test(expected = KafkaException.class)
public void testCheckReplicationTooFewBrokers() throws KafkaException {
// Arrange
Mockito.doReturn(false).when(kafkaController).externalKafka(Mockito.any());
List<String> brokers = Collections.nCopies(1, "endpoint");
Mockito.doReturn(brokers).when(kafkaController.kafkaBrokers).getBrokerEndpoints(Mockito.any());
TopicDTO topicDTO = new TopicDTO();
Expand All @@ -76,6 +74,19 @@ public void testCheckReplicationTooFewBrokers() throws KafkaException {
// Assert
}

@Test(expected = KafkaException.class)
public void testCheckReplicationNoBrokers() throws KafkaException {
// Arrange
Mockito.doReturn(new ArrayList<>()).when(kafkaController.kafkaBrokers).getBrokerEndpoints(Mockito.any());
TopicDTO topicDTO = new TopicDTO();
topicDTO.setNumOfReplicas(10);

// Act
kafkaController.checkReplication(topicDTO);

// Assert
}

@Test
public void testRemoveKafkaTopicsInternalKafka() {
// Arrange
Expand Down Expand Up @@ -142,6 +153,8 @@ public void testRemoveTopicFromProjectExternalKafka() throws KafkaException {
@Test(expected = NullPointerException.class)
public void testCreateTopicInternalKafka() throws KafkaException {
// Arrange
List<String> brokers = Collections.nCopies(10, "endpoint");
Mockito.doReturn(brokers).when(kafkaController.kafkaBrokers).getBrokerEndpoints(Mockito.any());
Mockito.doReturn(false).when(kafkaController).externalKafka(Mockito.any());

// Act
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,8 @@ public enum KafkaErrorCode implements RESTErrorCode {
Response.Status.SERVICE_UNAVAILABLE),
TOPIC_DELETION_FAILED(22, "Could not delete Kafka topics.", Response.Status.INTERNAL_SERVER_ERROR),
TOPIC_FETCH_FAILED(23, "Could not fetch topic details.", Response.Status.INTERNAL_SERVER_ERROR),
TOPIC_CREATION_FAILED(24, "Could not create topic.", Response.Status.INTERNAL_SERVER_ERROR);
TOPIC_CREATION_FAILED(24, "Could not create topic.", Response.Status.INTERNAL_SERVER_ERROR),
BROKER_MISSING(25, "Could not find a broker endpoint.", Response.Status.NOT_FOUND);


private Integer code;
Expand Down

0 comments on commit d06ddfe

Please sign in to comment.