Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: delivery monitor for store v3 reliability protocol #2977

Merged
merged 9 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS NotDeliveredMessages(
messageHash BLOB PRIMARY KEY,
timestamp INTEGER NOT NULL,
contentTopic BLOB NOT NULL,
pubsubTopic BLOB NOT NULL,
payload BLOB,
meta BLOB,
version INTEGER NOT NULL
);
9 changes: 9 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,15 @@ type WakuNodeConf* = object
name: "lightpushnode"
.}: string

## Reliability config
reliabilityEnabled* {.
desc:
"""Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests.
with the drawback of consuming some more bandwitdh.""",
defaultValue: false,
name: "reliability"
.}: bool

## REST HTTP config
rest* {.
desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest"
Expand Down
25 changes: 24 additions & 1 deletion waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import
../waku_node,
../node/peer_manager,
../node/health_monitor,
../node/delivery_monitor/delivery_monitor,
../waku_api/message_cache,
../waku_api/rest/server,
../waku_archive,
Expand Down Expand Up @@ -51,6 +52,8 @@ type Waku* = object

node*: WakuNode

deliveryMonitor: DeliveryMonitor

restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef

Expand Down Expand Up @@ -147,13 +150,29 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
error "Failed setting up node", error = nodeRes.error
return err("Failed setting up node: " & nodeRes.error)

let node = nodeRes.get()

var deliveryMonitor: DeliveryMonitor
if conf.reliabilityEnabled:
if conf.storenode == "":
return err("A storenode should be set when reliability mode is on")

let deliveryMonitorRes = DeliveryMonitor.new(
node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient,
node.wakuFilterClient,
)
if deliveryMonitorRes.isErr():
return err("could not create delivery monitor: " & $deliveryMonitorRes.error)
deliveryMonitor = deliveryMonitorRes.get()

var waku = Waku(
version: git_version,
conf: confCopy,
rng: rng,
key: confCopy.nodekey.get(),
node: nodeRes.get(),
node: node,
dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(),
deliveryMonitor: deliveryMonitor,
)

ok(waku)
Expand Down Expand Up @@ -237,6 +256,10 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
(await waku.wakuDiscV5.start()).isOkOr:
return err("failed to start waku discovery v5: " & $error)

## Reliability
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()

return ok()

# Waku shutdown
Expand Down
17 changes: 17 additions & 0 deletions waku/node/delivery_monitor/delivery_callback.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import ../../waku_core

type DeliveryDirection* {.pure.} = enum
PUBLISHING
RECEIVING

type DeliverySuccess* {.pure.} = enum
SUCCESSFUL
UNSUCCESSFUL

type DeliveryFeedbackCallback* = proc(
success: DeliverySuccess,
dir: DeliveryDirection,
comment: string,
msgHash: WakuMessageHash,
msg: WakuMessage,
) {.gcsafe, raises: [].}
43 changes: 43 additions & 0 deletions waku/node/delivery_monitor/delivery_monitor.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
## This module helps to ensure the correct transmission and reception of messages

import results
import chronos
import
./recv_monitor,
./send_monitor,
./delivery_callback,
../../waku_core,
../../waku_store/client,
../../waku_relay/protocol,
../../waku_lightpush/client,
../../waku_filter_v2/client

type DeliveryMonitor* = ref object
sendMonitor: SendMonitor
recvMonitor: RecvMonitor

proc new*(
T: type DeliveryMonitor,
storeClient: WakuStoreClient,
wakuRelay: protocol.WakuRelay,
wakuLightpushClient: WakuLightPushClient,
wakuFilterClient: WakuFilterClient,
): Result[T, string] =
## storeClient is needed to give store visitility to DeliveryMonitor
## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendMonitor to re-publish
let sendMonitor = ?SendMonitor.new(storeClient, wakuRelay, wakuLightpushClient)
let recvMonitor = RecvMonitor.new(storeClient, wakuFilterClient)
return ok(DeliveryMonitor(sendMonitor: sendMonitor, recvMonitor: recvMonitor))

proc startDeliveryMonitor*(self: DeliveryMonitor) =
self.sendMonitor.startSendMonitor()
self.recvMonitor.startRecvMonitor()

proc stopDeliveryMonitor*(self: DeliveryMonitor) {.async.} =
self.sendMonitor.stopSendMonitor()
await self.recvMonitor.stopRecvMonitor()

proc setDeliveryCallback*(self: DeliveryMonitor, deliveryCb: DeliveryFeedbackCallback) =
## The deliveryCb is a proc defined by the api client so that it can get delivery feedback
self.sendMonitor.setDeliveryCallback(deliveryCb)
self.recvMonitor.setDeliveryCallback(deliveryCb)
26 changes: 26 additions & 0 deletions waku/node/delivery_monitor/not_delivered_storage/migrations.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{.push raises: [].}

import std/[tables, strutils, os], results, chronicles
import ../../../common/databases/db_sqlite, ../../../common/databases/common

logScope:
topics = "waku node delivery_monitor"

const TargetSchemaVersion* = 1
# increase this when there is an update in the database schema

template projectRoot(): string =
currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".."

const PeerStoreMigrationPath: string = projectRoot / "migrations" / "sent_msgs"

proc migrate*(db: SqliteDatabase): DatabaseResult[void] =
debug "starting peer store's sqlite database migration for sent messages"

let migrationRes =
migrate(db, TargetSchemaVersion, migrationsScriptsDir = PeerStoreMigrationPath)
if migrationRes.isErr():
return err("failed to execute migration scripts: " & migrationRes.error)

debug "finished peer store's sqlite database migration for sent messages"
ok()
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
## This module is aimed to keep track of the sent/published messages that are considered
## not being properly delivered.
##
## The archiving of such messages will happen in a local sqlite database.
##
## In the very first approach, we consider that a message is sent properly is it has been
## received by any store node.
##

import results
import
../../../common/databases/db_sqlite,
../../../waku_core/message/message,
../../../node/delivery_monitor/not_delivered_storage/migrations

const NotDeliveredMessagesDbUrl = "not-delivered-messages.db"

type NotDeliveredStorage* = ref object
database: SqliteDatabase

type TrackedWakuMessage = object
msg: WakuMessage
numTrials: uint
## for statistics purposes. Counts the number of times the node has tried to publish it

proc new*(T: type NotDeliveredStorage): Result[T, string] =
let db = ?SqliteDatabase.new(NotDeliveredMessagesDbUrl)

?migrate(db)

return ok(NotDeliveredStorage(database: db))

proc archiveMessage*(
self: NotDeliveredStorage, msg: WakuMessage
): Result[void, string] =
## Archives a waku message so that we can keep track of it
## even when the app restarts
return ok()
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
9 changes: 9 additions & 0 deletions waku/node/delivery_monitor/publish_observer.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import chronicles
import ../../waku_core/message/message

type PublishObserver* = ref object of RootObj

method onMessagePublished*(
self: PublishObserver, pubsubTopic: string, message: WakuMessage
) {.base, gcsafe, raises: [].} =
error "onMessagePublished not implemented"
Loading
Loading