Skip to content

Commit

Permalink
[FSTORE-1024] Concurrency can cause SQLIntegrityConstraintViolationEx…
Browse files Browse the repository at this point in the history
…ception when creating topic (#1562)
  • Loading branch information
bubriks authored and SirOibaf committed Sep 21, 2023
1 parent a3c803a commit f9f06c4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected EntityManager getEntityManager() {
}

private static final Logger LOGGER = Logger.getLogger(ProjectTopicsFacade.class.getName());

public List<ProjectTopics> findTopicsByProject (Project project) {
return em.createNamedQuery("ProjectTopics.findByProject", ProjectTopics.class)
.setParameter("project", project)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,17 @@ public void createFeatureGroupKafkaTopic(Project project, Featuregroup featureGr
subjectsCompatibilityController.setSubjectCompatibility(project, featureGroupEntityName, SchemaCompatibility.NONE);

String topicName = Utils.getFeatureGroupTopicName(featureGroup);
if (!kafkaController.projectTopicExists(project, topicName)) {

try {
TopicDTO topicDTO = new TopicDTO(topicName,
settings.getKafkaDefaultNumReplicas(),
settings.getOnlineFsThreadNumber());
kafkaController.createTopic(project, topicDTO);
} catch (KafkaException e) {
// if topic already exists, no need to create it again.
if (e.getErrorCode() != RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS) {
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

import javax.ejb.ConcurrencyManagement;
import javax.ejb.ConcurrencyManagementType;
import javax.ejb.EJB;
import javax.ejb.Stateless;
import javax.ejb.Singleton;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import java.util.ArrayList;
Expand All @@ -80,7 +82,8 @@
import java.util.logging.Logger;
import java.util.stream.Collectors;

@Stateless
@Singleton
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class KafkaController {

Expand All @@ -102,7 +105,7 @@ public class KafkaController {
protected FeaturestoreStorageConnectorController storageConnectorController;


public ProjectTopics createTopic(Project project, TopicDTO topicDto) throws KafkaException {
public synchronized ProjectTopics createTopic(Project project, TopicDTO topicDto) throws KafkaException {
if (externalKafka(project)) {
return null;
}
Expand All @@ -112,8 +115,8 @@ public ProjectTopics createTopic(Project project, TopicDTO topicDto) throws Kafk
}

String topicName = topicDto.getName();
if (projectTopicsFacade.findTopicByName(topicDto.getName()).isPresent()) {

if (projectTopicsFacade.findTopicByName(topicName).isPresent()) {
throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS, Level.FINE, "topic name: " + topicName);
}

Expand Down Expand Up @@ -183,10 +186,6 @@ private void deleteTopic(List<ProjectTopics> topics) {
//remove from zookeeper
hopsKafkaAdminClient.deleteTopics(topicNameList);
}

public boolean projectTopicExists(Project project, String topicName) {
return projectTopicsFacade.findTopicByNameAndProject(project, topicName).isPresent();
}

public List<TopicDTO> findTopicsByProject(Project project) {
List<ProjectTopics> ptList = projectTopicsFacade.findTopicsByProject(project);
Expand Down

0 comments on commit f9f06c4

Please sign in to comment.