diff --git a/migrations/sent_msgs/00001_addNotDeliveredMessagesTable.up.sql b/migrations/sent_msgs/00001_addNotDeliveredMessagesTable.up.sql new file mode 100644 index 0000000000..2c0a13b485 --- /dev/null +++ b/migrations/sent_msgs/00001_addNotDeliveredMessagesTable.up.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS NotDeliveredMessages( + messageHash BLOB PRIMARY KEY, + timestamp INTEGER NOT NULL, + contentTopic BLOB NOT NULL, + pubsubTopic BLOB NOT NULL, + payload BLOB, + meta BLOB, + version INTEGER NOT NULL + ); \ No newline at end of file diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 6995a9a8ea..a5eeec7574 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -463,6 +463,15 @@ type WakuNodeConf* = object name: "lightpushnode" .}: string + ## Reliability config + reliabilityEnabled* {. + desc: + """Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests. +with the drawback of consuming some more bandwitdh.""", + defaultValue: false, + name: "reliability" + .}: bool + ## REST HTTP config rest* {. desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest" diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index c77ea52373..28a6326b98 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -20,6 +20,7 @@ import ../waku_node, ../node/peer_manager, ../node/health_monitor, + ../node/delivery_monitor/delivery_monitor, ../waku_api/message_cache, ../waku_api/rest/server, ../waku_archive, @@ -51,6 +52,8 @@ type Waku* = object node*: WakuNode + deliveryMonitor: DeliveryMonitor + restServer*: WakuRestServerRef metricsServer*: MetricsHttpServerRef @@ -147,13 +150,29 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = error "Failed setting up node", error = nodeRes.error return err("Failed setting up node: " & nodeRes.error) + let node = nodeRes.get() + + var deliveryMonitor: DeliveryMonitor + if conf.reliabilityEnabled: + if conf.storenode == "": + return err("A storenode should be set when reliability mode is on") + + let deliveryMonitorRes = DeliveryMonitor.new( + node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient, + node.wakuFilterClient, + ) + if deliveryMonitorRes.isErr(): + return err("could not create delivery monitor: " & $deliveryMonitorRes.error) + deliveryMonitor = deliveryMonitorRes.get() + var waku = Waku( version: git_version, conf: confCopy, rng: rng, key: confCopy.nodekey.get(), - node: nodeRes.get(), + node: node, dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(), + deliveryMonitor: deliveryMonitor, ) ok(waku) @@ -237,6 +256,10 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: (await waku.wakuDiscV5.start()).isOkOr: return err("failed to start waku discovery v5: " & $error) + ## Reliability + if not waku[].deliveryMonitor.isNil(): + waku[].deliveryMonitor.startDeliveryMonitor() + return ok() # Waku shutdown diff --git a/waku/node/delivery_monitor/delivery_callback.nim b/waku/node/delivery_monitor/delivery_callback.nim new file mode 100644 index 0000000000..c996bc7b0a --- /dev/null +++ b/waku/node/delivery_monitor/delivery_callback.nim @@ -0,0 +1,17 @@ +import ../../waku_core + +type DeliveryDirection* {.pure.} = enum + PUBLISHING + RECEIVING + +type DeliverySuccess* {.pure.} = enum + SUCCESSFUL + UNSUCCESSFUL + +type DeliveryFeedbackCallback* = proc( + success: DeliverySuccess, + dir: DeliveryDirection, + comment: string, + msgHash: WakuMessageHash, + msg: WakuMessage, +) {.gcsafe, raises: [].} diff --git a/waku/node/delivery_monitor/delivery_monitor.nim b/waku/node/delivery_monitor/delivery_monitor.nim new file mode 100644 index 0000000000..28f9e2507a --- /dev/null +++ b/waku/node/delivery_monitor/delivery_monitor.nim @@ -0,0 +1,43 @@ +## This module helps to ensure the correct transmission and reception of messages + +import results +import chronos +import + ./recv_monitor, + ./send_monitor, + ./delivery_callback, + ../../waku_core, + ../../waku_store/client, + ../../waku_relay/protocol, + ../../waku_lightpush/client, + ../../waku_filter_v2/client + +type DeliveryMonitor* = ref object + sendMonitor: SendMonitor + recvMonitor: RecvMonitor + +proc new*( + T: type DeliveryMonitor, + storeClient: WakuStoreClient, + wakuRelay: protocol.WakuRelay, + wakuLightpushClient: WakuLightPushClient, + wakuFilterClient: WakuFilterClient, +): Result[T, string] = + ## storeClient is needed to give store visitility to DeliveryMonitor + ## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendMonitor to re-publish + let sendMonitor = ?SendMonitor.new(storeClient, wakuRelay, wakuLightpushClient) + let recvMonitor = RecvMonitor.new(storeClient, wakuFilterClient) + return ok(DeliveryMonitor(sendMonitor: sendMonitor, recvMonitor: recvMonitor)) + +proc startDeliveryMonitor*(self: DeliveryMonitor) = + self.sendMonitor.startSendMonitor() + self.recvMonitor.startRecvMonitor() + +proc stopDeliveryMonitor*(self: DeliveryMonitor) {.async.} = + self.sendMonitor.stopSendMonitor() + await self.recvMonitor.stopRecvMonitor() + +proc setDeliveryCallback*(self: DeliveryMonitor, deliveryCb: DeliveryFeedbackCallback) = + ## The deliveryCb is a proc defined by the api client so that it can get delivery feedback + self.sendMonitor.setDeliveryCallback(deliveryCb) + self.recvMonitor.setDeliveryCallback(deliveryCb) diff --git a/waku/node/delivery_monitor/not_delivered_storage/migrations.nim b/waku/node/delivery_monitor/not_delivered_storage/migrations.nim new file mode 100644 index 0000000000..66fb8587c7 --- /dev/null +++ b/waku/node/delivery_monitor/not_delivered_storage/migrations.nim @@ -0,0 +1,26 @@ +{.push raises: [].} + +import std/[tables, strutils, os], results, chronicles +import ../../../common/databases/db_sqlite, ../../../common/databases/common + +logScope: + topics = "waku node delivery_monitor" + +const TargetSchemaVersion* = 1 + # increase this when there is an update in the database schema + +template projectRoot(): string = + currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." + +const PeerStoreMigrationPath: string = projectRoot / "migrations" / "sent_msgs" + +proc migrate*(db: SqliteDatabase): DatabaseResult[void] = + debug "starting peer store's sqlite database migration for sent messages" + + let migrationRes = + migrate(db, TargetSchemaVersion, migrationsScriptsDir = PeerStoreMigrationPath) + if migrationRes.isErr(): + return err("failed to execute migration scripts: " & migrationRes.error) + + debug "finished peer store's sqlite database migration for sent messages" + ok() diff --git a/waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim b/waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim new file mode 100644 index 0000000000..85611310bb --- /dev/null +++ b/waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim @@ -0,0 +1,38 @@ +## This module is aimed to keep track of the sent/published messages that are considered +## not being properly delivered. +## +## The archiving of such messages will happen in a local sqlite database. +## +## In the very first approach, we consider that a message is sent properly is it has been +## received by any store node. +## + +import results +import + ../../../common/databases/db_sqlite, + ../../../waku_core/message/message, + ../../../node/delivery_monitor/not_delivered_storage/migrations + +const NotDeliveredMessagesDbUrl = "not-delivered-messages.db" + +type NotDeliveredStorage* = ref object + database: SqliteDatabase + +type TrackedWakuMessage = object + msg: WakuMessage + numTrials: uint + ## for statistics purposes. Counts the number of times the node has tried to publish it + +proc new*(T: type NotDeliveredStorage): Result[T, string] = + let db = ?SqliteDatabase.new(NotDeliveredMessagesDbUrl) + + ?migrate(db) + + return ok(NotDeliveredStorage(database: db)) + +proc archiveMessage*( + self: NotDeliveredStorage, msg: WakuMessage +): Result[void, string] = + ## Archives a waku message so that we can keep track of it + ## even when the app restarts + return ok() diff --git a/waku/node/delivery_monitor/publish_observer.nim b/waku/node/delivery_monitor/publish_observer.nim new file mode 100644 index 0000000000..1f517f8bde --- /dev/null +++ b/waku/node/delivery_monitor/publish_observer.nim @@ -0,0 +1,9 @@ +import chronicles +import ../../waku_core/message/message + +type PublishObserver* = ref object of RootObj + +method onMessagePublished*( + self: PublishObserver, pubsubTopic: string, message: WakuMessage +) {.base, gcsafe, raises: [].} = + error "onMessagePublished not implemented" diff --git a/waku/node/delivery_monitor/recv_monitor.nim b/waku/node/delivery_monitor/recv_monitor.nim new file mode 100644 index 0000000000..3f82ddcd2e --- /dev/null +++ b/waku/node/delivery_monitor/recv_monitor.nim @@ -0,0 +1,196 @@ +## This module is in charge of taking care of the messages that this node is expecting to +## receive and is backed by store-v3 requests to get an additional degree of certainty +## + +import std/[tables, sequtils, sets, options] +import chronos, chronicles, libp2p/utility +import + ../../waku_core, + ./delivery_callback, + ./subscriptions_observer, + ../../waku_store/[client, common], + ../../waku_filter_v2/client, + ../../waku_core/topics + +const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries + +const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages + +const PruneOldMsgsPeriod = chronos.minutes(1) + +const DelayExtra* = chronos.seconds(5) + ## Additional security time to overlap the missing messages queries + +type TupleHashAndMsg = tuple[hash: WakuMessageHash, msg: WakuMessage] + +type RecvMessage = object + msgHash: WakuMessageHash + rxTime: Timestamp + ## timestamp of the rx message. We will not keep the rx messages forever + +type RecvMonitor* = ref object of SubscriptionObserver + topicsInterest: Table[PubsubTopic, seq[ContentTopic]] + ## Tracks message verification requests and when was the last time a + ## pubsub topic was verified for missing messages + ## The key contains pubsub-topics + + storeClient: WakuStoreClient + deliveryCb: DeliveryFeedbackCallback + + recentReceivedMsgs: seq[RecvMessage] + + msgCheckerHandler: Future[void] ## allows to stop the msgChecker async task + msgPrunerHandler: Future[void] ## removes too old messages + + startTimeToCheck: Timestamp + endTimeToCheck: Timestamp + +proc getMissingMsgsFromStore( + self: RecvMonitor, msgHashes: seq[WakuMessageHash] +): Future[Result[seq[TupleHashAndMsg], string]] {.async.} = + let storeResp: StoreQueryResponse = ( + await self.storeClient.queryToAny( + StoreQueryRequest(includeData: true, messageHashes: msgHashes) + ) + ).valueOr: + return err("getMissingMsgsFromStore: " & $error) + + let otherwiseMsg = WakuMessage() + ## message to be returned if the Option message is none + return ok( + storeResp.messages.mapIt((hash: it.messageHash, msg: it.message.get(otherwiseMsg))) + ) + +proc performDeliveryFeedback( + self: RecvMonitor, + success: DeliverySuccess, + dir: DeliveryDirection, + comment: string, + msgHash: WakuMessageHash, + msg: WakuMessage, +) {.gcsafe, raises: [].} = + ## This procs allows to bring delivery feedback to the API client + ## It requires a 'deliveryCb' to be registered beforehand. + if self.deliveryCb.isNil(): + error "deliveryCb is nil in performDeliveryFeedback", + success, dir, comment, msg_hash + return + + debug "recv monitor performDeliveryFeedback", + success, dir, comment, msg_hash = shortLog(msgHash) + self.deliveryCb(success, dir, comment, msgHash, msg) + +proc msgChecker(self: RecvMonitor) {.async.} = + ## Continuously checks if a message has been received + while true: + await sleepAsync(StoreCheckPeriod) + + self.endTimeToCheck = getNowInNanosecondTime() + + var msgHashesInStore = newSeq[WakuMessageHash](0) + for pubsubTopic, cTopics in self.topicsInterest.pairs: + let storeResp: StoreQueryResponse = ( + await self.storeClient.queryToAny( + StoreQueryRequest( + includeData: false, + pubsubTopic: some(PubsubTopic(pubsubTopic)), + contentTopics: cTopics, + startTime: some(self.startTimeToCheck - DelayExtra.nanos), + endTime: some(self.endTimeToCheck + DelayExtra.nanos), + ) + ) + ).valueOr: + error "msgChecker failed to get remote msgHashes", + pubsubTopic, cTopics, error = $error + continue + + msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash)) + + ## compare the msgHashes seen from the store vs the ones received directly + let rxMsgHashes = self.recentReceivedMsgs.mapIt(it.msgHash) + let missedHashes: seq[WakuMessageHash] = + msgHashesInStore.filterIt(not rxMsgHashes.contains(it)) + + ## Now retrieve the missed WakuMessages + let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes) + if missingMsgsRet.isOk(): + ## Give feedback so that the api client can perfom any action with the missed messages + for msgTuple in missingMsgsRet.get(): + self.performDeliveryFeedback( + DeliverySuccess.UNSUCCESSFUL, RECEIVING, "Missed message", msgTuple.hash, + msgTuple.msg, + ) + else: + error "failed to retrieve missing messages: ", error = $missingMsgsRet.error + + ## update next check times + self.startTimeToCheck = self.endTimeToCheck + +method onSubscribe( + self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string] +) {.gcsafe, raises: [].} = + debug "onSubscribe", pubsubTopic, contentTopics + self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): + contentTopicsOfInterest[].add(contentTopics) + do: + self.topicsInterest[pubsubTopic] = contentTopics + +method onUnsubscribe( + self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string] +) {.gcsafe, raises: [].} = + debug "onUnsubscribe", pubsubTopic, contentTopics + + self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): + let remainingCTopics = + contentTopicsOfInterest[].filterIt(not contentTopics.contains(it)) + contentTopicsOfInterest[] = remainingCTopics + + if remainingCTopics.len == 0: + self.topicsInterest.del(pubsubTopic) + do: + error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics + +proc new*( + T: type RecvMonitor, + storeClient: WakuStoreClient, + wakuFilterClient: WakuFilterClient, +): T = + ## The storeClient will help to acquire any possible missed messages + + let now = getNowInNanosecondTime() + var recvMonitor = RecvMonitor(storeClient: storeClient, startTimeToCheck: now) + + if not wakuFilterClient.isNil(): + wakuFilterClient.addSubscrObserver(recvMonitor) + + let filterPushHandler = proc( + pubsubTopic: PubsubTopic, message: WakuMessage + ) {.async, closure.} = + ## Captures all the messages recived through filter + + let msgHash = computeMessageHash(pubSubTopic, message) + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + recvMonitor.recentReceivedMsgs.add(rxMsg) + + wakuFilterClient.registerPushHandler(filterPushHandler) + + return recvMonitor + +proc loopPruneOldMessages(self: RecvMonitor) {.async.} = + while true: + let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos + self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime) + await sleepAsync(PruneOldMsgsPeriod) + +proc startRecvMonitor*(self: RecvMonitor) = + self.msgCheckerHandler = self.msgChecker() + self.msgPrunerHandler = self.loopPruneOldMessages() + +proc stopRecvMonitor*(self: RecvMonitor) {.async.} = + if not self.msgCheckerHandler.isNil(): + await self.msgCheckerHandler.cancelAndWait() + if not self.msgPrunerHandler.isNil(): + await self.msgPrunerHandler.cancelAndWait() + +proc setDeliveryCallback*(self: RecvMonitor, deliveryCb: DeliveryFeedbackCallback) = + self.deliveryCb = deliveryCb diff --git a/waku/node/delivery_monitor/send_monitor.nim b/waku/node/delivery_monitor/send_monitor.nim new file mode 100644 index 0000000000..ce1ccf0cc9 --- /dev/null +++ b/waku/node/delivery_monitor/send_monitor.nim @@ -0,0 +1,212 @@ +## This module reinforces the publish operation with regular store-v3 requests. +## + +import std/[sets, sequtils] +import chronos, chronicles, libp2p/utility +import + ./delivery_callback, + ./publish_observer, + ../../waku_core, + ./not_delivered_storage/not_delivered_storage, + ../../waku_store/[client, common], + ../../waku_archive/archive, + ../../waku_relay/protocol, + ../../waku_lightpush/client + +const MaxTimeInCache* = chronos.minutes(1) + ## Messages older than this time will get completely forgotten on publication and a + ## feedback will be given when that happens + +const SendCheckInterval* = chronos.seconds(3) + ## Interval at which we check that messages have been properly received by a store node + +const MaxMessagesToCheckAtOnce = 100 + ## Max number of messages to check if they were properly archived by a store node + +const ArchiveTime = chronos.seconds(3) + ## Estimation of the time we wait until we start confirming that a message has been properly + ## received and archived by a store node + +type DeliveryInfo = object + pubsubTopic: string + msg: WakuMessage + +type SendMonitor* = ref object of PublishObserver + publishedMessages: Table[WakuMessageHash, DeliveryInfo] + ## Cache that contains the delivery info per message hash. + ## This is needed to make sure the published messages are properly published + + msgStoredCheckerHandle: Future[void] ## handle that allows to stop the async task + + notDeliveredStorage: NotDeliveredStorage + ## NOTE: this is not fully used because that might be tackled by higher abstraction layers + + storeClient: WakuStoreClient + deliveryCb: DeliveryFeedbackCallback + + wakuRelay: protocol.WakuRelay + wakuLightpushClient: WakuLightPushClient + +proc new*( + T: type SendMonitor, + storeClient: WakuStoreClient, + wakuRelay: protocol.WakuRelay, + wakuLightpushClient: WakuLightPushClient, +): Result[T, string] = + if wakuRelay.isNil() and wakuLightpushClient.isNil(): + return err( + "Could not create SendMonitor. wakuRelay or wakuLightpushClient should be set" + ) + + let notDeliveredStorage = ?NotDeliveredStorage.new() + + let sendMonitor = SendMonitor( + notDeliveredStorage: notDeliveredStorage, + storeClient: storeClient, + wakuRelay: wakuRelay, + wakuLightpushClient: wakuLightPushClient, + ) + + if not wakuRelay.isNil(): + wakuRelay.addPublishObserver(sendMonitor) + + if not wakuLightpushClient.isNil(): + wakuLightpushClient.addPublishObserver(sendMonitor) + + return ok(sendMonitor) + +proc performFeedbackAndCleanup( + self: SendMonitor, + msgsToDiscard: Table[WakuMessageHash, DeliveryInfo], + success: DeliverySuccess, + dir: DeliveryDirection, + comment: string, +) = + ## This procs allows to bring delivery feedback to the API client + ## It requires a 'deliveryCb' to be registered beforehand. + if self.deliveryCb.isNil(): + error "deliveryCb is nil in performFeedbackAndCleanup", + success, dir, comment, hashes = toSeq(msgsToDiscard.keys).mapIt(shortLog(it)) + return + + for hash, deliveryInfo in msgsToDiscard: + debug "send monitor performFeedbackAndCleanup", + success, dir, comment, msg_hash = shortLog(hash) + + self.deliveryCb(success, dir, comment, hash, deliveryInfo.msg) + self.publishedMessages.del(hash) + +proc checkMsgsInStore( + self: SendMonitor, msgsToValidate: Table[WakuMessageHash, DeliveryInfo] +): Future[ + Result[ + tuple[ + publishedCorrectly: Table[WakuMessageHash, DeliveryInfo], + notYetPublished: Table[WakuMessageHash, DeliveryInfo], + ], + void, + ] +] {.async.} = + let hashesToValidate = toSeq(msgsToValidate.keys) + + let storeResp: StoreQueryResponse = ( + await self.storeClient.queryToAny( + StoreQueryRequest(includeData: false, messageHashes: hashesToValidate) + ) + ).valueOr: + error "checkMsgsInStore failed to get remote msgHashes", + hashes = hashesToValidate.mapIt(shortLog(it)), error = $error + return err() + + let publishedHashes = storeResp.messages.mapIt(it.messageHash) + + var notYetPublished: Table[WakuMessageHash, DeliveryInfo] + var publishedCorrectly: Table[WakuMessageHash, DeliveryInfo] + + for msgHash, deliveryInfo in msgsToValidate.pairs: + if publishedHashes.contains(msgHash): + publishedCorrectly[msgHash] = deliveryInfo + self.publishedMessages.del(msgHash) ## we will no longer track that message + else: + notYetPublished[msgHash] = deliveryInfo + + return ok((publishedCorrectly: publishedCorrectly, notYetPublished: notYetPublished)) + +proc processMessages(self: SendMonitor) {.async.} = + var msgsToValidate: Table[WakuMessageHash, DeliveryInfo] + var msgsToDiscard: Table[WakuMessageHash, DeliveryInfo] + + let now = getNowInNanosecondTime() + let timeToCheckThreshold = now - ArchiveTime.nanos + let maxLifeTime = now - MaxTimeInCache.nanos + + for hash, deliveryInfo in self.publishedMessages.pairs: + if deliveryInfo.msg.timestamp < maxLifeTime: + ## message is too old + msgsToDiscard[hash] = deliveryInfo + + if deliveryInfo.msg.timestamp < timeToCheckThreshold: + msgsToValidate[hash] = deliveryInfo + + ## Discard the messages that are too old + self.performFeedbackAndCleanup( + msgsToDiscard, DeliverySuccess.UNSUCCESSFUL, DeliveryDirection.PUBLISHING, + "Could not publish messages. Please try again.", + ) + + let (publishedCorrectly, notYetPublished) = ( + await self.checkMsgsInStore(msgsToValidate) + ).valueOr: + return ## the error log is printed in checkMsgsInStore + + ## Give positive feedback for the correctly published messages + self.performFeedbackAndCleanup( + publishedCorrectly, DeliverySuccess.SUCCESSFUL, DeliveryDirection.PUBLISHING, + "messages published correctly", + ) + + ## Try to publish again + for msgHash, deliveryInfo in notYetPublished.pairs: + let pubsubTopic = deliveryInfo.pubsubTopic + let msg = deliveryInfo.msg + if not self.wakuRelay.isNil(): + debug "trying to publish again with wakuRelay", msgHash, pubsubTopic + let ret = await self.wakuRelay.publish(pubsubTopic, msg) + if ret == 0: + error "could not publish with wakuRelay.publish", msgHash, pubsubTopic + continue + + if not self.wakuLightpushClient.isNil(): + debug "trying to publish again with wakuLightpushClient", msgHash, pubsubTopic + (await self.wakuLightpushClient.publishToAny(pubsubTopic, msg)).isOkOr: + error "could not publish with publishToAny", error = $error + continue + +proc checkIfMessagesStored(self: SendMonitor) {.async.} = + ## Continuously monitors that the sent messages have been received by a store node + while true: + await self.processMessages() + await sleepAsync(SendCheckInterval) + +method onMessagePublished( + self: SendMonitor, pubsubTopic: string, msg: WakuMessage +) {.gcsafe, raises: [].} = + ## Implementation of the PublishObserver interface. + ## + ## When publishing a message either through relay or lightpush, we want to add some extra effort + ## to make sure it is received to one store node. Hence, keep track of those published messages. + + debug "onMessagePublished" + let msgHash = computeMessageHash(pubSubTopic, msg) + + if not self.publishedMessages.hasKey(msgHash): + self.publishedMessages[msgHash] = DeliveryInfo(pubsubTopic: pubsubTopic, msg: msg) + +proc startSendMonitor*(self: SendMonitor) = + self.msgStoredCheckerHandle = self.checkIfMessagesStored() + +proc stopSendMonitor*(self: SendMonitor) = + self.msgStoredCheckerHandle.cancel() + +proc setDeliveryCallback*(self: SendMonitor, deliveryCb: DeliveryFeedbackCallback) = + self.deliveryCb = deliveryCb diff --git a/waku/node/delivery_monitor/subscriptions_observer.nim b/waku/node/delivery_monitor/subscriptions_observer.nim new file mode 100644 index 0000000000..0c5d552210 --- /dev/null +++ b/waku/node/delivery_monitor/subscriptions_observer.nim @@ -0,0 +1,13 @@ +import chronicles + +type SubscriptionObserver* = ref object of RootObj + +method onSubscribe*( + self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string] +) {.base, gcsafe, raises: [].} = + error "onSubscribe not implemented" + +method onUnsubscribe*( + self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string] +) {.gcsafe, raises: [].} = + error "onUnsubscribe not implemented" diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim index 07b67a8b2e..617648aff8 100644 --- a/waku/waku_filter_v2/client.nim +++ b/waku/waku_filter_v2/client.nim @@ -4,7 +4,13 @@ import std/options, chronicles, chronos, libp2p/protocols/protocol, bearssl/rand import - ../node/peer_manager, ../waku_core, ./common, ./protocol_metrics, ./rpc_codec, ./rpc + ../node/peer_manager, + ../node/delivery_monitor/subscriptions_observer, + ../waku_core, + ./common, + ./protocol_metrics, + ./rpc_codec, + ./rpc logScope: topics = "waku filter client" @@ -13,12 +19,16 @@ type WakuFilterClient* = ref object of LPProtocol rng: ref HmacDrbgContext peerManager: PeerManager pushHandlers: seq[FilterPushHandler] + subscrObservers: seq[SubscriptionObserver] func generateRequestId(rng: ref HmacDrbgContext): string = var bytes: array[10, byte] hmacDrbgGenerate(rng[], bytes) return toHex(bytes) +proc addSubscrObserver*(wfc: WakuFilterClient, obs: SubscriptionObserver) = + wfc.subscrObservers.add(obs) + proc sendSubscribeRequest( wfc: WakuFilterClient, servicePeer: RemotePeerInfo, @@ -113,7 +123,12 @@ proc subscribe*( requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq ) - return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) + ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) + + for obs in wfc.subscrObservers: + obs.onSubscribe(pubSubTopic, contentTopicSeq) + + return ok() proc unsubscribe*( wfc: WakuFilterClient, @@ -132,7 +147,12 @@ proc unsubscribe*( requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq ) - return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) + ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) + + for obs in wfc.subscrObservers: + obs.onUnsubscribe(pubSubTopic, contentTopicSeq) + + return ok() proc unsubscribeAll*( wfc: WakuFilterClient, servicePeer: RemotePeerInfo diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim index da502d4562..4f516dec5d 100644 --- a/waku/waku_lightpush/client.nim +++ b/waku/waku_lightpush/client.nim @@ -3,6 +3,7 @@ import std/options, results, chronicles, chronos, metrics, bearssl/rand import ../node/peer_manager, + ../node/delivery_monitor/publish_observer, ../utils/requests, ../waku_core, ./common, @@ -16,12 +17,16 @@ logScope: type WakuLightPushClient* = ref object peerManager*: PeerManager rng*: ref rand.HmacDrbgContext + publishObservers: seq[PublishObserver] proc new*( T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext ): T = WakuLightPushClient(peerManager: peerManager, rng: rng) +proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) = + wl.publishObservers.add(obs) + proc sendPushRequest( wl: WakuLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = @@ -67,4 +72,26 @@ proc publish*( peer: PeerId | RemotePeerInfo, ): Future[WakuLightPushResult[void]] {.async, gcsafe.} = let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) - return await wl.sendPushRequest(pushRequest, peer) + ?await wl.sendPushRequest(pushRequest, peer) + + for obs in wl.publishObservers: + obs.onMessagePublished(pubSubTopic, message) + + return ok() + +proc publishToAny*( + wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage +): Future[WakuLightPushResult[void]] {.async, gcsafe.} = + ## This proc is similar to the publish one but in this case + ## we don't specify a particular peer and instead we get it from peer manager + + let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr: + return err("could not retrieve a peer supporting WakuLightPushCodec") + + let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message) + ?await wl.sendPushRequest(pushRequest, peer) + + for obs in wl.publishObservers: + obs.onMessagePublished(pubSubTopic, message) + + return ok() diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index b922f69d3b..218dcf3d23 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -18,7 +18,7 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/stream/connection, libp2p/switch -import ../waku_core, ./message_id +import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer logScope: topics = "waku relay" @@ -128,6 +128,7 @@ type wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]] # a map of validators to error messages to return when validation fails validatorInserted: Table[PubsubTopic, bool] + publishObservers: seq[PublishObserver] proc initProtocolHandler(w: WakuRelay) = proc handler(conn: Connection, proto: string) {.async.} = @@ -254,7 +255,14 @@ proc addValidator*( ) {.gcsafe.} = w.wakuValidators.add((handler, errorMessage)) +proc addPublishObserver*(w: WakuRelay, obs: PublishObserver) = + ## Observer when the api client performed a publish operation. This + ## is initially aimed for bringing an additional layer of delivery reliability thanks + ## to store + w.publishObservers.add(obs) + proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = + ## Observes when a message is sent/received from the GossipSub PoV procCall GossipSub(w).addObserver(observer) method start*(w: WakuRelay) {.async, base.} = @@ -391,4 +399,8 @@ proc publish*( let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data) + if relayedPeerCount > 0: + for obs in w.publishObservers: + obs.onMessagePublished(pubSubTopic, message) + return relayedPeerCount diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index d6d53a9c7c..4b05249e38 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -59,3 +59,22 @@ proc query*( return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer)) return await self.sendStoreRequest(request, connection) + +proc queryToAny*( + self: WakuStoreClient, request: StoreQueryRequest, peerId = none(PeerId) +): Future[StoreQueryResult] {.async.} = + ## This proc is similar to the query one but in this case + ## we don't specify a particular peer and instead we get it from peer manager + + if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor: + return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor")) + + let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr: + return err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected")) + + let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr: + waku_store_errors.inc(labelValues = [dialFailure]) + + return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer)) + + return await self.sendStoreRequest(request, connection) diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index a4e5467a23..2f47cc6c8c 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -25,9 +25,6 @@ import logScope: topics = "waku store" -const MaxMessageTimestampVariance* = getNanoSecondTime(20) - # 20 seconds maximum allowable sender timestamp "drift" - type StoreQueryRequestHandler* = proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.} diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim index 6f158394e2..a4e5c92468 100644 --- a/waku/waku_store_legacy/protocol.nim +++ b/waku/waku_store_legacy/protocol.nim @@ -26,9 +26,6 @@ import logScope: topics = "waku legacy store" -const MaxMessageTimestampVariance* = getNanoSecondTime(20) - # 20 seconds maximum allowable sender timestamp "drift" - type HistoryQueryHandler* = proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.}