From 99149ea9dc252c8d6b6d563d0804c1c474a528db Mon Sep 17 00:00:00 2001 From: Darshan K <35736874+darshankabariya@users.noreply.github.com> Date: Wed, 26 Jun 2024 01:05:03 +0530 Subject: [PATCH] fix: duplicate message forwarding in filter service (#2842) * fix: it's resolve duplicate message forwarding for filter service * chore: update little flow * fix: update implementation using timed cache method * chore: simple format change * chore: simple format change * chore: update put function location * chore: update according suggestion --- waku/waku_filter_v2/protocol.nim | 37 +++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index eff3d0990d..5d5b0a6e46 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -11,7 +11,8 @@ import chronicles, chronos, libp2p/peerid, - libp2p/protocols/protocol + libp2p/protocols/protocol, + libp2p/protocols/pubsub/timedcache import ../node/peer_manager, ../waku_core, @@ -31,6 +32,7 @@ type WakuFilter* = ref object of LPProtocol # a mapping of peer ids to a sequence of filter criteria peerManager: PeerManager maintenanceTask: TimerCallback + messageCache: TimedCache[string] proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = trace "pinging subscriber", peerId = peerId @@ -176,20 +178,27 @@ proc pushToPeers( let msgHash = messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() - notice "pushing message to subscribed peers", - pubsubTopic = messagePush.pubsubTopic, - contentTopic = messagePush.wakuMessage.contentTopic, - target_peer_ids = targetPeerIds, - msg_hash = msgHash - - let bufferToPublish = messagePush.encode().buffer + ## it's also refresh expire of msghash, that's why update cache every time, even if it has a value. + if wf.messageCache.put(msgHash, Moment.now()): + notice "duplicate message found, not-pushing message to subscribed peers", + pubsubTopic = messagePush.pubsubTopic, + contentTopic = messagePush.wakuMessage.contentTopic, + target_peer_ids = targetPeerIds, + msg_hash = msgHash + else: + notice "pushing message to subscribed peers", + pubsubTopic = messagePush.pubsubTopic, + contentTopic = messagePush.wakuMessage.contentTopic, + target_peer_ids = targetPeerIds, + msg_hash = msgHash - var pushFuts: seq[Future[void]] - for peerId in peers: - let pushFut = wf.pushToPeer(peerId, bufferToPublish) - pushFuts.add(pushFut) + let bufferToPublish = messagePush.encode().buffer + var pushFuts: seq[Future[void]] - await allFutures(pushFuts) + for peerId in peers: + let pushFut = wf.pushToPeer(peerId, bufferToPublish) + pushFuts.add(pushFut) + await allFutures(pushFuts) proc maintainSubscriptions*(wf: WakuFilter) = trace "maintaining subscriptions" @@ -289,12 +298,14 @@ proc new*( subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec, maxFilterPeers: uint32 = MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer, + timeout: Duration = 2.minutes, ): T = let wf = WakuFilter( subscriptions: FilterSubscriptions.init( subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer ), peerManager: peerManager, + messageCache: init(TimedCache[string], timeout), ) wf.initProtocolHandler()