diff --git a/kura/org.eclipse.kura.cloudconnection.kapua.mqtt.provider/src/main/java/org/eclipse/kura/core/cloud/CloudServiceImpl.java b/kura/org.eclipse.kura.cloudconnection.kapua.mqtt.provider/src/main/java/org/eclipse/kura/core/cloud/CloudServiceImpl.java index 9944f1efe1..9065d5ecf4 100644 --- a/kura/org.eclipse.kura.cloudconnection.kapua.mqtt.provider/src/main/java/org/eclipse/kura/core/cloud/CloudServiceImpl.java +++ b/kura/org.eclipse.kura.cloudconnection.kapua.mqtt.provider/src/main/java/org/eclipse/kura/core/cloud/CloudServiceImpl.java @@ -50,6 +50,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -174,9 +175,8 @@ public class CloudServiceImpl private String ownPid; private ScheduledFuture scheduledBirthPublisherFuture; - private final ScheduledExecutorService scheduledBirthPublisher = Executors.newScheduledThreadPool(1); - private LifecycleMessage lastBirthMessage; - private LifecycleMessage lastAppMessage; + private final ScheduledExecutorService scheduledBirthPublisher = Executors.newSingleThreadScheduledExecutor(); + private final AtomicBoolean shouldPublishDelayedBirth = new AtomicBoolean(); public CloudServiceImpl() { this.cloudClients = new CopyOnWriteArrayList<>(); @@ -825,12 +825,11 @@ private void publishBirthCertificate(boolean isNewConnection) throws KuraExcepti } readModemProfile(); - LifecycleMessage birthToPublish = new LifecycleMessage(this.options, this).asBirthCertificateMessage(); if (isNewConnection) { - publishLifeCycleMessage(birthToPublish); + publishLifeCycleMessage(new LifecycleMessage(this.options, this).asBirthCertificateMessage()); } else { - publishWithDelay(birthToPublish); + publishWithDelay(false); } } @@ -843,42 +842,26 @@ private void publishAppCertificate() { logger.info("framework is stopping.. not republishing app certificate"); return; } - publishWithDelay(new LifecycleMessage(this.options, this).asAppCertificateMessage()); + publishWithDelay(true); } - private void publishWithDelay(LifecycleMessage message) { - if (Objects.nonNull(this.scheduledBirthPublisherFuture)) { - this.scheduledBirthPublisherFuture.cancel(false); - logger.debug("CloudServiceImpl: BIRTH message cache timer restarted."); - } + private void publishWithDelay(boolean isAppUpdate) { + synchronized (this.shouldPublishDelayedBirth) { + if (!isAppUpdate) { + this.shouldPublishDelayedBirth.set(true); + } - logger.debug("CloudServiceImpl: BIRTH message cached for 30s."); + if (Objects.nonNull(this.scheduledBirthPublisherFuture)) { + this.scheduledBirthPublisherFuture.cancel(false); + logger.debug("CloudServiceImpl: BIRTH message cache timer restarted."); + } + logger.info("isAppUpdate? {}", isAppUpdate, new RuntimeException()); - if (message.isBirthCertificateMessage()) { - this.lastBirthMessage = message; - } + logger.debug("CloudServiceImpl: BIRTH message cached for 30s."); - if (message.isAppCertificateMessage()) { - this.lastAppMessage = message; + this.scheduledBirthPublisherFuture = this.scheduledBirthPublisher.schedule(this::publishDelayedMessage, 30L, + TimeUnit.SECONDS); } - - this.scheduledBirthPublisherFuture = this.scheduledBirthPublisher.schedule(() -> { - try { - - if (Objects.nonNull(this.lastBirthMessage)) { - logger.debug("CloudServiceImpl: publishing cached BIRTH message."); - publishLifeCycleMessage(this.lastBirthMessage); - } - - if (Objects.nonNull(this.lastAppMessage)) { - logger.debug("CloudServiceImpl: publishing cached APP message."); - publishLifeCycleMessage(this.lastAppMessage); - } - - } catch (KuraException e) { - logger.error("Error sending cached BIRTH/APP certificate.", e); - } - }, 30L, TimeUnit.SECONDS); } private void publishLifeCycleMessage(LifecycleMessage message) throws KuraException { @@ -902,6 +885,23 @@ private void publishLifeCycleMessage(LifecycleMessage message) throws KuraExcept } } + private void publishDelayedMessage() { + synchronized (this.shouldPublishDelayedBirth) { + try { + + if (this.shouldPublishDelayedBirth.get()) { + publishLifeCycleMessage(new LifecycleMessage(this.options, this).asBirthCertificateMessage()); + } else { + publishLifeCycleMessage(new LifecycleMessage(this.options, this).asAppCertificateMessage()); + } + this.shouldPublishDelayedBirth.set(false); + + } catch (KuraException e) { + logger.error("Error sending cached BIRTH/APP certificate.", e); + } + } + } + private byte[] encodeProtobufPayload(KuraPayload payload) throws KuraException { byte[] bytes = new byte[0]; if (payload == null) {