From 6d166fdfc53f002a547921bd217eceeae6ef2312 Mon Sep 17 00:00:00 2001 From: Jon Chambers Date: Mon, 8 Jul 2024 12:23:49 -0400 Subject: [PATCH] Return futures from "send push notification" operations --- .../push/PushNotificationManager.java | 130 +++++++++--------- 1 file changed, 68 insertions(+), 62 deletions(-) diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java index e5b61fe84..7f539267f 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PushNotificationManager.java @@ -12,6 +12,7 @@ import io.micrometer.core.instrument.Tags; import java.time.Instant; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -49,35 +50,38 @@ public PushNotificationManager(final AccountsManager accountsManager, this.pushLatencyManager = pushLatencyManager; } - public void sendNewMessageNotification(final Account destination, final byte destinationDeviceId, final boolean urgent) throws NotPushRegisteredException { + public CompletableFuture> sendNewMessageNotification(final Account destination, final byte destinationDeviceId, final boolean urgent) throws NotPushRegisteredException { final Device device = destination.getDevice(destinationDeviceId).orElseThrow(NotPushRegisteredException::new); final Pair tokenAndType = getToken(device); - sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(), + return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(), PushNotification.NotificationType.NOTIFICATION, null, destination, device, urgent)); } - public void sendRegistrationChallengeNotification(final String deviceToken, final PushNotification.TokenType tokenType, final String challengeToken) { - sendNotification(new PushNotification(deviceToken, tokenType, PushNotification.NotificationType.CHALLENGE, challengeToken, null, null, true)); + public CompletableFuture sendRegistrationChallengeNotification(final String deviceToken, final PushNotification.TokenType tokenType, final String challengeToken) { + return sendNotification(new PushNotification(deviceToken, tokenType, PushNotification.NotificationType.CHALLENGE, challengeToken, null, null, true)) + .thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications"))); } - public void sendRateLimitChallengeNotification(final Account destination, final String challengeToken) + public CompletableFuture sendRateLimitChallengeNotification(final Account destination, final String challengeToken) throws NotPushRegisteredException { final Device device = destination.getPrimaryDevice(); final Pair tokenAndType = getToken(device); - sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(), - PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device, true)); + return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(), + PushNotification.NotificationType.RATE_LIMIT_CHALLENGE, challengeToken, destination, device, true)) + .thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications"))); } - public void sendAttemptLoginNotification(final Account destination, final String context) throws NotPushRegisteredException { + public CompletableFuture sendAttemptLoginNotification(final Account destination, final String context) throws NotPushRegisteredException { final Device device = destination.getDevice(Device.PRIMARY_ID).orElseThrow(NotPushRegisteredException::new); final Pair tokenAndType = getToken(device); - sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(), + return sendNotification(new PushNotification(tokenAndType.first(), tokenAndType.second(), PushNotification.NotificationType.ATTEMPT_LOGIN_NOTIFICATION_HIGH_PRIORITY, - context, destination, device, true)); + context, destination, device, true)) + .thenApply(maybeResponse -> maybeResponse.orElseThrow(() -> new AssertionError("Responses must be present for urgent notifications"))); } public void handleMessagesRetrieved(final Account account, final Device device, final String userAgent) { @@ -103,64 +107,66 @@ Pair getToken(final Device device) throws No } @VisibleForTesting - void sendNotification(final PushNotification pushNotification) { + CompletableFuture> sendNotification(final PushNotification pushNotification) { if (pushNotification.tokenType() == PushNotification.TokenType.APN && !pushNotification.urgent()) { // APNs imposes a per-device limit on background push notifications; schedule a notification for some time in the // future (possibly even now!) rather than sending a notification directly - apnPushNotificationScheduler + return apnPushNotificationScheduler .scheduleBackgroundNotification(pushNotification.destination(), pushNotification.destinationDevice()) - .whenComplete(logErrors()); + .whenComplete(logErrors()) + .thenApply(ignored -> Optional.empty()) + .toCompletableFuture(); + } - } else { - final PushNotificationSender sender = switch (pushNotification.tokenType()) { - case FCM -> fcmSender; - case APN, APN_VOIP -> apnSender; - }; - - sender.sendNotification(pushNotification).whenComplete((result, throwable) -> { - if (throwable == null) { - Tags tags = Tags.of("tokenType", pushNotification.tokenType().name(), - "notificationType", pushNotification.notificationType().name(), - "urgent", String.valueOf(pushNotification.urgent()), - "accepted", String.valueOf(result.accepted()), - "unregistered", String.valueOf(result.unregistered())); - - if (result.errorCode().isPresent()) { - tags = tags.and("errorCode", result.errorCode().get()); - } - - Metrics.counter(SENT_NOTIFICATION_COUNTER_NAME, tags).increment(); - - if (result.unregistered() && pushNotification.destination() != null - && pushNotification.destinationDevice() != null) { - - handleDeviceUnregistered(pushNotification.destination(), - pushNotification.destinationDevice(), - pushNotification.tokenType(), - result.errorCode(), - result.unregisteredTimestamp()); - } - - if (result.accepted() && - pushNotification.tokenType() == PushNotification.TokenType.APN_VOIP && - pushNotification.notificationType() == PushNotification.NotificationType.NOTIFICATION && - pushNotification.destination() != null && - pushNotification.destinationDevice() != null) { - - apnPushNotificationScheduler.scheduleRecurringVoipNotification( - pushNotification.destination(), - pushNotification.destinationDevice()) - .whenComplete(logErrors()); - } - } else { - logger.debug("Failed to deliver {} push notification to {} ({})", - pushNotification.notificationType(), pushNotification.deviceToken(), pushNotification.tokenType(), - throwable); - - Metrics.counter(FAILED_NOTIFICATION_COUNTER_NAME, "cause", throwable.getClass().getSimpleName()).increment(); + final PushNotificationSender sender = switch (pushNotification.tokenType()) { + case FCM -> fcmSender; + case APN, APN_VOIP -> apnSender; + }; + + return sender.sendNotification(pushNotification).whenComplete((result, throwable) -> { + if (throwable == null) { + Tags tags = Tags.of("tokenType", pushNotification.tokenType().name(), + "notificationType", pushNotification.notificationType().name(), + "urgent", String.valueOf(pushNotification.urgent()), + "accepted", String.valueOf(result.accepted()), + "unregistered", String.valueOf(result.unregistered())); + + if (result.errorCode().isPresent()) { + tags = tags.and("errorCode", result.errorCode().get()); } - }); - } + + Metrics.counter(SENT_NOTIFICATION_COUNTER_NAME, tags).increment(); + + if (result.unregistered() && pushNotification.destination() != null + && pushNotification.destinationDevice() != null) { + + handleDeviceUnregistered(pushNotification.destination(), + pushNotification.destinationDevice(), + pushNotification.tokenType(), + result.errorCode(), + result.unregisteredTimestamp()); + } + + if (result.accepted() && + pushNotification.tokenType() == PushNotification.TokenType.APN_VOIP && + pushNotification.notificationType() == PushNotification.NotificationType.NOTIFICATION && + pushNotification.destination() != null && + pushNotification.destinationDevice() != null) { + + apnPushNotificationScheduler.scheduleRecurringVoipNotification( + pushNotification.destination(), + pushNotification.destinationDevice()) + .whenComplete(logErrors()); + } + } else { + logger.debug("Failed to deliver {} push notification to {} ({})", + pushNotification.notificationType(), pushNotification.deviceToken(), pushNotification.tokenType(), + throwable); + + Metrics.counter(FAILED_NOTIFICATION_COUNTER_NAME, "cause", throwable.getClass().getSimpleName()).increment(); + } + }) + .thenApply(Optional::of); } private static BiConsumer logErrors() {