diff --git a/library/libwaku.nim b/library/libwaku.nim index 0ccb3cc17..0dac652d0 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -12,6 +12,7 @@ import waku/waku_core/message/message, waku/node/waku_node, waku/waku_core/topics/pubsub_topic, + waku/waku_core/subscription/push_handler, waku/waku_relay/protocol, ./events/json_message_event, ./waku_thread/waku_thread, @@ -73,7 +74,7 @@ proc handleRes[T: string | void]( callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData) return RET_OK -proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler = +proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler = return proc( pubsubTopic: PubsubTopic, msg: WakuMessage ): Future[system.void] {.async.} = @@ -301,7 +302,7 @@ proc waku_relay_publish( RelayRequest.createShared( RelayMsgType.PUBLISH, PubsubTopic($pst), - WakuRelayHandler(relayEventCallback(ctx)), + WakuRelayHandler(onReceivedMessage(ctx)), wakuMessage, ), ) @@ -344,7 +345,7 @@ proc waku_relay_subscribe( let pst = pubSubTopic.alloc() defer: deallocShared(pst) - var cb = relayEventCallback(ctx) + var cb = onReceivedMessage(ctx) waku_thread .sendRequestToWakuThread( @@ -375,7 +376,7 @@ proc waku_relay_unsubscribe( RelayRequest.createShared( RelayMsgType.SUBSCRIBE, PubsubTopic($pst), - WakuRelayHandler(relayEventCallback(ctx)), + WakuRelayHandler(onReceivedMessage(ctx)), ), ) .handleRes(callback, userData) @@ -433,7 +434,12 @@ proc waku_filter_subscribe( .sendRequestToWakuThread( ctx, RequestType.FILTER, - FilterRequest.createShared(FilterMsgType.SUBSCRIBE, pubSubTopic, contentTopics), + FilterRequest.createShared( + FilterMsgType.SUBSCRIBE, + pubSubTopic, + contentTopics, + FilterPushHandler(onReceivedMessage(ctx)), + ), ) .handleRes(callback, userData) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/filter_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/filter_request.nim index 6e7878222..0e916670c 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/filter_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/filter_request.nim @@ -21,17 +21,20 @@ type FilterRequest* = object operation: FilterMsgType pubsubTopic: cstring contentTopics: cstring ## comma-separated list of content-topics + filterPushEventCallback: FilterPushHandler ## handles incompung filter pushed msgs proc createShared*( T: type FilterRequest, op: FilterMsgType, pubsubTopic: cstring = "", contentTopics: cstring = "", + filterPushEventCallback: FilterPushHandler = nil, ): ptr type T = var ret = createShared(T) ret[].operation = op ret[].pubsubTopic = pubsubTopic.alloc() ret[].contentTopics = contentTopics.alloc() + ret[].filterPushEventCallback = filterPushEventCallback return ret @@ -54,6 +57,8 @@ proc process*( case self.operation of SUBSCRIBE: + waku.node.wakuFilterClient.registerPushHandler(self.filterPushEventCallback) + let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr: let errorMsg = "could not find peer with WakuFilterSubscribeCodec when subscribing"