diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java index 709b7bd18..2ffb2fdfc 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManager.java @@ -58,6 +58,11 @@ public class PubSubClientEventManager extends RedisClusterPubSubAdapter pubSubConnection; @@ -240,10 +245,29 @@ public CompletionStage handleNewMessageAvailable(final UUID accountIden } return pubSubConnection.withPubSubConnection(connection -> - connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES)) + connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), NEW_MESSAGE_EVENT_BYTES)) .thenApply(listeners -> listeners > 0); } + /** + * Publishes an event notifying a specific device that messages have been persisted from short-term to long-term + * storage. + * + * @param accountIdentifier the account identifier for which messages have been persisted + * @param deviceId the ID of the device within the target account + * + * @return a future that completes when the event has been published + */ + public CompletionStage handleMessagesPersisted(final UUID accountIdentifier, final byte deviceId) { + if (pubSubConnection == null) { + throw new IllegalStateException("Presence manager not started"); + } + + return pubSubConnection.withPubSubConnection(connection -> + connection.async().spublish(getClientPresenceKey(accountIdentifier, deviceId), MESSAGES_PERSISTED_EVENT_BYTES)) + .thenRun(Util.NOOP); + } + /** * Tests whether a client with the given account/device is connected to this presence manager instance. * diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java index ec86f95d3..f61865aa7 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/storage/MessagePersister.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; import org.whispersystems.textsecuregcm.util.Util; import software.amazon.awssdk.services.dynamodb.model.ItemCollectionSizeLimitExceededException; @@ -30,6 +31,7 @@ public class MessagePersister implements Managed { private final MessagesCache messagesCache; private final MessagesManager messagesManager; private final AccountsManager accountsManager; + private final PubSubClientEventManager pubSubClientEventManager; private final Duration persistDelay; @@ -63,13 +65,16 @@ public class MessagePersister implements Managed { public MessagePersister(final MessagesCache messagesCache, final MessagesManager messagesManager, final AccountsManager accountsManager, - final DynamicConfigurationManager dynamicConfigurationManager, final Duration persistDelay, + final PubSubClientEventManager pubSubClientEventManager, + final DynamicConfigurationManager dynamicConfigurationManager, + final Duration persistDelay, final int dedicatedProcessWorkerThreadCount ) { this.messagesCache = messagesCache; this.messagesManager = messagesManager; this.accountsManager = accountsManager; + this.pubSubClientEventManager = pubSubClientEventManager; this.persistDelay = persistDelay; this.workerThreads = new Thread[dedicatedProcessWorkerThreadCount]; @@ -206,6 +211,7 @@ void persistQueue(final Account account, final Device device) throws MessagePers maybeUnlink(account, deviceId); // may throw, in which case we'll retry later by the usual mechanism } finally { messagesCache.unlockQueueForPersistence(accountUuid, deviceId); + pubSubClientEventManager.handleMessagesPersisted(accountUuid, deviceId); sample.stop(persistQueueTimer); } diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java index 1a6295019..c63f111ac 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/CommandDependencies.java @@ -76,6 +76,7 @@ record CommandDependencies( ReportMessageManager reportMessageManager, MessagesCache messagesCache, MessagesManager messagesManager, + PubSubClientEventManager pubSubClientEventManager, KeysManager keysManager, APNSender apnSender, FcmSender fcmSender, @@ -271,6 +272,7 @@ static CommandDependencies build( reportMessageManager, messagesCache, messagesManager, + pubSubClientEventManager, keys, apnSender, fcmSender, diff --git a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java index 25a10b825..b261465d0 100644 --- a/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java +++ b/service/src/main/java/org/whispersystems/textsecuregcm/workers/MessagePersisterServiceCommand.java @@ -60,8 +60,11 @@ protected void run(Environment environment, Namespace namespace, WhisperServerCo }); } - final MessagePersister messagePersister = new MessagePersister(deps.messagesCache(), deps.messagesManager(), - deps.accountsManager(), deps.dynamicConfigurationManager(), + final MessagePersister messagePersister = new MessagePersister(deps.messagesCache(), + deps.messagesManager(), + deps.accountsManager(), + deps.pubSubClientEventManager(), + deps.dynamicConfigurationManager(), Duration.ofMinutes(configuration.getMessageCacheConfiguration().getPersistDelayMinutes()), namespace.getInt(WORKER_COUNT)); diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java index 98d368b48..7a6244ddc 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/push/PubSubClientEventManagerTest.java @@ -20,8 +20,10 @@ import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; @@ -97,8 +99,8 @@ void handleClientConnected(final boolean displaceRemotely) throws InterruptedExc final byte deviceId = Device.PRIMARY_ID; final AtomicBoolean firstListenerDisplaced = new AtomicBoolean(false); - final AtomicBoolean secondListenerDisplaced = new AtomicBoolean(false); + final AtomicBoolean secondListenerDisplaced = new AtomicBoolean(false); final AtomicBoolean firstListenerConnectedElsewhere = new AtomicBoolean(false); localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() { @@ -144,15 +146,12 @@ void handleNewMessageAvailable(final boolean messageAvailableRemotely) throws In final UUID accountIdentifier = UUID.randomUUID(); final byte deviceId = Device.PRIMARY_ID; - final AtomicBoolean messageReceived = new AtomicBoolean(false); + final CountDownLatch messageReceivedLatch = new CountDownLatch(1); localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() { @Override public void handleNewMessageAvailable() { - synchronized (messageReceived) { - messageReceived.set(true); - messageReceived.notifyAll(); - } + messageReceivedLatch.countDown(); } }).toCompletableFuture().join(); @@ -161,13 +160,32 @@ public void handleNewMessageAvailable() { assertTrue(messagePresenceManager.handleNewMessageAvailable(accountIdentifier, deviceId).toCompletableFuture().join()); - synchronized (messageReceived) { - while (!messageReceived.get()) { - messageReceived.wait(); + assertTrue(messageReceivedLatch.await(2, TimeUnit.SECONDS), + "Message not received within time limit"); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void handleMessagesPersisted(final boolean messagesPersistedRemotely) throws InterruptedException { + final UUID accountIdentifier = UUID.randomUUID(); + final byte deviceId = Device.PRIMARY_ID; + + final CountDownLatch messagesPersistedLatch = new CountDownLatch(1); + + localPresenceManager.handleClientConnected(accountIdentifier, deviceId, new ClientEventAdapter() { + @Override + public void handleMessagesPersistedPubSub() { + messagesPersistedLatch.countDown(); } - } + }).toCompletableFuture().join(); + + final PubSubClientEventManager persistingPresenceManager = + messagesPersistedRemotely ? remotePresenceManager : localPresenceManager; + + persistingPresenceManager.handleMessagesPersisted(accountIdentifier, deviceId).toCompletableFuture().join(); - assertTrue(messageReceived.get()); + assertTrue(messagesPersistedLatch.await(2, TimeUnit.SECONDS), + "Message persistence event not received within time limit"); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java index 1c11421f5..b7bac9f71 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterIntegrationTest.java @@ -9,6 +9,7 @@ import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.protobuf.ByteString; @@ -32,6 +33,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.storage.DynamoDbExtensionSchema.Tables; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; @@ -53,6 +55,7 @@ class MessagePersisterIntegrationTest { private ExecutorService messageDeletionExecutorService; private MessagesCache messagesCache; private MessagesManager messagesManager; + private PubSubClientEventManager pubSubClientEventManager; private MessagePersister messagePersister; private Account account; @@ -82,8 +85,10 @@ void setUp() throws Exception { messageDeliveryScheduler, messageDeletionExecutorService, Clock.systemUTC(), dynamicConfigurationManager); messagesManager = new MessagesManager(messagesDynamoDb, messagesCache, mock(ReportMessageManager.class), messageDeletionExecutorService); + pubSubClientEventManager = mock(PubSubClientEventManager.class); + messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, - dynamicConfigurationManager, PERSIST_DELAY, 1); + pubSubClientEventManager, dynamicConfigurationManager, PERSIST_DELAY, 1); account = mock(Account.class); @@ -178,6 +183,8 @@ public boolean handleMessagesPersisted() { .toList(); assertEquals(expectedMessages, persistedMessages); + + verify(pubSubClientEventManager).handleMessagesPersisted(account.getUuid(), Device.PRIMARY_ID); }); } diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java index 2aaf52531..d032bc11e 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/storage/MessagePersisterTest.java @@ -46,6 +46,7 @@ import org.mockito.stubbing.Answer; import org.whispersystems.textsecuregcm.configuration.dynamic.DynamicConfiguration; import org.whispersystems.textsecuregcm.entities.MessageProtos; +import org.whispersystems.textsecuregcm.push.PubSubClientEventManager; import org.whispersystems.textsecuregcm.redis.RedisClusterExtension; import org.whispersystems.textsecuregcm.tests.util.DevicesHelper; import reactor.core.scheduler.Scheduler; @@ -66,6 +67,7 @@ class MessagePersisterTest { private MessagePersister messagePersister; private AccountsManager accountsManager; private MessagesManager messagesManager; + private PubSubClientEventManager pubSubClientEventManager; private Account destinationAccount; private static final UUID DESTINATION_ACCOUNT_UUID = UUID.randomUUID(); @@ -100,7 +102,8 @@ void setUp() throws Exception { messageDeliveryScheduler = Schedulers.newBoundedElastic(10, 10_000, "messageDelivery"); messagesCache = new MessagesCache(REDIS_CLUSTER_EXTENSION.getRedisCluster(), sharedExecutorService, messageDeliveryScheduler, sharedExecutorService, Clock.systemUTC(), dynamicConfigurationManager); - messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, + pubSubClientEventManager = mock(PubSubClientEventManager.class); + messagePersister = new MessagePersister(messagesCache, messagesManager, accountsManager, pubSubClientEventManager, dynamicConfigurationManager, PERSIST_DELAY, 1); when(messagesManager.clear(any(UUID.class), anyByte())).thenReturn(CompletableFuture.completedFuture(null)); @@ -154,6 +157,8 @@ void testPersistNextQueuesSingleQueue() { verify(messagesDynamoDb, atLeastOnce()).store(messagesCaptor.capture(), eq(DESTINATION_ACCOUNT_UUID), eq(DESTINATION_DEVICE)); assertEquals(messageCount, messagesCaptor.getAllValues().stream().mapToInt(List::size).sum()); + + verify(pubSubClientEventManager).handleMessagesPersisted(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID); } @Test @@ -223,6 +228,8 @@ void testPersistQueueRetry() { assertEquals(List.of(queueName), messagesCache.getQueuesToPersist(SlotHash.getSlot(queueName), Instant.now().plus(messagePersister.getPersistDelay()), 1)); + + verify(pubSubClientEventManager).handleMessagesPersisted(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID); } @Test @@ -241,6 +248,8 @@ void testPersistQueueRetryLoop() { assertTimeoutPreemptively(Duration.ofSeconds(1), () -> assertThrows(MessagePersistenceException.class, () -> messagePersister.persistQueue(destinationAccount, DESTINATION_DEVICE))); + + verify(pubSubClientEventManager).handleMessagesPersisted(DESTINATION_ACCOUNT_UUID, DESTINATION_DEVICE_ID); } @Test diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java index 505783083..65f32351d 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/FinishPushNotificationExperimentCommandTest.java @@ -73,6 +73,7 @@ void setUp() { null, null, null, + null, pushNotificationExperimentSamples, null, null, diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java index 86a03200e..d0c4afc39 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/NotifyIdleDevicesCommandTest.java @@ -66,6 +66,7 @@ private TestNotifyIdleDevicesCommand(final MessagesManager messagesManager, null, null, null, + null, null); this.idleDeviceNotificationScheduler = idleDeviceNotificationScheduler; diff --git a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java index b547e3273..3b54fc52b 100644 --- a/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java +++ b/service/src/test/java/org/whispersystems/textsecuregcm/workers/StartPushNotificationExperimentCommandTest.java @@ -62,6 +62,7 @@ public TestStartPushNotificationExperimentCommand( null, null, null, + null, pushNotificationExperimentSamples, null, null,