From f9f06c4d4840fe1c99e653e794c3d341d485d1d0 Mon Sep 17 00:00:00 2001 From: Ralf Date: Thu, 21 Sep 2023 20:01:54 +0200 Subject: [PATCH] [FSTORE-1024] Concurrency can cause SQLIntegrityConstraintViolationException when creating topic (#1562) --- .../common/dao/kafka/ProjectTopicsFacade.java | 2 +- .../online/OnlineFeaturegroupController.java | 8 +++++++- .../hopsworks/common/kafka/KafkaController.java | 17 ++++++++--------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/ProjectTopicsFacade.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/ProjectTopicsFacade.java index f0626e5804..39c871c267 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/ProjectTopicsFacade.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/dao/kafka/ProjectTopicsFacade.java @@ -44,7 +44,7 @@ protected EntityManager getEntityManager() { } private static final Logger LOGGER = Logger.getLogger(ProjectTopicsFacade.class.getName()); - + public List findTopicsByProject (Project project) { return em.createNamedQuery("ProjectTopics.findByProject", ProjectTopics.class) .setParameter("project", project) diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java index b25121f3cf..99724eaa6d 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/featurestore/featuregroup/online/OnlineFeaturegroupController.java @@ -168,11 +168,17 @@ public void createFeatureGroupKafkaTopic(Project project, Featuregroup featureGr subjectsCompatibilityController.setSubjectCompatibility(project, featureGroupEntityName, SchemaCompatibility.NONE); String topicName = Utils.getFeatureGroupTopicName(featureGroup); - if (!kafkaController.projectTopicExists(project, topicName)) { + + try { TopicDTO topicDTO = new TopicDTO(topicName, settings.getKafkaDefaultNumReplicas(), settings.getOnlineFsThreadNumber()); kafkaController.createTopic(project, topicDTO); + } catch (KafkaException e) { + // if topic already exists, no need to create it again. + if (e.getErrorCode() != RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS) { + throw e; + } } } diff --git a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java index 359b6813f8..2b1356ae90 100644 --- a/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java +++ b/hopsworks-common/src/main/java/io/hops/hopsworks/common/kafka/KafkaController.java @@ -64,8 +64,10 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartitionInfo; +import javax.ejb.ConcurrencyManagement; +import javax.ejb.ConcurrencyManagementType; import javax.ejb.EJB; -import javax.ejb.Stateless; +import javax.ejb.Singleton; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import java.util.ArrayList; @@ -80,7 +82,8 @@ import java.util.logging.Logger; import java.util.stream.Collectors; -@Stateless +@Singleton +@ConcurrencyManagement(ConcurrencyManagementType.BEAN) @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) public class KafkaController { @@ -102,7 +105,7 @@ public class KafkaController { protected FeaturestoreStorageConnectorController storageConnectorController; - public ProjectTopics createTopic(Project project, TopicDTO topicDto) throws KafkaException { + public synchronized ProjectTopics createTopic(Project project, TopicDTO topicDto) throws KafkaException { if (externalKafka(project)) { return null; } @@ -112,8 +115,8 @@ public ProjectTopics createTopic(Project project, TopicDTO topicDto) throws Kafk } String topicName = topicDto.getName(); - - if (projectTopicsFacade.findTopicByName(topicDto.getName()).isPresent()) { + + if (projectTopicsFacade.findTopicByName(topicName).isPresent()) { throw new KafkaException(RESTCodes.KafkaErrorCode.TOPIC_ALREADY_EXISTS, Level.FINE, "topic name: " + topicName); } @@ -183,10 +186,6 @@ private void deleteTopic(List topics) { //remove from zookeeper hopsKafkaAdminClient.deleteTopics(topicNameList); } - - public boolean projectTopicExists(Project project, String topicName) { - return projectTopicsFacade.findTopicByNameAndProject(project, topicName).isPresent(); - } public List findTopicsByProject(Project project) { List ptList = projectTopicsFacade.findTopicsByProject(project);