Skip to content

Commit

Permalink
feat: Store v3
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Feb 21, 2024
1 parent 7aea145 commit 2a7dc0d
Show file tree
Hide file tree
Showing 26 changed files with 1,159 additions and 535 deletions.
4 changes: 2 additions & 2 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ import
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
../../waku/waku_api/rest/store/handlers as rest_store_api,
../../waku/waku_api/rest/legacy_store/handlers as rest_store_api,
../../waku/waku_api/rest/health/handlers as rest_health_api,
../../waku/waku_api/rest/admin/handlers as rest_admin_api,
../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api,
../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api,
../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api,
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api,
../../waku/waku_api/jsonrpc/legacy_store/handlers as rpc_store_api,
../../waku/waku_archive,
../../waku/waku_dnsdisc,
../../waku/waku_enr/sharding,
Expand Down
115 changes: 92 additions & 23 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ import
../waku_core/topics/sharding,
../waku_relay,
../waku_archive,
../waku_store,
../waku_store_legacy/protocol as legacy_store,
../waku_store_legacy/client as legacy_store_client,
../waku_store_legacy/common as legacy_store_common,
../waku_store/protocol as store,
../waku_store/client as store_client,
../waku_store/common as store_common,
../waku_filter as legacy_filter, #TODO: support for legacy filter protocol will be removed
../waku_filter/client as legacy_filter_client, #TODO: support for legacy filter protocol will be removed
../waku_filter_v2,
Expand Down Expand Up @@ -86,8 +90,10 @@ type
switch*: Switch
wakuRelay*: WakuRelay
wakuArchive*: WakuArchive
wakuStore*: WakuStore
wakuStoreClient*: WakuStoreClient
wakuLegacyStore*: legacy_store.WakuStore
wakuLegacyStoreClient*: legacy_store_client.WakuStoreClient
wakuStore: store.WakuStore
wakuStoreClient*: store_client.WakuStoreClient
wakuFilter*: waku_filter_v2.WakuFilter
wakuFilterClient*: filter_client.WakuFilterClient
wakuFilterLegacy*: legacy_filter.WakuFilterLegacy #TODO: support for legacy filter protocol will be removed
Expand Down Expand Up @@ -775,10 +781,10 @@ proc mountArchive*(node: WakuNode,
asyncSpawn node.wakuArchive.start()
return ok()

## Waku store
## Legacy Waku Store

# TODO: Review this mapping logic. Maybe, move it to the appplication code
proc toArchiveQuery(request: HistoryQuery): ArchiveQuery =
proc toArchiveQuery(request: legacy_store_common.HistoryQuery): ArchiveQuery =
ArchiveQuery(
pubsubTopic: request.pubsubTopic,
contentTopics: request.contentTopics,
Expand All @@ -790,7 +796,7 @@ proc toArchiveQuery(request: HistoryQuery): ArchiveQuery =
)

# TODO: Review this mapping logic. Maybe, move it to the appplication code
proc toHistoryResult*(res: ArchiveResult): HistoryResult =
proc toHistoryResult*(res: ArchiveResult): legacy_store_common.HistoryResult =
if res.isErr():
let error = res.error
case res.error.kind:
Expand All @@ -809,15 +815,15 @@ proc toHistoryResult*(res: ArchiveResult): HistoryResult =
cursor: response.cursor.map(proc(cursor: ArchiveCursor): HistoryCursor = HistoryCursor(pubsubTopic: cursor.pubsubTopic, senderTime: cursor.senderTime, storeTime: cursor.storeTime, digest: cursor.digest)),
))

proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting waku store protocol"
proc mountLegacyStore*(node: WakuNode) {.async.} =
info "mounting waku legacy store protocol"

if node.wakuArchive.isNil():
error "failed to mount waku store protocol", error="waku archive not set"
error "failed to mount waku legacy store protocol", error="waku archive not set"
return

# TODO: Review this handler logic. Maybe, move it to the appplication code
let queryHandler: HistoryQueryHandler = proc(request: HistoryQuery): Future[HistoryResult] {.async.} =
let queryHandler: HistoryQueryHandler = proc(request: HistoryQuery): Future[legacy_store_common.HistoryResult] {.async.} =
if request.cursor.isSome():
request.cursor.get().checkHistCursor().isOkOr:
return err(error)
Expand All @@ -826,25 +832,29 @@ proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =
let response = await node.wakuArchive.findMessages(request)
return response.toHistoryResult()

node.wakuStore = WakuStore.new(node.peerManager, node.rng, queryHandler)
node.wakuLegacyStore = legacy_store.WakuStore.new(node.peerManager, node.rng, queryHandler)

if node.started:
# Node has started already. Let's start store too.
await node.wakuStore.start()
await node.wakuLegacyStore.start()

node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec))
node.switch.mount(node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuStoreCodec))

proc mountStoreClient*(node: WakuNode) =
info "mounting store client"
proc mountLegacyStoreClient*(node: WakuNode) =
info "mounting legacy store client"

node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng)
node.wakuLegacyStoreClient = legacy_store_client.WakuStoreClient.new(node.peerManager, node.rng)

proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
proc query*(
node: WakuNode,
query: legacy_store_common.HistoryQuery,
peer: RemotePeerInfo
): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {.async, gcsafe.} =
## Queries known nodes for historical messages
if node.wakuStoreClient.isNil():
return err("waku store client is nil")
if node.wakuLegacyStoreClient.isNil():
return err("waku legacy store client is nil")

let queryRes = await node.wakuStoreClient.query(query, peer)
let queryRes = await node.wakuLegacyStoreClient.query(query, peer)
if queryRes.isErr():
return err($queryRes.error)

Expand All @@ -853,13 +863,16 @@ proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[W
return ok(response)

# TODO: Move to application module (e.g., wakunode2.nim)
proc query*(node: WakuNode, query: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe,
proc query*(
node: WakuNode,
query: legacy_store_common.HistoryQuery
): Future[legacy_store_common.WakuStoreResult[legacy_store_common.HistoryResponse]] {.async, gcsafe,
deprecated: "Use 'node.query()' with peer destination instead".} =
## Queries known nodes for historical messages
if node.wakuStoreClient.isNil():
return err("waku store client is nil")
return err("waku legacy store client is nil")

let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
return err("peer_not_found_failure")
Expand Down Expand Up @@ -888,6 +901,62 @@ when defined(waku_exp_store_resume):
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value


## Waku Store

proc toArchiveQuery(request: StoreQueryRequest): ArchiveQuery =
var query = ArchiveQuery()

#TODO

return query

proc toStoreResult(res: ArchiveResult): StoreQueryResult =
var res = StoreQueryResponse()

#TODO

return ok(res)

proc mountStore*(node: WakuNode) {.async.} =
info "mounting waku store protocol"

if node.wakuArchive.isNil():
error "failed to mount waku store protocol", error="waku archive not set"
return

let requestHandler: StoreQueryRequestHandler =
proc(request: StoreQueryRequest): Future[StoreQueryResult] {.async.} =
let request = request.toArchiveQuery()
let response = await node.wakuArchive.findMessages(request)

return response.toStoreResult()

node.wakuStore = store.WakuStore.new(node.peerManager, node.rng, requestHandler)

if node.started:
await node.wakuStore.start()

node.switch.mount(node.wakuStore, protocolMatcher(store_common.WakuStoreCodec))

proc mountStoreClient*(node: WakuNode) =
info "mounting store client"

node.wakuStoreClient = store_client.WakuStoreClient.new(node.peerManager, node.rng)

proc query*(
node: WakuNode,
request: store_common.StoreQueryRequest,
peer: RemotePeerInfo
): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {.async, gcsafe.} =
## Queries known nodes for historical messages
if node.wakuStoreClient.isNil():
return err("waku store v3 client is nil")

let response = (await node.wakuStoreClient.query(request, peer)).valueOr:
return err($error)

return ok(response)

## Waku lightpush

proc mountLightPush*(node: WakuNode) {.async.} =
Expand Down
4 changes: 2 additions & 2 deletions waku/waku_api/jsonrpc/admin/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import
json_rpc/rpcserver,
libp2p/[peerinfo, switch]
import
../../../waku_store,
../../../waku_store_legacy/common,
../../../waku_filter,
../../../waku_relay,
../../../waku_node,
Expand Down Expand Up @@ -60,7 +60,7 @@ proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =
connected: it.connectedness == Connectedness.Connected))
peers.add(filterPeers)

if not node.wakuStore.isNil():
if not node.wakuLegacyStore.isNil():
# Map WakuStore peers to WakuPeers and add to return list
let storePeers = node.peerManager.peerStore.peers(WakuStoreCodec)
.mapIt(WakuPeer(multiaddr: constructMultiaddrStr(it),
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import
chronicles,
json_rpc/rpcserver
import
../../../[
waku_core,
waku_store,
waku_node
],
../../../waku_store/rpc,
../../../waku_core,
../../../waku_node,
../../../waku_store_legacy/rpc,
../../../waku_store_legacy/common,
../../../node/peer_manager,
../../../common/paging,
./types
Expand Down Expand Up @@ -57,7 +55,7 @@ proc installStoreApiHandlers*(node: WakuNode, server: RpcServer) =
## Returns history for a list of content topics with optional paging
debug "get_waku_v2_store_v1_messages"

let peerOpt = node.peerManager.selectPeer(WakuStoreCodec)
let peerOpt = node.peerManager.selectPeer(common.WakuStoreCodec)
if peerOpt.isNone():
raise newException(ValueError, "no suitable remote store peers")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ else:
import
std/options
import
../../../waku_store/rpc,
../../../waku_store_legacy/rpc,
../message

export message
Expand Down
6 changes: 3 additions & 3 deletions waku/waku_api/rest/admin/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ else:
{.push raises: [].}

import
std/[strformat,sequtils,sets,tables],
std/[strformat,sequtils,tables],
stew/byteutils,
chronicles,
json_serialization,
Expand All @@ -13,7 +13,7 @@ import

import
../../../waku_core,
../../../waku_store,
../../../waku_store_legacy/common,
../../../waku_filter,
../../../waku_filter_v2,
../../../waku_lightpush/common,
Expand Down Expand Up @@ -71,7 +71,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
connected: it.connectedness == Connectedness.Connected))
tuplesToWakuPeers(peers, filterV2Peers)

if not node.wakuStore.isNil():
if not node.wakuLegacyStore.isNil():
# Map WakuStore peers to WakuPeers and add to return list
let storePeers = node.peerManager.peerStore
.peers(WakuStoreCodec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import
json_serialization/std/options,
presto/[route, client]
import
../../../waku_store/common,
../../../waku_store_legacy/common,
../serdes,
../responses,
./types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import
presto/route
import
../../../waku_core,
../../../waku_store/common,
../../../waku_store/self_req_handler,
../../../waku_store_legacy/common,
../../../waku_store_legacy/self_req_handler,
../../../waku_node,
../../../node/peer_manager,
../../../common/paging,
Expand Down Expand Up @@ -195,7 +195,7 @@ proc retrieveMsgsFromSelfNode(self: WakuNode, histQuery: HistoryQuery):
## it is not allowed to libp2p-dial a node to itself, by default.
##

let selfResp = (await self.wakuStore.handleSelfStoreRequest(histQuery)).valueOr:
let selfResp = (await self.wakuLegacyStore.handleSelfStoreRequest(histQuery)).valueOr:
return RestApiResponse.internalServerError($error)

let storeResp = selfResp.toStoreResponseRest()
Expand Down Expand Up @@ -250,7 +250,7 @@ proc installStoreApiHandlers*(
if not histQuery.isOk():
return RestApiResponse.badRequest(histQuery.error)

if peerAddr.isNone() and not node.wakuStore.isNil():
if peerAddr.isNone() and not node.wakuLegacyStore.isNil():
## The user didn't specify a peer address and self-node is configured as a store node.
## In this case we assume that the user is willing to retrieve the messages stored by
## the local/self store node.
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import
json_serialization/std/options,
presto/[route, client, common]
import
../../../waku_store/common as waku_store_common,
../../../waku_store_legacy/common as waku_store_common,
../../../common/base64,
../../../waku_core,
../serdes
Expand Down
Loading

0 comments on commit 2a7dc0d

Please sign in to comment.