Skip to content

Commit

Permalink
fix: duplicate message forwarding in filter service (#2842)
Browse files Browse the repository at this point in the history
* fix: it's resolve duplicate message forwarding for filter service

* chore: update little flow

* fix: update implementation using timed cache method

* chore: simple format change

* chore: simple format change

* chore: update put function location

* chore: update according suggestion
  • Loading branch information
darshankabariya authored Jun 25, 2024
1 parent 0105013 commit 99149ea
Showing 1 changed file with 24 additions and 13 deletions.
37 changes: 24 additions & 13 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import
chronicles,
chronos,
libp2p/peerid,
libp2p/protocols/protocol
libp2p/protocols/protocol,
libp2p/protocols/pubsub/timedcache
import
../node/peer_manager,
../waku_core,
Expand All @@ -31,6 +32,7 @@ type WakuFilter* = ref object of LPProtocol
# a mapping of peer ids to a sequence of filter criteria
peerManager: PeerManager
maintenanceTask: TimerCallback
messageCache: TimedCache[string]

proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
trace "pinging subscriber", peerId = peerId
Expand Down Expand Up @@ -176,20 +178,27 @@ proc pushToPeers(
let msgHash =
messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex()

notice "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
target_peer_ids = targetPeerIds,
msg_hash = msgHash

let bufferToPublish = messagePush.encode().buffer
## it's also refresh expire of msghash, that's why update cache every time, even if it has a value.
if wf.messageCache.put(msgHash, Moment.now()):
notice "duplicate message found, not-pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
target_peer_ids = targetPeerIds,
msg_hash = msgHash
else:
notice "pushing message to subscribed peers",
pubsubTopic = messagePush.pubsubTopic,
contentTopic = messagePush.wakuMessage.contentTopic,
target_peer_ids = targetPeerIds,
msg_hash = msgHash

var pushFuts: seq[Future[void]]
for peerId in peers:
let pushFut = wf.pushToPeer(peerId, bufferToPublish)
pushFuts.add(pushFut)
let bufferToPublish = messagePush.encode().buffer
var pushFuts: seq[Future[void]]

await allFutures(pushFuts)
for peerId in peers:
let pushFut = wf.pushToPeer(peerId, bufferToPublish)
pushFuts.add(pushFut)
await allFutures(pushFuts)

proc maintainSubscriptions*(wf: WakuFilter) =
trace "maintaining subscriptions"
Expand Down Expand Up @@ -289,12 +298,14 @@ proc new*(
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
timeout: Duration = 2.minutes,
): T =
let wf = WakuFilter(
subscriptions: FilterSubscriptions.init(
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
),
peerManager: peerManager,
messageCache: init(TimedCache[string], timeout),
)

wf.initProtocolHandler()
Expand Down

0 comments on commit 99149ea

Please sign in to comment.