Skip to content

Commit

Permalink
fix: Cleanup of birth and app publish logic (#5546)
Browse files Browse the repository at this point in the history
Signed-off-by: MMaiero <[email protected]>
  • Loading branch information
MMaiero committed Dec 19, 2024
1 parent 2d73950 commit 04699d9
Showing 1 changed file with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -174,9 +175,8 @@ public class CloudServiceImpl
private String ownPid;

private ScheduledFuture<?> scheduledBirthPublisherFuture;
private final ScheduledExecutorService scheduledBirthPublisher = Executors.newScheduledThreadPool(1);
private LifecycleMessage lastBirthMessage;
private LifecycleMessage lastAppMessage;
private final ScheduledExecutorService scheduledBirthPublisher = Executors.newSingleThreadScheduledExecutor();
private final AtomicBoolean shouldPublishDelayedBirth = new AtomicBoolean();

public CloudServiceImpl() {
this.cloudClients = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -825,12 +825,11 @@ private void publishBirthCertificate(boolean isNewConnection) throws KuraExcepti
}

readModemProfile();
LifecycleMessage birthToPublish = new LifecycleMessage(this.options, this).asBirthCertificateMessage();

if (isNewConnection) {
publishLifeCycleMessage(birthToPublish);
publishLifeCycleMessage(new LifecycleMessage(this.options, this).asBirthCertificateMessage());
} else {
publishWithDelay(birthToPublish);
publishWithDelay(false);
}
}

Expand All @@ -843,42 +842,26 @@ private void publishAppCertificate() {
logger.info("framework is stopping.. not republishing app certificate");
return;
}
publishWithDelay(new LifecycleMessage(this.options, this).asAppCertificateMessage());
publishWithDelay(true);
}

private void publishWithDelay(LifecycleMessage message) {
if (Objects.nonNull(this.scheduledBirthPublisherFuture)) {
this.scheduledBirthPublisherFuture.cancel(false);
logger.debug("CloudServiceImpl: BIRTH message cache timer restarted.");
}
private void publishWithDelay(boolean isAppUpdate) {
synchronized (this.shouldPublishDelayedBirth) {
if (!isAppUpdate) {
this.shouldPublishDelayedBirth.set(true);
}

logger.debug("CloudServiceImpl: BIRTH message cached for 30s.");
if (Objects.nonNull(this.scheduledBirthPublisherFuture)) {
this.scheduledBirthPublisherFuture.cancel(false);
logger.debug("CloudServiceImpl: BIRTH message cache timer restarted.");
}
logger.info("isAppUpdate? {}", isAppUpdate, new RuntimeException());

if (message.isBirthCertificateMessage()) {
this.lastBirthMessage = message;
}
logger.debug("CloudServiceImpl: BIRTH message cached for 30s.");

if (message.isAppCertificateMessage()) {
this.lastAppMessage = message;
this.scheduledBirthPublisherFuture = this.scheduledBirthPublisher.schedule(this::publishDelayedMessage, 30L,
TimeUnit.SECONDS);
}

this.scheduledBirthPublisherFuture = this.scheduledBirthPublisher.schedule(() -> {
try {

if (Objects.nonNull(this.lastBirthMessage)) {
logger.debug("CloudServiceImpl: publishing cached BIRTH message.");
publishLifeCycleMessage(this.lastBirthMessage);
}

if (Objects.nonNull(this.lastAppMessage)) {
logger.debug("CloudServiceImpl: publishing cached APP message.");
publishLifeCycleMessage(this.lastAppMessage);
}

} catch (KuraException e) {
logger.error("Error sending cached BIRTH/APP certificate.", e);
}
}, 30L, TimeUnit.SECONDS);
}

private void publishLifeCycleMessage(LifecycleMessage message) throws KuraException {
Expand All @@ -902,6 +885,23 @@ private void publishLifeCycleMessage(LifecycleMessage message) throws KuraExcept
}
}

private void publishDelayedMessage() {
synchronized (this.shouldPublishDelayedBirth) {
try {

if (this.shouldPublishDelayedBirth.get()) {
publishLifeCycleMessage(new LifecycleMessage(this.options, this).asBirthCertificateMessage());
} else {
publishLifeCycleMessage(new LifecycleMessage(this.options, this).asAppCertificateMessage());
}
this.shouldPublishDelayedBirth.set(false);

} catch (KuraException e) {
logger.error("Error sending cached BIRTH/APP certificate.", e);
}
}
}

private byte[] encodeProtobufPayload(KuraPayload payload) throws KuraException {
byte[] bytes = new byte[0];
if (payload == null) {
Expand Down

0 comments on commit 04699d9

Please sign in to comment.