Skip to content

Commit

Permalink
libwaku add logic to register filter push callback
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Nov 19, 2024
1 parent 90d92ea commit d814519
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
16 changes: 11 additions & 5 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import
waku/waku_core/message/message,
waku/node/waku_node,
waku/waku_core/topics/pubsub_topic,
waku/waku_core/subscription/push_handler,
waku/waku_relay/protocol,
./events/json_message_event,
./waku_thread/waku_thread,
Expand Down Expand Up @@ -73,7 +74,7 @@ proc handleRes[T: string | void](
callback(RET_OK, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_OK

proc relayEventCallback(ctx: ptr WakuContext): WakuRelayHandler =
proc onReceivedMessage(ctx: ptr WakuContext): WakuRelayHandler =
return proc(
pubsubTopic: PubsubTopic, msg: WakuMessage
): Future[system.void] {.async.} =
Expand Down Expand Up @@ -301,7 +302,7 @@ proc waku_relay_publish(
RelayRequest.createShared(
RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback(ctx)),
WakuRelayHandler(onReceivedMessage(ctx)),
wakuMessage,
),
)
Expand Down Expand Up @@ -344,7 +345,7 @@ proc waku_relay_subscribe(
let pst = pubSubTopic.alloc()
defer:
deallocShared(pst)
var cb = relayEventCallback(ctx)
var cb = onReceivedMessage(ctx)

waku_thread
.sendRequestToWakuThread(
Expand Down Expand Up @@ -375,7 +376,7 @@ proc waku_relay_unsubscribe(
RelayRequest.createShared(
RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback(ctx)),
WakuRelayHandler(onReceivedMessage(ctx)),
),
)
.handleRes(callback, userData)
Expand Down Expand Up @@ -433,7 +434,12 @@ proc waku_filter_subscribe(
.sendRequestToWakuThread(
ctx,
RequestType.FILTER,
FilterRequest.createShared(FilterMsgType.SUBSCRIBE, pubSubTopic, contentTopics),
FilterRequest.createShared(
FilterMsgType.SUBSCRIBE,
pubSubTopic,
contentTopics,
FilterPushHandler(onReceivedMessage(ctx)),
),
)
.handleRes(callback, userData)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ type FilterRequest* = object
operation: FilterMsgType
pubsubTopic: cstring
contentTopics: cstring ## comma-separated list of content-topics
filterPushEventCallback: FilterPushHandler ## handles incompung filter pushed msgs

proc createShared*(
T: type FilterRequest,
op: FilterMsgType,
pubsubTopic: cstring = "",
contentTopics: cstring = "",
filterPushEventCallback: FilterPushHandler = nil,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].pubsubTopic = pubsubTopic.alloc()
ret[].contentTopics = contentTopics.alloc()
ret[].filterPushEventCallback = filterPushEventCallback

return ret

Expand All @@ -54,6 +57,8 @@ proc process*(

case self.operation
of SUBSCRIBE:
waku.node.wakuFilterClient.registerPushHandler(self.filterPushEventCallback)

let peer = waku.node.peerManager.selectPeer(WakuFilterSubscribeCodec).valueOr:
let errorMsg =
"could not find peer with WakuFilterSubscribeCodec when subscribing"
Expand Down

0 comments on commit d814519

Please sign in to comment.