From 78ed6a7e2e906c2ff949015e758f2dac01180fe9 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Fri, 14 Jun 2024 16:16:15 +0200
Subject: [PATCH 01/12] Added phase 2 - waku-simulatior integration in
README.md
---
apps/liteprotocoltester/.env | 12 +
apps/liteprotocoltester/README.md | 63 ++++++
.../diagnose_connections.nim | 96 ++++++++
.../docker-compose-on-simularor.yml | 212 ++++++++++++++++++
apps/liteprotocoltester/filter_subscriber.nim | 9 +-
.../lightpush_publisher.nim | 74 ++++--
.../liteprotocoltester/liteprotocoltester.nim | 14 +-
apps/liteprotocoltester/nim.cfg | 4 +
apps/liteprotocoltester/run_service_node.sh | 25 ++-
apps/liteprotocoltester/run_tester_node.sh | 54 +++--
apps/liteprotocoltester/tester_config.nim | 27 ++-
apps/liteprotocoltester/tester_message.nim | 13 ++
12 files changed, 557 insertions(+), 46 deletions(-)
create mode 100644 apps/liteprotocoltester/.env
create mode 100644 apps/liteprotocoltester/diagnose_connections.nim
create mode 100644 apps/liteprotocoltester/docker-compose-on-simularor.yml
create mode 100644 apps/liteprotocoltester/nim.cfg
diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env
new file mode 100644
index 0000000000..e4359cade2
--- /dev/null
+++ b/apps/liteprotocoltester/.env
@@ -0,0 +1,12 @@
+NWAKU_IMAGE=quay.io/wakuorg/nwaku-pr:2800-rln-v2
+
+START_PUBLISHING_AFTER=30 # seconds
+NUM_MESSAGES=250
+DELAY_MESSAGES=200 # gap between messages
+
+MIN_MESSAGE_SIZE=1Kb
+MAX_MESSAGE_SIZE=120Kb
+
+# PUBSUB=/waku/2/rs/66/0
+# CONTENT_TOPIC=/tester/1/light-pubsub-example/proto
+
diff --git a/apps/liteprotocoltester/README.md b/apps/liteprotocoltester/README.md
index dbcc9fa5d0..e590fd67ed 100644
--- a/apps/liteprotocoltester/README.md
+++ b/apps/liteprotocoltester/README.md
@@ -37,6 +37,9 @@ At this stage we can only configure number of messages and fixed frequency of th
### Phase 1
+> NOTICE: This part is obsolate due integration with waku-simulator.
+> It needs some rework to make it work again standalone.
+
Lite Protocol Tester application is built under name `liteprotocoltester` in apps/liteprotocoltester folder.
Starting from nwaku repository root:
@@ -48,6 +51,51 @@ docker compose up -d
docker compose logs -f receivernode
```
+### Phase 2
+
+> Integration with waku-simulator!
+
+- For convenient integration is done in cooperation with waku-simulator repository, but nothing is tightly coupled.
+- waku-simulator must be started separately with its own configuration.
+- To enable waku-simulator working without RLN currently a separate branch is to be used.
+- When waku-simulator is configured and up and running, lite-protocol-tester composite docker setup can be started.
+
+```bash
+
+# Start waku-simulator
+
+git clone https://github.com/waku-org/waku-simulator.git ../waku-simulator
+cd ../waku-simulator
+git checkout chore-integrate-liteprotocoltester
+
+# optionally edit .env file
+
+docker compose -f docker-compose-norln.yml up -d
+
+# navigate localhost:30001 to see the waku-simulator dashboard
+
+cd ../{your-repository}
+
+make LOG_LEVEL=DEBUG liteprotocoltester
+
+cd apps/liteprotocoltester
+
+# optionally edit .env file
+
+docker compose -f docker-compose-on-simularor.yml build
+docker compose -f docker-compose-on-simularor.yml up -d
+docker compose -f docker-compose-on-simularor.yml logs -f receivernode
+```
+#### Current setup
+
+- waku-simulator is configured to run with 25 full node
+- liteprotocoltester is configured to run with 3 publisher and 1 receiver
+- liteprotocoltester is configured to run 1 lightpush service and a filter service node
+ - light clients are connected accordingly
+- publishers will send 250 messages in every 200ms with size between 1KiB and 120KiB
+- Notice there is a configurable wait before start publishing messages as it is noticed time is needed for the service nodes to get connected to full nodes from simulator
+- light clients will print report on their and the connected service node's connectivity to the network in every 20 secs.
+
## Configure
### Environment variables for docker compose runs
@@ -56,8 +104,16 @@ docker compose logs -f receivernode
| ---: | :--- | :--- |
| NUM_MESSAGES | Number of message to publish | 120 |
| DELAY_MESSAGES | Frequency of messages in milliseconds | 1000 |
+<<<<<<< HEAD
| PUBSUB | Used pubsub_topic for testing | /waku/2/rs/0/0 |
+=======
+| PUBSUB | Used pubsub_topic for testing | /waku/2/rs/66/0 |
+>>>>>>> 32cc23ff (Added phase 2 - waku-simulatior integration in README.md)
| CONTENT_TOPIC | content_topic for testing | /tester/1/light-pubsub-example/proto |
+| START_PUBLISHING_AFTER | Delay in seconds before starting to publish to let service node connected | 5 |
+| MIN_MESSAGE_SIZE | Minimum message size in bytes | 1KiB |
+| MAX_MESSAGE_SIZE | Maximum message size in bytes | 120KiB |
+
### Lite Protocol Tester application cli options
@@ -67,7 +123,14 @@ docker compose logs -f receivernode
| --service-node| Address of the service node to use for lightpush and/or filter service | - |
| --num-messages | Number of message to publish | 120 |
| --delay-messages | Frequency of messages in milliseconds | 1000 |
+<<<<<<< HEAD
| --pubsub-topic | Used pubsub_topic for testing | /waku/2/rs/0/0 |
+=======
+| --min-message-size | Minimum message size in bytes | 1KiB |
+| --max-message-size | Maximum message size in bytes | 120KiB |
+| --start-publishing-after | Delay in seconds before starting to publish to let service node connected in seconds | 5 |
+| --pubsub-topic | Used pubsub_topic for testing | /waku/2/default-waku/proto |
+>>>>>>> 32cc23ff (Added phase 2 - waku-simulatior integration in README.md)
| --content_topic | content_topic for testing | /tester/1/light-pubsub-example/proto |
| --cluster-id | Cluster id for the test | 0 |
| --config-file | TOML configuration file to fine tune the light waku node
Note that some configurations (full node services) are not taken into account | - |
diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim
new file mode 100644
index 0000000000..4973365fb4
--- /dev/null
+++ b/apps/liteprotocoltester/diagnose_connections.nim
@@ -0,0 +1,96 @@
+when (NimMajor, NimMinor) < (1, 4):
+ {.push raises: [Defect].}
+else:
+ {.push raises: [].}
+
+import
+ std/[options, strutils, os, sequtils, net, strformat],
+ chronicles,
+ chronos,
+ metrics,
+ libbacktrace,
+ system/ansi_c,
+ libp2p/crypto/crypto,
+ confutils,
+ libp2p/wire
+
+import
+ ../../waku/common/logging,
+ ../../waku/factory/waku,
+ ../../waku/factory/external_config,
+ ../../waku/node/health_monitor,
+ ../../waku/node/waku_metrics,
+ ../../waku/waku_api/rest/builder as rest_server_builder,
+ ../../waku/node/peer_manager,
+ ../../waku/waku_lightpush/common,
+ ../../waku/waku_relay,
+ ../../waku/waku_filter_v2,
+ ../../waku/waku_api/rest/client,
+ ../../waku/waku_api/rest/admin/client,
+ ./tester_config,
+ ./lightpush_publisher,
+ ./filter_subscriber
+
+logScope:
+ topics = "diagnose connections"
+
+proc logSelfPeersLoop(pm: PeerManager, interval: Duration) {.async.} =
+ trace "Starting logSelfPeersLoop diagnosys loop"
+ while true:
+ let selfLighpushPeers = pm.peerStore.getPeersByProtocol(WakuLightPushCodec)
+ let selfRelayPeers = pm.peerStore.getPeersByProtocol(WakuRelayCodec)
+ let selfFilterPeers = pm.peerStore.getPeersByProtocol(WakuFilterSubscribeCodec)
+
+ let printable = catch:
+ """*------------------------------------------------------------------------------------------*
+| Self ({pm.switch.peerInfo}) peers:
+*------------------------------------------------------------------------------------------*
+| Lightpush peers({selfLighpushPeers.len()}): ${selfLighpushPeers}
+*------------------------------------------------------------------------------------------*
+| Filter peers({selfFilterPeers.len()}): ${selfFilterPeers}
+*------------------------------------------------------------------------------------------*
+| Relay peers({selfRelayPeers.len()}): ${selfRelayPeers}
+*------------------------------------------------------------------------------------------*""".fmt()
+
+ if printable.isErr():
+ echo "Error while printing statistics: " & printable.error().msg
+ else:
+ echo printable.get()
+
+ await sleepAsync(interval)
+
+proc logServiceRelayPeers(
+ pm: PeerManager, codec: string, interval: Duration
+) {.async.} =
+ trace "Starting service node connectivity diagnosys loop"
+ while true:
+ echo "*------------------------------------------------------------------------------------------*"
+ echo "| Service peer connectivity:"
+ let selfLighpushPeers = pm.selectPeer(codec)
+ if selfLighpushPeers.isSome():
+ let ma = selfLighpushPeers.get().addrs[0]
+ var serviceIp = initTAddress(ma).valueOr:
+ echo "Error while parsing multiaddress: " & $error
+ continue
+
+ serviceIp.port = Port(8645)
+ let restClient = newRestHttpClient(initTAddress($serviceIp))
+
+ let getPeersRes = await restClient.getPeers()
+
+ if getPeersRes.status == 200:
+ let nrOfPeers = getPeersRes.data.len()
+ echo "Service node (@" & $ma & ") peers: " & $getPeersRes.data
+ else:
+ echo "Error while fetching service node (@" & $ma & ") peers: " &
+ $getPeersRes.data
+ else:
+ echo "No service node peers found"
+
+ echo "*------------------------------------------------------------------------------------------*"
+
+ await sleepAsync(interval)
+
+proc startPeriodicPeerDiagnostic*(pm: PeerManager, codec: string) {.async.} =
+ asyncSpawn logSelfPeersLoop(pm, chronos.seconds(20))
+ asyncSpawn logServiceRelayPeers(pm, codec, chronos.seconds(20))
diff --git a/apps/liteprotocoltester/docker-compose-on-simularor.yml b/apps/liteprotocoltester/docker-compose-on-simularor.yml
new file mode 100644
index 0000000000..74fc246935
--- /dev/null
+++ b/apps/liteprotocoltester/docker-compose-on-simularor.yml
@@ -0,0 +1,212 @@
+version: "3.7"
+x-logging: &logging
+ logging:
+ driver: json-file
+ options:
+ max-size: 1000m
+
+# Environment variable definitions
+x-eth-client-address: ð_client_address ${ETH_CLIENT_ADDRESS:-} # Add your ETH_CLIENT_ADDRESS after the "-"
+
+x-rln-environment: &rln_env
+ RLN_RELAY_CONTRACT_ADDRESS: ${RLN_RELAY_CONTRACT_ADDRESS:-0xF471d71E9b1455bBF4b85d475afb9BB0954A29c4}
+ RLN_RELAY_CRED_PATH: ${RLN_RELAY_CRED_PATH:-} # Optional: Add your RLN_RELAY_CRED_PATH after the "-"
+ RLN_RELAY_CRED_PASSWORD: ${RLN_RELAY_CRED_PASSWORD:-} # Optional: Add your RLN_RELAY_CRED_PASSWORD after the "-"
+
+x-test-running-conditions: &test_running_conditions
+ NUM_MESSAGES: ${NUM_MESSAGES:-120}
+ DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}"
+ PUBSUB: ${PUBSUB:-}
+ CONTENT_TOPIC: ${CONTENT_TOPIC:-}
+ MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb}
+ MAX_MESSAGE_SIZE: ${MAX_MESSAGE_SIZE:-150Kb}
+ START_PUBLISHING_AFTER: ${START_PUBLISHING_AFTER:-5} # seconds
+
+
+# Services definitions
+services:
+ lightpush-service:
+ image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest}
+ # ports:
+ # - 30304:30304/tcp
+ # - 30304:30304/udp
+ # - 9005:9005/udp
+ # - 127.0.0.1:8003:8003
+ # - 80:80 #Let's Encrypt
+ # - 8000:8000/tcp #WSS
+ # - 127.0.0.1:8645:8645
+ <<:
+ - *logging
+ environment:
+ DOMAIN: ${DOMAIN}
+ RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
+ ETH_CLIENT_ADDRESS: *eth_client_address
+ EXTRA_ARGS: ${EXTRA_ARGS}
+ <<:
+ - *rln_env
+ volumes:
+ - ./run_service_node.sh:/opt/run_service_node.sh:Z
+ - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
+ - ./rln_tree:/etc/rln_tree/:Z
+ - ./keystore:/keystore:Z
+ entrypoint: sh
+ command:
+ - /opt/run_service_node.sh
+ - LIGHTPUSH
+ networks:
+ - waku-simulator_simulation
+
+ publishernode:
+ image: waku.liteprotocoltester:latest
+ build:
+ context: ../..
+ dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
+ deploy:
+ replicas: ${NUM_PUBLISHER_NODES:-3}
+ # ports:
+ # - 30304:30304/tcp
+ # - 30304:30304/udp
+ # - 9005:9005/udp
+ # - 127.0.0.1:8003:8003
+ # - 80:80 #Let's Encrypt
+ # - 8000:8000/tcp #WSS
+ # - 127.0.0.1:8646:8646
+ <<:
+ - *logging
+ environment:
+ DOMAIN: ${DOMAIN}
+ RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
+ ETH_CLIENT_ADDRESS: *eth_client_address
+ EXTRA_ARGS: ${EXTRA_ARGS}
+ <<:
+ - *rln_env
+ - *test_running_conditions
+ volumes:
+ - ./run_tester_node.sh:/opt/run_tester_node.sh:Z
+ - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
+ - ./rln_tree:/etc/rln_tree/:Z
+ - ./keystore:/keystore:Z
+ entrypoint: sh
+ command:
+ - /opt/run_tester_node.sh
+ - SENDER
+ depends_on:
+ - lightpush-service
+ configs:
+ - source: cfg_tester_node.toml
+ target: config.toml
+ networks:
+ - waku-simulator_simulation
+
+ filter-service:
+ image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest}
+ # ports:
+ # - 30304:30305/tcp
+ # - 30304:30305/udp
+ # - 9005:9005/udp
+ # - 127.0.0.1:8003:8003
+ # - 80:80 #Let's Encrypt
+ # - 8000:8000/tcp #WSS
+ # - 127.0.0.1:8645:8645
+ <<:
+ - *logging
+ environment:
+ DOMAIN: ${DOMAIN}
+ RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
+ ETH_CLIENT_ADDRESS: *eth_client_address
+ EXTRA_ARGS: ${EXTRA_ARGS}
+ <<:
+ - *rln_env
+ volumes:
+ - ./run_service_node.sh:/opt/run_service_node.sh:Z
+ - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
+ - ./rln_tree:/etc/rln_tree/:Z
+ - ./keystore:/keystore:Z
+ entrypoint: sh
+ command:
+ - /opt/run_service_node.sh
+ - FILTER
+ networks:
+ - waku-simulator_simulation
+
+
+ receivernode:
+ image: waku.liteprotocoltester:latest
+ build:
+ context: ../..
+ dockerfile: ./apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
+ deploy:
+ replicas: ${NUM_RECEIVER_NODES:-1}
+ # ports:
+ # - 30304:30304/tcp
+ # - 30304:30304/udp
+ # - 9005:9005/udp
+ # - 127.0.0.1:8003:8003
+ # - 80:80 #Let's Encrypt
+ # - 8000:8000/tcp #WSS
+ # - 127.0.0.1:8647:8647
+ <<:
+ - *logging
+ environment:
+ DOMAIN: ${DOMAIN}
+ RLN_RELAY_CRED_PASSWORD: "${RLN_RELAY_CRED_PASSWORD}"
+ ETH_CLIENT_ADDRESS: *eth_client_address
+ EXTRA_ARGS: ${EXTRA_ARGS}
+ <<:
+ - *rln_env
+ - *test_running_conditions
+ volumes:
+ - ./run_tester_node.sh:/opt/run_tester_node.sh:Z
+ - ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
+ - ./rln_tree:/etc/rln_tree/:Z
+ - ./keystore:/keystore:Z
+ entrypoint: sh
+ command:
+ - /opt/run_tester_node.sh
+ - RECEIVER
+ depends_on:
+ - filter-service
+ - publishernode
+ configs:
+ - source: cfg_tester_node.toml
+ target: config.toml
+ networks:
+ - waku-simulator_simulation
+
+ ## We have prometheus and grafana defined in waku-simulator already
+ # prometheus:
+ # image: docker.io/prom/prometheus:latest
+ # volumes:
+ # - ./monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:Z
+ # command:
+ # - --config.file=/etc/prometheus/prometheus.yml
+ # ports:
+ # - 127.0.0.1:9090:9090
+ # depends_on:
+ # - servicenode
+
+ # grafana:
+ # image: docker.io/grafana/grafana:latest
+ # env_file:
+ # - ./monitoring/configuration/grafana-plugins.env
+ # volumes:
+ # - ./monitoring/configuration/grafana.ini:/etc/grafana/grafana.ini:Z
+ # - ./monitoring/configuration/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml:Z
+ # - ./monitoring/configuration/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:Z
+ # - ./monitoring/configuration/dashboards:/var/lib/grafana/dashboards/:Z
+ # - ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_icon.svg:Z
+ # - ./monitoring/configuration/customizations/custom-logo.svg:/usr/share/grafana/public/img/grafana_typelogo.svg:Z
+ # - ./monitoring/configuration/customizations/custom-logo.png:/usr/share/grafana/public/img/fav32.png:Z
+ # ports:
+ # - 0.0.0.0:3000:3000
+ # depends_on:
+ # - prometheus
+
+configs:
+ cfg_tester_node.toml:
+ content: |
+ max-connections = 100
+
+networks:
+ waku-simulator_simulation:
+ external: true
diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim
index 1e12a27f2e..8da493c2d6 100644
--- a/apps/liteprotocoltester/filter_subscriber.nim
+++ b/apps/liteprotocoltester/filter_subscriber.nim
@@ -81,12 +81,17 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
stats.addMessage(testerMessage.sender, testerMessage)
- trace "message received",
+ let msgHash = computeMessageHash(pubsubTopic, message).to0xHex
+
+ notice "message received",
index = testerMessage.index,
count = testerMessage.count,
startedAt = $testerMessage.startedAt,
sinceStart = $testerMessage.sinceStart,
- sincePrev = $testerMessage.sincePrev
+ sincePrev = $testerMessage.sincePrev,
+ size = $testerMessage.size,
+ pubsubTopic = pubsubTopic,
+ hash = msgHash
wakuNode.wakuFilterClient.registerPushHandler(pushHandler)
diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim
index 0cafbe8538..6a0265364d 100644
--- a/apps/liteprotocoltester/lightpush_publisher.nim
+++ b/apps/liteprotocoltester/lightpush_publisher.nim
@@ -1,5 +1,5 @@
import
- std/strformat,
+ std/[strformat, sysrand, random, sequtils],
system/ansi_c,
chronicles,
chronos,
@@ -7,17 +7,34 @@ import
results,
json_serialization as js
import
- waku/[common/logging, waku_node, node/peer_manager, waku_core, waku_lightpush/client],
+ waku/[
+ common/logging,
+ waku_node,
+ node/peer_manager,
+ waku_core,
+ waku_lightpush/client,
+ common/utils/parse_size_units,
+ ],
./tester_config,
./tester_message
+randomize()
+
+type SizeRange* = tuple[min: uint64, max: uint64]
+
+var RANDOM_PALYLOAD {.threadvar.}: seq[byte]
+RANDOM_PALYLOAD = urandom(1024 * 1024)
+ # 1MiB of random payload to be used to extend message
+
proc prepareMessage(
sender: string,
messageIndex, numMessages: uint32,
startedAt: TimeStamp,
prevMessageAt: var Timestamp,
contentTopic: ContentTopic,
-): WakuMessage =
+ size: SizeRange,
+): (WakuMessage, uint64) =
+ var renderSize = rand(size.min .. size.max)
let current = getNowInNanosecondTime()
let payload = ProtocolTesterMessage(
sender: sender,
@@ -26,49 +43,72 @@ proc prepareMessage(
startedAt: startedAt,
sinceStart: current - startedAt,
sincePrev: current - prevMessageAt,
+ size: renderSize,
)
prevMessageAt = current
let text = js.Json.encode(payload)
+ let contentPayload = toBytes(text & " \0")
+
+ if renderSize < len(contentPayload).uint64:
+ renderSize = len(contentPayload).uint64
+
+ let finalPayload = concat(
+ contentPayload, RANDOM_PALYLOAD[0 .. renderSize - len(contentPayload).uint64]
+ )
let message = WakuMessage(
- payload: toBytes(text), # content of the message
+ payload: finalPayload, # content of the message
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: current, # current timestamp
)
- return message
+ return (message, renderSize)
proc publishMessages(
wakuNode: WakuNode,
lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic,
numMessages: uint32,
+ messageSizeRange: SizeRange,
delayMessages: Duration,
) {.async.} =
let startedAt = getNowInNanosecondTime()
var prevMessageAt = startedAt
var failedToSendCount: uint32 = 0
+ var renderMsgSize = messageSizeRange
+ # sets some default of min max message size to avoid conflict with meaningful payload size
+ renderMsgSize.min = max(1024.uint64, renderMsgSize.min) # do not use less than 1KB
+ renderMsgSize.max = max(2048.uint64, renderMsgSize.max) # minimum of max is 2KB
+ renderMsgSize.min = min(renderMsgSize.min, renderMsgSize.max)
+ renderMsgSize.max = max(renderMsgSize.min, renderMsgSize.max)
let selfPeerId = $wakuNode.switch.peerInfo.peerId
var messagesSent: uint32 = 1
while numMessages >= messagesSent:
- let message = prepareMessage(
+ let (message, msgSize) = prepareMessage(
selfPeerId, messagesSent, numMessages, startedAt, prevMessageAt,
- lightpushContentTopic,
+ lightpushContentTopic, renderMsgSize,
)
let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message)
+ let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex
+
if wlpRes.isOk():
- info "published message using lightpush",
- index = messagesSent, count = numMessages
+ notice "published message using lightpush",
+ index = messagesSent,
+ count = numMessages,
+ size = msgSize,
+ pubsubTopic = lightpushPubsubTopic,
+ hash = msgHash
else:
- error "failed to publish message using lightpush", err = wlpRes.error
+ error "failed to publish message using lightpush",
+ err = wlpRes.error, hash = msgHash
inc(failedToSendCount)
- await sleepAsync(delayMessages) # Publish every 5 seconds
+ await sleepAsync(delayMessages)
inc(messagesSent)
let report = catch:
@@ -90,8 +130,15 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
return
# give some time to receiver side to set up
- # TODO: this maybe done in more sphisticated way, though.
- let waitTillStartTesting = 5.seconds
+ let waitTillStartTesting = conf.startPublishingAfter.seconds
+
+ let parsedMinMsgSize = parseMsgSize(conf.minTestMessageSize).valueOr:
+ error "failed to parse 'min-test-msg-size' param: ", error = error
+ return
+
+ let parsedMaxMsgSize = parseMsgSize(conf.maxTestMessageSize).valueOr:
+ error "failed to parse 'max-test-msg-size' param: ", error = error
+ return
info "Sending test messages in", wait = waitTillStartTesting
waitFor sleepAsync(waitTillStartTesting)
@@ -104,5 +151,6 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
conf.pubsubTopics[0],
conf.contentTopics[0],
conf.numMessages,
+ (min: parsedMinMsgSize, max: parsedMaxMsgSize),
conf.delayMessages.milliseconds,
)
diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim
index 0fb2a9c699..a4db160845 100644
--- a/apps/liteprotocoltester/liteprotocoltester.nim
+++ b/apps/liteprotocoltester/liteprotocoltester.nim
@@ -18,10 +18,13 @@ import
node/health_monitor,
node/waku_metrics,
waku_api/rest/builder as rest_server_builder,
+ waku_lightpush/common,
+ waku_filter_v2
],
./tester_config,
./lightpush_publisher,
- ./filter_subscriber
+ ./filter_subscriber,
+ ./diagnose_connections
logScope:
topics = "liteprotocoltester main"
@@ -83,7 +86,7 @@ when isMainModule:
wakuConf.logFormat = conf.logFormat
wakuConf.staticNodes = @[conf.serviceNode]
wakuConf.nat = conf.nat
- wakuConf.maxConnections = 100
+ wakuConf.maxConnections = 500
wakuConf.restAddress = conf.restAddress
wakuConf.restPort = conf.restPort
wakuConf.restAllowOrigin = conf.restAllowOrigin
@@ -105,6 +108,9 @@ when isMainModule:
wakuConf.rest = true
+ wakuConf.metricsServer = true
+ wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0")
+
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
# It will always be called from main thread anyway.
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
@@ -186,8 +192,12 @@ when isMainModule:
info "Node setup complete"
if conf.testFunc == TesterFunctionality.SENDER:
+ waitFor startPeriodicPeerDiagnostic(wakuApp.node.peerManager, WakuLightPushCodec)
setupAndPublish(wakuApp.node, conf)
else:
+ waitFor startPeriodicPeerDiagnostic(
+ wakuApp.node.peerManager, WakuFilterSubscribeCodec
+ )
setupAndSubscribe(wakuApp.node, conf)
runForever()
diff --git a/apps/liteprotocoltester/nim.cfg b/apps/liteprotocoltester/nim.cfg
new file mode 100644
index 0000000000..2231f2ebed
--- /dev/null
+++ b/apps/liteprotocoltester/nim.cfg
@@ -0,0 +1,4 @@
+-d:chronicles_line_numbers
+-d:chronicles_runtime_filtering:on
+-d:discv5_protocol_id:d5waku
+path = "../.."
diff --git a/apps/liteprotocoltester/run_service_node.sh b/apps/liteprotocoltester/run_service_node.sh
index ba50782c29..7a029434d1 100644
--- a/apps/liteprotocoltester/run_service_node.sh
+++ b/apps/liteprotocoltester/run_service_node.sh
@@ -5,6 +5,23 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "Service node IP: ${IP}"
+RETRIES=${RETRIES:=10}
+
+while [ -z "${BOOTSTRAP_ENR}" ] && [ ${RETRIES} -ge 0 ]; do
+ BOOTSTRAP_ENR=$(wget -qO- http://bootstrap:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"enrUri":"\([^"]*\)".*/\1/');
+ echo "Bootstrap node not ready, retrying (retries left: ${RETRIES})"
+ sleep 1
+ RETRIES=$(( $RETRIES - 1 ))
+done
+
+if [ -z "${BOOTSTRAP_ENR}" ]; then
+ echo "Could not get BOOTSTRAP_ENR and none provided. Failing"
+ exit 1
+fi
+
+echo "Using bootstrap node: ${BOOTSTRAP_ENR}"
+
+
exec /usr/bin/wakunode\
--relay=true\
--filter=true\
@@ -20,10 +37,10 @@ exec /usr/bin/wakunode\
--dns-discovery=true\
--discv5-discovery=true\
--discv5-enr-auto-update=True\
- --log-level=DEBUG\
+ --discv5-bootstrap-node=${BOOTSTRAP_ENR}\
+ --log-level=INFO\
--metrics-server=True\
--metrics-server-address=0.0.0.0\
- --nodekey=e3f5e64568b3a612dee609f6e7c0203c501dab6131662922bdcbcabd474281d5\
--nat=extip:${IP}\
- --pubsub-topic=/waku/2/rs/0/0\
- --cluster-id=0
+ --pubsub-topic=/waku/2/rs/66/0\
+ --cluster-id=66
diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh
index 6c633f5dde..9e51261e29 100644
--- a/apps/liteprotocoltester/run_tester_node.sh
+++ b/apps/liteprotocoltester/run_tester_node.sh
@@ -1,5 +1,7 @@
#!/bin/sh
+set -x
+
if test -f .env; then
echo "Using .env file"
. $(pwd)/.env
@@ -16,10 +18,22 @@ NODE_INDEX=$((FOURTH_OCTET + 256 * THIRD_OCTET))
echo "NODE_INDEX $NODE_INDEX"
+FUNCTION=$1
+if [ "${FUNCTION}" = "SENDER" ]; then
+ FUNCTION=--test-func=SENDER
+ SERVICENAME=lightpush-service
+fi
+
+if [ "${FUNCTION}" = "RECEIVER" ]; then
+ FUNCTION=--test-func=RECEIVER
+ SERVICENAME=filter-service
+fi
+
+
RETRIES=${RETRIES:=10}
while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do
- SERIVCE_NODE_ADDR=$(wget -qO- http://servicenode:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/');
+ SERIVCE_NODE_ADDR=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/');
echo "Service node not ready, retrying (retries left: ${RETRIES})"
sleep 1
RETRIES=$(( $RETRIES - 1 ))
@@ -30,47 +44,43 @@ if [ -z "${SERIVCE_NODE_ADDR}" ]; then
exit 1
fi
-
if [ -n "${PUBSUB}" ]; then
PUBSUB=--pubsub-topic="${PUBSUB}"
+else
+ PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
fi
if [ -n "${CONTENT_TOPIC}" ]; then
CONTENT_TOPIC=--content-topic="${CONTENT_TOPIC}"
fi
-FUNCTION=$1
-
-echo "Tester node: ${FUNCTION}"
-
-REST_PORT=--rest-port=8647
-
-if [ "${FUNCTION}" = "SENDER" ]; then
- FUNCTION=--test-func=SENDER
- REST_PORT=--rest-port=8646
+if [ -n "${START_PUBLISHING_AFTER}" ]; then
+ START_PUBLISHING_AFTER=--start-publishing-after="${START_PUBLISHING_AFTER}"
fi
-if [ "${FUNCTION}" = "RECEIVER" ]; then
- FUNCTION=--test-func=RECEIVER
- REST_PORT=--rest-port=8647
+if [ -n "${MIN_MESSAGE_SIZE}" ]; then
+ MIN_MESSAGE_SIZE=--min-test-msg-size="${MIN_MESSAGE_SIZE}"
fi
-if [ -z "${FUNCTION}" ]; then
- FUNCTION=--test-func=RECEIVER
+if [ -n "${MAX_MESSAGE_SIZE}" ]; then
+ MAX_MESSAGE_SIZE=--max-test-msg-size="${MAX_MESSAGE_SIZE}"
fi
+
+echo "Tester node: ${FUNCTION}"
echo "Using service node: ${SERIVCE_NODE_ADDR}"
+
exec /usr/bin/liteprotocoltester\
- --log-level=DEBUG\
+ --log-level=INFO\
--service-node="${SERIVCE_NODE_ADDR}"\
- --pubsub-topic=/waku/2/rs/0/0\
- --cluster-id=0\
+ --cluster-id=66\
--num-messages=${NUM_MESSAGES}\
--delay-messages=${DELAY_MESSAGES}\
--nat=extip:${IP}\
- ${FUNCTION}\
${PUBSUB}\
${CONTENT_TOPIC}\
- ${REST_PORT}
-
+ ${FUNCTION}\
+ ${START_PUBLISHING_AFTER}\
+ ${MIN_MESSAGE_SIZE}\
+ ${MAX_MESSAGE_SIZE}
# --config-file=config.toml\
diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim
index 8055a22955..5683b544f5 100644
--- a/apps/liteprotocoltester/tester_config.nim
+++ b/apps/liteprotocoltester/tester_config.nim
@@ -20,14 +20,16 @@ import
common/confutils/envvar/std/net as confEnvvarNet,
common/logging,
factory/external_config,
- waku/waku_core,
+ waku_core,
]
export confTomlDefs, confTomlNet, confEnvvarDefs, confEnvvarNet
const
- LitePubsubTopic* = PubsubTopic("/waku/2/rs/0/0")
+ LitePubsubTopic* = PubsubTopic("/waku/2/rs/66/0")
LiteContentTopic* = ContentTopic("/tester/1/light-pubsub-example/proto")
+ DefaultMinTestMessageSizeStr* = "1KiB"
+ DefaultMaxTestMessageSizeStr* = "150KiB"
type TesterFunctionality* = enum
SENDER # pumps messages to the network
@@ -76,6 +78,12 @@ type LiteProtocolTesterConf* = object
desc: "Number of messages to send.", defaultValue: 120, name: "num-messages"
.}: uint32
+ startPublishingAfter* {.
+ desc: "Wait number of seconds before start publishing messages.",
+ defaultValue: 5,
+ name: "start-publishing-after"
+ .}: uint32
+
delayMessages* {.
desc: "Delay between messages in milliseconds.",
defaultValue: 1000,
@@ -105,8 +113,21 @@ type LiteProtocolTesterConf* = object
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
defaultValue: 0,
name: "cluster-id"
- .}: uint32
+ .}: uint16
+ minTestMessageSize* {.
+ desc:
+ "Minimum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.",
+ defaultValue: DefaultMinTestMessageSizeStr,
+ name: "min-test-msg-size"
+ .}: string
+
+ maxTestMessageSize* {.
+ desc:
+ "Maximum message size. Accepted units: KiB, KB, and B. e.g. 1024KiB; 1500 B; etc.",
+ defaultValue: DefaultMaxTestMessageSizeStr,
+ name: "max-test-msg-size"
+ .}: string
## Tester REST service configuration
restAddress* {.
desc: "Listening address of the REST HTTP server.",
diff --git a/apps/liteprotocoltester/tester_message.nim b/apps/liteprotocoltester/tester_message.nim
index a34cb4b75b..e91a2a3702 100644
--- a/apps/liteprotocoltester/tester_message.nim
+++ b/apps/liteprotocoltester/tester_message.nim
@@ -15,6 +15,7 @@ type ProtocolTesterMessage* = object
startedAt*: int64
sinceStart*: int64
sincePrev*: int64
+ size*: uint64
proc writeValue*(
writer: var JsonWriter[RestJson], value: ProtocolTesterMessage
@@ -26,6 +27,7 @@ proc writeValue*(
writer.writeField("startedAt", value.startedAt)
writer.writeField("sinceStart", value.sinceStart)
writer.writeField("sincePrev", value.sincePrev)
+ writer.writeField("size", value.size)
writer.endRecord()
proc readValue*(
@@ -38,6 +40,7 @@ proc readValue*(
startedAt: Option[int64]
sinceStart: Option[int64]
sincePrev: Option[int64]
+ size: Option[uint64]
for fieldName in readObjectFields(reader):
case fieldName
@@ -77,6 +80,12 @@ proc readValue*(
"Multiple `sincePrev` fields found", "ProtocolTesterMessage"
)
sincePrev = some(reader.readValue(int64))
+ of "size":
+ if size.isSome():
+ reader.raiseUnexpectedField(
+ "Multiple `size` fields found", "ProtocolTesterMessage"
+ )
+ size = some(reader.readValue(uint64))
else:
unrecognizedFieldWarning()
@@ -98,6 +107,9 @@ proc readValue*(
if sincePrev.isNone():
reader.raiseUnexpectedValue("Field `sincePrev` is missing")
+ if size.isNone():
+ reader.raiseUnexpectedValue("Field `size` is missing")
+
value = ProtocolTesterMessage(
sender: sender.get(),
index: index.get(),
@@ -105,4 +117,5 @@ proc readValue*(
startedAt: startedAt.get(),
sinceStart: sinceStart.get(),
sincePrev: sincePrev.get(),
+ size: size.get(),
)
From 9657455442cb2100475de36514071ed3043ed75e Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Tue, 18 Jun 2024 09:15:05 +0200
Subject: [PATCH 02/12] Enhancement on statistics reports, added list of sent
messages with hash, fixed latency calculations
---
apps/liteprotocoltester/.env | 8 +-
.../Dockerfile.liteprotocoltester.compile | 2 +-
apps/liteprotocoltester/filter_subscriber.nim | 2 +-
.../lightpush_publisher.nim | 11 ++
apps/liteprotocoltester/statistics.nim | 146 +++++++++++-------
5 files changed, 109 insertions(+), 60 deletions(-)
diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env
index e4359cade2..c1ab26c33c 100644
--- a/apps/liteprotocoltester/.env
+++ b/apps/liteprotocoltester/.env
@@ -1,11 +1,11 @@
NWAKU_IMAGE=quay.io/wakuorg/nwaku-pr:2800-rln-v2
START_PUBLISHING_AFTER=30 # seconds
-NUM_MESSAGES=250
-DELAY_MESSAGES=200 # gap between messages
+NUM_MESSAGES=10000
+DELAY_MESSAGES=168 # gap between messages
-MIN_MESSAGE_SIZE=1Kb
-MAX_MESSAGE_SIZE=120Kb
+MIN_MESSAGE_SIZE=15Kb
+MAX_MESSAGE_SIZE=145Kb
# PUBSUB=/waku/2/rs/66/0
# CONTENT_TOPIC=/tester/1/light-pubsub-example/proto
diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile
index bef14abb54..5ad5f83852 100644
--- a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile
+++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile
@@ -4,7 +4,7 @@
ARG NIMFLAGS
ARG MAKE_TARGET=liteprotocoltester
ARG NIM_COMMIT
- ARG LOG_LEVEL=TRACE
+ ARG LOG_LEVEL=DEBUG
# Get build tools and required header files
RUN apk add --no-cache bash git build-base pcre-dev linux-headers curl jq
diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim
index 8da493c2d6..1ee2bdcca3 100644
--- a/apps/liteprotocoltester/filter_subscriber.nim
+++ b/apps/liteprotocoltester/filter_subscriber.nim
@@ -102,7 +102,7 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
proc(udata: pointer) {.gcsafe.} =
stats.echoStats()
- if stats.checkIfAllMessagesReceived():
+ if waitFor stats.checkIfAllMessagesReceived():
waitFor unsubscribe(
wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0]
)
diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim
index 6a0265364d..5264a8c638 100644
--- a/apps/liteprotocoltester/lightpush_publisher.nim
+++ b/apps/liteprotocoltester/lightpush_publisher.nim
@@ -66,6 +66,8 @@ proc prepareMessage(
return (message, renderSize)
+var sentMessages {.threadvar.}: OrderedTable[uint32, tuple[hash: string, relayed: bool]]
+
proc publishMessages(
wakuNode: WakuNode,
lightpushPubsubTopic: PubsubTopic,
@@ -97,6 +99,7 @@ proc publishMessages(
let msgHash = computeMessageHash(lightpushPubsubTopic, message).to0xHex
if wlpRes.isOk():
+ sentMessages[messagesSent] = (hash: msgHash, relayed: true)
notice "published message using lightpush",
index = messagesSent,
count = numMessages,
@@ -104,6 +107,7 @@ proc publishMessages(
pubsubTopic = lightpushPubsubTopic,
hash = msgHash
else:
+ sentMessages[messagesSent] = (hash: msgHash, relayed: false)
error "failed to publish message using lightpush",
err = wlpRes.error, hash = msgHash
inc(failedToSendCount)
@@ -122,6 +126,12 @@ proc publishMessages(
else:
echo report.get()
+ echo "*--------------------------------------------------------------------------------------------------*"
+ echo "| Index | Relayed | Hash |"
+ for (index, info) in sentMessages.pairs:
+ echo fmt"|{index:>10}|{info.relayed:<9}| {info.hash}"
+ echo "*--------------------------------------------------------------------------------------------------*"
+
discard c_raise(ansi_c.SIGTERM)
proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
@@ -145,6 +155,7 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
info "Start sending messages to service node using lightpush"
+ sentMessages.sort(system.cmp)
# Start maintaining subscription
asyncSpawn publishMessages(
wakuNode,
diff --git a/apps/liteprotocoltester/statistics.nim b/apps/liteprotocoltester/statistics.nim
index 13e749193c..cb98be3686 100644
--- a/apps/liteprotocoltester/statistics.nim
+++ b/apps/liteprotocoltester/statistics.nim
@@ -4,11 +4,19 @@ import
std/[sets, tables, strutils, sequtils, options, strformat],
chronos/timer as chtimer,
chronicles,
+ chronos,
results
import ./tester_message
type
+ ArrivalInfo = object
+ arrivedAt: Moment
+ prevArrivedAt: Moment
+ prevIndex: uint32
+
+ MessageInfo = tuple[msg: ProtocolTesterMessage, info: ArrivalInfo]
+
StatHelper = object
prevIndex: uint32
prevArrivedAt: Moment
@@ -17,14 +25,13 @@ type
maxIndex: uint32
Statistics* = object
+ received: Table[uint32, MessageInfo]
+ firstReceivedIdx*: uint32
allMessageCount*: uint32
receivedMessages*: uint32
misorderCount*: uint32
lateCount*: uint32
duplicateCount*: uint32
- minLatency*: Duration
- maxLatency*: Duration
- cummulativeLatency: Duration
helper: StatHelper
PerPeerStatistics* = Table[string, Statistics]
@@ -42,24 +49,29 @@ proc init*(T: type Statistics, expectedMessageCount: int = 1000): T =
result.helper.prevIndex = 0
result.helper.maxIndex = 0
result.helper.seenIndices.init(expectedMessageCount)
- result.minLatency = nanos(0)
- result.maxLatency = nanos(0)
- result.cummulativeLatency = nanos(0)
+ result.received = initTable[uint32, MessageInfo](expectedMessageCount)
return result
proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
if self.allMessageCount == 0:
self.allMessageCount = msg.count
+ self.firstReceivedIdx = msg.index
elif self.allMessageCount != msg.count:
- warn "Message count mismatch at message",
+ error "Message count mismatch at message",
index = msg.index, expected = self.allMessageCount, got = msg.count
- if not self.helper.seenIndices.contains(msg.index):
- self.helper.seenIndices.incl(msg.index)
- else:
- inc(self.duplicateCount)
+ let currentArrived: MessageInfo = (
+ msg: msg,
+ info: ArrivalInfo(
+ arrivedAt: Moment.now(),
+ prevArrivedAt: self.helper.prevArrivedAt,
+ prevIndex: self.helper.prevIndex,
+ ),
+ )
+
+ if self.received.hasKeyOrPut(msg.index, currentArrived):
warn "Duplicate message", index = msg.index
- ## just do not count into stats
+ inc(self.duplicateCount)
return
## detect misorder arrival and possible lost messages
@@ -67,41 +79,13 @@ proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
inc(self.misorderCount)
warn "Misordered message arrival",
index = msg.index, expected = self.helper.prevIndex + 1
-
- ## collect possible lost message indicies
- for idx in self.helper.prevIndex + 1 ..< msg.index:
- self.helper.lostIndices.incl(idx)
elif self.helper.prevIndex > msg.index:
inc(self.lateCount)
warn "Late message arrival", index = msg.index, expected = self.helper.prevIndex + 1
- else:
- ## may remove late arrival
- self.helper.lostIndices.excl(msg.index)
-
- ## calculate latency
- let currentArrivedAt = Moment.now()
-
- let delaySincePrevArrived: Duration = currentArrivedAt - self.helper.prevArrivedAt
-
- let expectedDelay: Duration = nanos(msg.sincePrev)
-
- var latency: Duration
-
- # if we have any latency...
- if expectedDelay > delaySincePrevArrived:
- latency = delaySincePrevArrived - expectedDelay
- if self.minLatency.isZero or (latency < self.minLatency and latency > nanos(0)):
- self.minLatency = latency
- if latency > self.maxLatency:
- self.maxLatency = latency
- self.cummulativeLatency += latency
- else:
- warn "Negative latency detected",
- index = msg.index, expected = expectedDelay, actual = delaySincePrevArrived
self.helper.maxIndex = max(self.helper.maxIndex, msg.index)
self.helper.prevIndex = msg.index
- self.helper.prevArrivedAt = currentArrivedAt
+ self.helper.prevArrivedAt = currentArrived.info.arrivedAt
inc(self.receivedMessages)
proc addMessage*(
@@ -116,21 +100,61 @@ proc addMessage*(
proc lossCount*(self: Statistics): uint32 =
self.helper.maxIndex - self.receivedMessages
-proc averageLatency*(self: Statistics): Duration =
- if self.receivedMessages == 0:
- return nanos(0)
- return self.cummulativeLatency div self.receivedMessages
+proc calcLatency*(self: Statistics): tuple[min, max, avg: Duration] =
+ var
+ minLatency = nanos(0)
+ maxLatency = nanos(0)
+ avgLatency = nanos(0)
+
+ if self.receivedMessages > 2:
+ try:
+ var prevArrivedAt = self.received[self.firstReceivedIdx].info.arrivedAt
+
+ for idx, (msg, arrival) in self.received.pairs:
+ if idx <= 1:
+ continue
+ let expectedDelay = nanos(msg.sincePrev)
+
+ ## latency will be 0 if arrived in shorter time than expected
+ var latency = arrival.arrivedAt - arrival.prevArrivedAt - expectedDelay
+
+ if latency > nanos(0):
+ if minLatency == nanos(0):
+ minLatency = latency
+ else:
+ minLatency = min(minLatency, latency)
+
+ maxLatency = max(maxLatency, latency)
+ avgLatency += latency
+
+ avgLatency = avgLatency div (self.receivedMessages - 1)
+ except KeyError:
+ error "Error while calculating latency"
+
+ return (minLatency, maxLatency, avgLatency)
+
+proc missingIndices*(self: Statistics): seq[uint32] =
+ var missing: seq[uint32] = @[]
+ for idx in 1 .. self.helper.maxIndex:
+ if not self.received.hasKey(idx):
+ missing.add(idx)
+ return missing
proc echoStat*(self: Statistics) =
+ let (minL, maxL, avgL) = self.calcLatency()
+
let printable = catch:
"""*------------------------------------------------------------------------------------------*
| Expected | Received | Target | Loss | Misorder | Late | Duplicate |
|{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} |{self.duplicateCount:>11} |
*------------------------------------------------------------------------------------------*
| Latency stat: |
-| avg latency: {$self.averageLatency():<73}|
-| min latency: {$self.maxLatency:<73}|
-| max latency: {$self.minLatency:<73}|
+| min latency: {$minL:<73}|
+| avg latency: {$avgL:<73}|
+| max latency: {$maxL:<73}|
+*------------------------------------------------------------------------------------------*
+| Lost indices: |
+| {self.missingIndices()} |
*------------------------------------------------------------------------------------------*""".fmt()
if printable.isErr():
@@ -139,6 +163,8 @@ proc echoStat*(self: Statistics) =
echo printable.get()
proc jsonStat*(self: Statistics): string =
+ let minL, maxL, avgL = self.calcLatency()
+
let json = catch:
"""{{"expected":{self.helper.maxIndex},
"received": {self.receivedMessages},
@@ -148,10 +174,11 @@ proc jsonStat*(self: Statistics): string =
"late": {self.lateCount},
"duplicate": {self.duplicateCount},
"latency":
- {{"avg": "{self.averageLatency()}",
- "min": "{self.minLatency}",
- "max": "{self.maxLatency}"
- }}
+ {{"avg": "{avgL}",
+ "min": "{minL}",
+ "max": "{maxL}"
+ }},
+ "lostIndices": {self.missingIndices()}
}}""".fmt()
if json.isErr:
return "{\"result:\": \"" & json.error.msg & "\"}"
@@ -189,14 +216,25 @@ proc jsonStats*(self: PerPeerStatistics): string =
"{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() &
"\"}"
-proc checkIfAllMessagesReceived*(self: PerPeerStatistics): bool =
+proc checkIfAllMessagesReceived*(self: PerPeerStatistics): Future[bool] {.async.} =
# if there are no peers have sent messages, assume we just have started.
if self.len == 0:
return false
for stat in self.values:
if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or
- stat.receivedMessages < stat.allMessageCount:
+ stat.helper.maxIndex < stat.allMessageCount:
return false
+ ## Ok, we see last message arrived from all peers,
+ ## lets check if all messages are received
+ ## and if not let's wait another 20 secs to give chance the system will send them.
+ var shallWait = false
+ for stat in self.values:
+ if stat.receivedMessages < stat.allMessageCount:
+ shallWait = true
+
+ if shallWait:
+ await sleepAsync(chtimer.seconds(20))
+
return true
From cd41ce196ccccdac81a8ffce94738c3e9865fd7f Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Fri, 2 Aug 2024 15:44:37 +0200
Subject: [PATCH 03/12] Enable standalonde running liteprotocoltester agains
any waku network/fleet
---
apps/liteprotocoltester/.env | 15 +++---
apps/liteprotocoltester/README.md | 47 +++++++++++++++----
.../diagnose_connections.nim | 2 +-
.../docker-compose-on-simularor.yml | 4 ++
apps/liteprotocoltester/docker-compose.yml | 2 +
.../lightpush_publisher.nim | 9 ++--
.../liteprotocoltester/liteprotocoltester.nim | 26 +++++-----
apps/liteprotocoltester/run_service_node.sh | 15 +++++-
apps/liteprotocoltester/run_tester_node.sh | 46 +++++++++++++-----
9 files changed, 119 insertions(+), 47 deletions(-)
mode change 100644 => 100755 apps/liteprotocoltester/run_service_node.sh
mode change 100644 => 100755 apps/liteprotocoltester/run_tester_node.sh
diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env
index c1ab26c33c..3ee0ea8e70 100644
--- a/apps/liteprotocoltester/.env
+++ b/apps/liteprotocoltester/.env
@@ -1,12 +1,15 @@
-NWAKU_IMAGE=quay.io/wakuorg/nwaku-pr:2800-rln-v2
+#NWAKU_IMAGE=
-START_PUBLISHING_AFTER=30 # seconds
-NUM_MESSAGES=10000
-DELAY_MESSAGES=168 # gap between messages
+START_PUBLISHING_AFTER=10 # seconds
+NUM_MESSAGES=0 # 0 for infinite
+DELAY_MESSAGES=1000 #1s gap between messages
MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb
-# PUBSUB=/waku/2/rs/66/0
-# CONTENT_TOPIC=/tester/1/light-pubsub-example/proto
+PUBSUB=/waku/2/rs/16/32
+CONTENT_TOPIC=/tester/1/light-pubsub-example/proto
+CLUSTER_ID=16
+#BOOTSTRAP_ENR=
+#SERIVCE_NODE_ADDR=/ip4/8.218.23.76/tcp/30303/p2p/16Uiu2HAmGwcE8v7gmJNEWFtZtojYpPMTHy2jBLL6xRk33qgDxFWX
diff --git a/apps/liteprotocoltester/README.md b/apps/liteprotocoltester/README.md
index e590fd67ed..8c6a1f1236 100644
--- a/apps/liteprotocoltester/README.md
+++ b/apps/liteprotocoltester/README.md
@@ -96,20 +96,53 @@ docker compose -f docker-compose-on-simularor.yml logs -f receivernode
- Notice there is a configurable wait before start publishing messages as it is noticed time is needed for the service nodes to get connected to full nodes from simulator
- light clients will print report on their and the connected service node's connectivity to the network in every 20 secs.
+### Phase 3
+
+> Run independently on a chosen waku fleet
+
+This option is simple as is just to run the built liteprotocoltester binary with run_tester_node.sh script.
+
+Syntax:
+`./run_tester_node.sh `
+
+How to run from you nwaku repository:
+```bash
+cd ../{your-repository}
+
+make LOG_LEVEL=DEBUG liteprotocoltester
+
+cd apps/liteprotocoltester
+
+# optionally edit .env file
+
+# run publisher side
+./run_tester_node.sh ../../build/liteprotocoltester SENDER [chosen service node address that support lightpush]
+
+# or run receiver side
+./run_tester_node.sh ../../build/liteprotocoltester RECEIVER [chosen service node address that support filter service]
+```
+
+#### Recommendations
+
+In order to run on any kind of network, it is recommended to deploy the built `liteprotocoltester` binary with the `.env` file and the `run_tester_node.sh` script to the desired machine.
+
+Select a lightpush service node and a filter service node from the targeted network, or you can run your own. Note down the selected peers peer_id.
+
+Run a SENDER role liteprotocoltester and a RECEIVER role one on different terminals. Depending on the test aim, you may want to redirect the output to a file.
+
+> RECEIVER side will periodically print statistics to standard output.
+
## Configure
### Environment variables for docker compose runs
| Variable | Description | Default |
| ---: | :--- | :--- |
-| NUM_MESSAGES | Number of message to publish | 120 |
+| NUM_MESSAGES | Number of message to publish, 0 means infinite | 120 |
| DELAY_MESSAGES | Frequency of messages in milliseconds | 1000 |
-<<<<<<< HEAD
-| PUBSUB | Used pubsub_topic for testing | /waku/2/rs/0/0 |
-=======
| PUBSUB | Used pubsub_topic for testing | /waku/2/rs/66/0 |
->>>>>>> 32cc23ff (Added phase 2 - waku-simulatior integration in README.md)
| CONTENT_TOPIC | content_topic for testing | /tester/1/light-pubsub-example/proto |
+| CLUSTER_ID | cluster_id of the network | 16 |
| START_PUBLISHING_AFTER | Delay in seconds before starting to publish to let service node connected | 5 |
| MIN_MESSAGE_SIZE | Minimum message size in bytes | 1KiB |
| MAX_MESSAGE_SIZE | Maximum message size in bytes | 120KiB |
@@ -123,14 +156,10 @@ docker compose -f docker-compose-on-simularor.yml logs -f receivernode
| --service-node| Address of the service node to use for lightpush and/or filter service | - |
| --num-messages | Number of message to publish | 120 |
| --delay-messages | Frequency of messages in milliseconds | 1000 |
-<<<<<<< HEAD
-| --pubsub-topic | Used pubsub_topic for testing | /waku/2/rs/0/0 |
-=======
| --min-message-size | Minimum message size in bytes | 1KiB |
| --max-message-size | Maximum message size in bytes | 120KiB |
| --start-publishing-after | Delay in seconds before starting to publish to let service node connected in seconds | 5 |
| --pubsub-topic | Used pubsub_topic for testing | /waku/2/default-waku/proto |
->>>>>>> 32cc23ff (Added phase 2 - waku-simulatior integration in README.md)
| --content_topic | content_topic for testing | /tester/1/light-pubsub-example/proto |
| --cluster-id | Cluster id for the test | 0 |
| --config-file | TOML configuration file to fine tune the light waku node
Note that some configurations (full node services) are not taken into account | - |
diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim
index 4973365fb4..e6e965da7e 100644
--- a/apps/liteprotocoltester/diagnose_connections.nim
+++ b/apps/liteprotocoltester/diagnose_connections.nim
@@ -93,4 +93,4 @@ proc logServiceRelayPeers(
proc startPeriodicPeerDiagnostic*(pm: PeerManager, codec: string) {.async.} =
asyncSpawn logSelfPeersLoop(pm, chronos.seconds(20))
- asyncSpawn logServiceRelayPeers(pm, codec, chronos.seconds(20))
+ # asyncSpawn logServiceRelayPeers(pm, codec, chronos.seconds(20))
diff --git a/apps/liteprotocoltester/docker-compose-on-simularor.yml b/apps/liteprotocoltester/docker-compose-on-simularor.yml
index 74fc246935..58191d1302 100644
--- a/apps/liteprotocoltester/docker-compose-on-simularor.yml
+++ b/apps/liteprotocoltester/docker-compose-on-simularor.yml
@@ -89,7 +89,9 @@ services:
entrypoint: sh
command:
- /opt/run_tester_node.sh
+ - /usr/bin/liteprotocoltester
- SENDER
+ - waku-sim
depends_on:
- lightpush-service
configs:
@@ -163,7 +165,9 @@ services:
entrypoint: sh
command:
- /opt/run_tester_node.sh
+ - /usr/bin/liteprotocoltester
- RECEIVER
+ - waku-sim
depends_on:
- filter-service
- publishernode
diff --git a/apps/liteprotocoltester/docker-compose.yml b/apps/liteprotocoltester/docker-compose.yml
index ccbe26493a..14b1e80065 100644
--- a/apps/liteprotocoltester/docker-compose.yml
+++ b/apps/liteprotocoltester/docker-compose.yml
@@ -80,6 +80,7 @@ services:
entrypoint: sh
command:
- /opt/run_tester_node.sh
+ - /usr/bin/liteprotocoltester
- SENDER
depends_on:
- servicenode
@@ -118,6 +119,7 @@ services:
entrypoint: sh
command:
- /opt/run_tester_node.sh
+ - /usr/bin/liteprotocoltester
- RECEIVER
depends_on:
- servicenode
diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim
index 5264a8c638..ac437995c5 100644
--- a/apps/liteprotocoltester/lightpush_publisher.nim
+++ b/apps/liteprotocoltester/lightpush_publisher.nim
@@ -87,11 +87,12 @@ proc publishMessages(
renderMsgSize.max = max(renderMsgSize.min, renderMsgSize.max)
let selfPeerId = $wakuNode.switch.peerInfo.peerId
+ var numMessagesToSend = if numMessages == 0: uint32.high else: numMessages
var messagesSent: uint32 = 1
- while numMessages >= messagesSent:
+ while numMessagesToSend >= messagesSent:
let (message, msgSize) = prepareMessage(
- selfPeerId, messagesSent, numMessages, startedAt, prevMessageAt,
+ selfPeerId, messagesSent, numMessagesToSend, startedAt, prevMessageAt,
lightpushContentTopic, renderMsgSize,
)
let wlpRes = await wakuNode.lightpushPublish(some(lightpushPubsubTopic), message)
@@ -102,7 +103,7 @@ proc publishMessages(
sentMessages[messagesSent] = (hash: msgHash, relayed: true)
notice "published message using lightpush",
index = messagesSent,
- count = numMessages,
+ count = numMessagesToSend,
size = msgSize,
pubsubTopic = lightpushPubsubTopic,
hash = msgHash
@@ -118,7 +119,7 @@ proc publishMessages(
let report = catch:
"""*----------------------------------------*
| Expected | Sent | Failed |
-|{numMessages:>11} |{messagesSent-failedToSendCount-1:>11} |{failedToSendCount:>11} |
+|{numMessagesToSend:>11} |{messagesSent-failedToSendCount-1:>11} |{failedToSendCount:>11} |
*----------------------------------------*""".fmt()
if report.isErr:
diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim
index a4db160845..2aafd0240b 100644
--- a/apps/liteprotocoltester/liteprotocoltester.nim
+++ b/apps/liteprotocoltester/liteprotocoltester.nim
@@ -19,7 +19,7 @@ import
node/waku_metrics,
waku_api/rest/builder as rest_server_builder,
waku_lightpush/common,
- waku_filter_v2
+ waku_filter_v2,
],
./tester_config,
./lightpush_publisher,
@@ -106,7 +106,7 @@ when isMainModule:
wakuConf.lightpush = false
wakuConf.store = false
- wakuConf.rest = true
+ wakuConf.rest = false
wakuConf.metricsServer = true
wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0")
@@ -118,17 +118,17 @@ when isMainModule:
nodeHealthMonitor = WakuNodeHealthMonitor()
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
- let restServer = rest_server_builder.startRestServerEsentials(
- nodeHealthMonitor, wakuConf
- ).valueOr:
- error "Starting esential REST server failed.", error = $error
- quit(QuitFailure)
+ # let restServer = rest_server_builder.startRestServerEsentials(
+ # nodeHealthMonitor, wakuConf
+ # ).valueOr:
+ # error "Starting esential REST server failed.", error = $error
+ # quit(QuitFailure)
var wakuApp = Waku.init(wakuConf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)
- wakuApp.restServer = restServer
+ # wakuApp.restServer = restServer
nodeHealthMonitor.setNode(wakuApp.node)
@@ -136,11 +136,11 @@ when isMainModule:
error "Starting waku failed", error = error
quit(QuitFailure)
- rest_server_builder.startRestServerProtocolSupport(
- restServer, wakuApp.node, wakuApp.wakuDiscv5, wakuConf
- ).isOkOr:
- error "Starting protocols support REST server failed.", error = $error
- quit(QuitFailure)
+ # rest_server_builder.startRestServerProtocolSupport(
+ # restServer, wakuApp.node, wakuApp.wakuDiscv5, wakuConf
+ # ).isOkOr:
+ # error "Starting protocols support REST server failed.", error = $error
+ # quit(QuitFailure)
wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging(wakuConf).valueOr:
error "Starting monitoring and external interfaces failed", error = error
diff --git a/apps/liteprotocoltester/run_service_node.sh b/apps/liteprotocoltester/run_service_node.sh
old mode 100644
new mode 100755
index 7a029434d1..b0d22b6f15
--- a/apps/liteprotocoltester/run_service_node.sh
+++ b/apps/liteprotocoltester/run_service_node.sh
@@ -5,6 +5,17 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "Service node IP: ${IP}"
+if [ -n "${PUBSUB}" ]; then
+ PUBSUB=--pubsub-topic="${PUBSUB}"
+else
+ PUBSUB=--pubsub-topic="/waku/2/rs/66/0"
+fi
+
+if [ -n "${CLUSTER_ID}" ]; then
+ CLUSTER_ID=--cluster-id="${CLUSTER_ID}"
+fi
+
+
RETRIES=${RETRIES:=10}
while [ -z "${BOOTSTRAP_ENR}" ] && [ ${RETRIES} -ge 0 ]; do
@@ -42,5 +53,5 @@ exec /usr/bin/wakunode\
--metrics-server=True\
--metrics-server-address=0.0.0.0\
--nat=extip:${IP}\
- --pubsub-topic=/waku/2/rs/66/0\
- --cluster-id=66
+ --pubsub-topic=${PUBSUB}\
+ --cluster-id=${CLUSTER_ID}
diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh
old mode 100644
new mode 100755
index 9e51261e29..6603545fb2
--- a/apps/liteprotocoltester/run_tester_node.sh
+++ b/apps/liteprotocoltester/run_tester_node.sh
@@ -1,6 +1,6 @@
#!/bin/sh
-set -x
+# set -x
if test -f .env; then
echo "Using .env file"
@@ -18,7 +18,14 @@ NODE_INDEX=$((FOURTH_OCTET + 256 * THIRD_OCTET))
echo "NODE_INDEX $NODE_INDEX"
-FUNCTION=$1
+BINARY_PATH=$1
+
+if [ ! -x "${BINARY_PATH}" ]; then
+ echo "Invalid binary path. Failing"
+ exit 1
+fi
+
+FUNCTION=$2
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
SERVICENAME=lightpush-service
@@ -29,15 +36,24 @@ if [ "${FUNCTION}" = "RECEIVER" ]; then
SERVICENAME=filter-service
fi
+SERIVCE_NODE_ADDR=$3
+if [ -z "${SERIVCE_NODE_ADDR}" ]; then
+ echo "Service node peer_id provided. Failing"
+ exit 1
+fi
+
+if [ "${SERIVCE_NODE_ADDR}" = "waku-sim" ]; then
-RETRIES=${RETRIES:=10}
+ RETRIES=${RETRIES:=10}
-while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do
- SERIVCE_NODE_ADDR=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/');
- echo "Service node not ready, retrying (retries left: ${RETRIES})"
- sleep 1
- RETRIES=$(( $RETRIES - 1 ))
-done
+ while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do
+ SERIVCE_NODE_ADDR=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/');
+ echo "Service node not ready, retrying (retries left: ${RETRIES})"
+ sleep 1
+ RETRIES=$(( $RETRIES - 1 ))
+ done
+
+fi
if [ -z "${SERIVCE_NODE_ADDR}" ]; then
echo "Could not get SERIVCE_NODE_ADDR and none provided. Failing"
@@ -54,6 +70,10 @@ if [ -n "${CONTENT_TOPIC}" ]; then
CONTENT_TOPIC=--content-topic="${CONTENT_TOPIC}"
fi
+if [ -n "${CLUSTER_ID}" ]; then
+ CLUSTER_ID=--cluster-id="${CLUSTER_ID}"
+fi
+
if [ -n "${START_PUBLISHING_AFTER}" ]; then
START_PUBLISHING_AFTER=--start-publishing-after="${START_PUBLISHING_AFTER}"
fi
@@ -67,20 +87,22 @@ if [ -n "${MAX_MESSAGE_SIZE}" ]; then
fi
+echo "Running binary: ${BINARY_PATH}"
echo "Tester node: ${FUNCTION}"
echo "Using service node: ${SERIVCE_NODE_ADDR}"
-exec /usr/bin/liteprotocoltester\
+
+exec "${BINARY_PATH}"\
--log-level=INFO\
--service-node="${SERIVCE_NODE_ADDR}"\
- --cluster-id=66\
--num-messages=${NUM_MESSAGES}\
--delay-messages=${DELAY_MESSAGES}\
- --nat=extip:${IP}\
${PUBSUB}\
${CONTENT_TOPIC}\
+ ${CLUSTER_ID}\
${FUNCTION}\
${START_PUBLISHING_AFTER}\
${MIN_MESSAGE_SIZE}\
${MAX_MESSAGE_SIZE}
+ # --nat=extip:${IP}\
# --config-file=config.toml\
From 7ff2857cded9322006d2228a644efd9982784f66 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Mon, 5 Aug 2024 14:06:48 +0200
Subject: [PATCH 04/12] Fix missing env vars on run_tester_node.sh
---
apps/liteprotocoltester/run_tester_node.sh | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh
index 6603545fb2..573a14dff4 100755
--- a/apps/liteprotocoltester/run_tester_node.sh
+++ b/apps/liteprotocoltester/run_tester_node.sh
@@ -87,6 +87,14 @@ if [ -n "${MAX_MESSAGE_SIZE}" ]; then
fi
+if [ -n "${NUM_MESSAGES}" ]; then
+ NUM_MESSAGES=--num-messages="${NUM_MESSAGES}"
+fi
+
+if [ -n "${DELAY_MESSAGES}" ]; then
+ DELAY_MESSAGES=--delay-messages="${DELAY_MESSAGES}"
+fi
+
echo "Running binary: ${BINARY_PATH}"
echo "Tester node: ${FUNCTION}"
echo "Using service node: ${SERIVCE_NODE_ADDR}"
@@ -95,8 +103,8 @@ echo "Using service node: ${SERIVCE_NODE_ADDR}"
exec "${BINARY_PATH}"\
--log-level=INFO\
--service-node="${SERIVCE_NODE_ADDR}"\
- --num-messages=${NUM_MESSAGES}\
- --delay-messages=${DELAY_MESSAGES}\
+ ${DELAY_MESSAGES}\
+ ${NUM_MESSAGES}\
${PUBSUB}\
${CONTENT_TOPIC}\
${CLUSTER_ID}\
From 2069045d222ecd0a1a1075f5284fd1cc90dcc211 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Tue, 6 Aug 2024 10:57:18 +0200
Subject: [PATCH 05/12] Adjustment on log levels, fix REST initialization
---
apps/liteprotocoltester/.env | 15 +++--
.../diagnose_connections.nim | 2 +-
apps/liteprotocoltester/filter_subscriber.nim | 7 ++-
.../lightpush_publisher.nim | 61 +++++++++++++------
.../liteprotocoltester/liteprotocoltester.nim | 24 ++++----
apps/liteprotocoltester/run_service_node.sh | 6 +-
waku/waku_api/rest/builder.nim | 4 +-
7 files changed, 70 insertions(+), 49 deletions(-)
diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env
index 3ee0ea8e70..3503605f2a 100644
--- a/apps/liteprotocoltester/.env
+++ b/apps/liteprotocoltester/.env
@@ -1,15 +1,14 @@
#NWAKU_IMAGE=
-START_PUBLISHING_AFTER=10 # seconds
+START_PUBLISHING_AFTER=20 # seconds
NUM_MESSAGES=0 # 0 for infinite
-DELAY_MESSAGES=1000 #1s gap between messages
+DELAY_MESSAGES=8000 #1s gap between messages
MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb
-PUBSUB=/waku/2/rs/16/32
-CONTENT_TOPIC=/tester/1/light-pubsub-example/proto
-CLUSTER_ID=16
-
-#BOOTSTRAP_ENR=
-#SERIVCE_NODE_ADDR=/ip4/8.218.23.76/tcp/30303/p2p/16Uiu2HAmGwcE8v7gmJNEWFtZtojYpPMTHy2jBLL6xRk33qgDxFWX
+PUBSUB=/waku/2/rs/1/4
+#PUBSUB=/waku/2/rs/16/32
+#CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet
+CONTENT_TOPIC=/tester/2/light-pubsub-test/twn
+CLUSTER_ID=1
diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim
index e6e965da7e..0f198d63e6 100644
--- a/apps/liteprotocoltester/diagnose_connections.nim
+++ b/apps/liteprotocoltester/diagnose_connections.nim
@@ -92,5 +92,5 @@ proc logServiceRelayPeers(
await sleepAsync(interval)
proc startPeriodicPeerDiagnostic*(pm: PeerManager, codec: string) {.async.} =
- asyncSpawn logSelfPeersLoop(pm, chronos.seconds(20))
+ asyncSpawn logSelfPeersLoop(pm, chronos.seconds(60))
# asyncSpawn logServiceRelayPeers(pm, codec, chronos.seconds(20))
diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim
index 1ee2bdcca3..72b054f588 100644
--- a/apps/liteprotocoltester/filter_subscriber.nim
+++ b/apps/liteprotocoltester/filter_subscriber.nim
@@ -45,6 +45,7 @@ proc maintainSubscription(
let pingRes = await wakuNode.wakuFilterClient.ping(filterPeer)
if pingRes.isErr():
# No subscription found. Let's subscribe.
+ error "ping failed.", err = pingRes.error
trace "no subscription found. Sending subscribe request"
let subscribeRes = await wakuNode.filterSubscribe(
@@ -52,10 +53,10 @@ proc maintainSubscription(
)
if subscribeRes.isErr():
- trace "subscribe request failed. Quitting.", err = subscribeRes.error
+ error "subscribe request failed. Quitting.", err = subscribeRes.error
break
else:
- trace "subscribe request successful."
+ notice "subscribe request successful."
else:
trace "subscription found."
@@ -102,7 +103,7 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
proc(udata: pointer) {.gcsafe.} =
stats.echoStats()
- if waitFor stats.checkIfAllMessagesReceived():
+ if conf.numMessages > 0 and waitFor stats.checkIfAllMessagesReceived():
waitFor unsubscribe(
wakuNode, remotePeer, conf.pubsubTopics[0], conf.contentTopics[0]
)
diff --git a/apps/liteprotocoltester/lightpush_publisher.nim b/apps/liteprotocoltester/lightpush_publisher.nim
index ac437995c5..aa56bccf18 100644
--- a/apps/liteprotocoltester/lightpush_publisher.nim
+++ b/apps/liteprotocoltester/lightpush_publisher.nim
@@ -3,6 +3,7 @@ import
system/ansi_c,
chronicles,
chronos,
+ chronos/timer as chtimer,
stew/byteutils,
results,
json_serialization as js
@@ -67,6 +68,38 @@ proc prepareMessage(
return (message, renderSize)
var sentMessages {.threadvar.}: OrderedTable[uint32, tuple[hash: string, relayed: bool]]
+var failedToSendCause {.threadvar.}: Table[string, uint32]
+var failedToSendCount {.threadvar.}: uint32
+var numMessagesToSend {.threadvar.}: uint32
+var messagesSent {.threadvar.}: uint32
+
+proc reportSentMessages() {.async.} =
+ while true:
+ await sleepAsync(chtimer.seconds(60))
+ let report = catch:
+ """*----------------------------------------*
+| Expected | Sent | Failed |
+|{numMessagesToSend+failedToSendCount:>11} |{messagesSent:>11} |{failedToSendCount:>11} |
+*----------------------------------------*""".fmt()
+
+ if report.isErr:
+ echo "Error while printing statistics"
+ else:
+ echo report.get()
+
+ echo "*--------------------------------------------------------------------------------------------------*"
+ echo "| Failur cause | count |"
+ for (cause, count) in failedToSendCause.pairs:
+ echo fmt"|{cause:<87}|{count:>10}|"
+ echo "*--------------------------------------------------------------------------------------------------*"
+
+ echo "*--------------------------------------------------------------------------------------------------*"
+ echo "| Index | Relayed | Hash |"
+ for (index, info) in sentMessages.pairs:
+ echo fmt"|{index:>10}|{info.relayed:<9}| {info.hash:<76}|"
+ echo "*--------------------------------------------------------------------------------------------------*"
+ # evere sent message hash should logged once
+ sentMessages.clear()
proc publishMessages(
wakuNode: WakuNode,
@@ -78,7 +111,6 @@ proc publishMessages(
) {.async.} =
let startedAt = getNowInNanosecondTime()
var prevMessageAt = startedAt
- var failedToSendCount: uint32 = 0
var renderMsgSize = messageSizeRange
# sets some default of min max message size to avoid conflict with meaningful payload size
renderMsgSize.min = max(1024.uint64, renderMsgSize.min) # do not use less than 1KB
@@ -87,9 +119,10 @@ proc publishMessages(
renderMsgSize.max = max(renderMsgSize.min, renderMsgSize.max)
let selfPeerId = $wakuNode.switch.peerInfo.peerId
- var numMessagesToSend = if numMessages == 0: uint32.high else: numMessages
+ failedToSendCount = 0
+ numMessagesToSend = if numMessages == 0: uint32.high else: numMessages
+ messagesSent = 1
- var messagesSent: uint32 = 1
while numMessagesToSend >= messagesSent:
let (message, msgSize) = prepareMessage(
selfPeerId, messagesSent, numMessagesToSend, startedAt, prevMessageAt,
@@ -107,31 +140,17 @@ proc publishMessages(
size = msgSize,
pubsubTopic = lightpushPubsubTopic,
hash = msgHash
+ inc(messagesSent)
else:
sentMessages[messagesSent] = (hash: msgHash, relayed: false)
+ failedToSendCause.mgetOrPut(wlpRes.error, 1).inc()
error "failed to publish message using lightpush",
err = wlpRes.error, hash = msgHash
inc(failedToSendCount)
await sleepAsync(delayMessages)
- inc(messagesSent)
- let report = catch:
- """*----------------------------------------*
-| Expected | Sent | Failed |
-|{numMessagesToSend:>11} |{messagesSent-failedToSendCount-1:>11} |{failedToSendCount:>11} |
-*----------------------------------------*""".fmt()
-
- if report.isErr:
- echo "Error while printing statistics"
- else:
- echo report.get()
-
- echo "*--------------------------------------------------------------------------------------------------*"
- echo "| Index | Relayed | Hash |"
- for (index, info) in sentMessages.pairs:
- echo fmt"|{index:>10}|{info.relayed:<9}| {info.hash}"
- echo "*--------------------------------------------------------------------------------------------------*"
+ waitFor reportSentMessages()
discard c_raise(ansi_c.SIGTERM)
@@ -166,3 +185,5 @@ proc setupAndPublish*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
(min: parsedMinMsgSize, max: parsedMaxMsgSize),
conf.delayMessages.milliseconds,
)
+
+ asyncSpawn reportSentMessages()
diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim
index 2aafd0240b..d7285e2f2e 100644
--- a/apps/liteprotocoltester/liteprotocoltester.nim
+++ b/apps/liteprotocoltester/liteprotocoltester.nim
@@ -84,7 +84,7 @@ when isMainModule:
wakuConf.logLevel = conf.logLevel
wakuConf.logFormat = conf.logFormat
- wakuConf.staticNodes = @[conf.serviceNode]
+ wakuConf.staticnodes = @[conf.serviceNode]
wakuConf.nat = conf.nat
wakuConf.maxConnections = 500
wakuConf.restAddress = conf.restAddress
@@ -118,17 +118,17 @@ when isMainModule:
nodeHealthMonitor = WakuNodeHealthMonitor()
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
- # let restServer = rest_server_builder.startRestServerEsentials(
- # nodeHealthMonitor, wakuConf
- # ).valueOr:
- # error "Starting esential REST server failed.", error = $error
- # quit(QuitFailure)
+ let restServer = rest_server_builder.startRestServerEsentials(
+ nodeHealthMonitor, wakuConf
+ ).valueOr:
+ error "Starting esential REST server failed.", error = $error
+ quit(QuitFailure)
var wakuApp = Waku.init(wakuConf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)
- # wakuApp.restServer = restServer
+ wakuApp.restServer = restServer
nodeHealthMonitor.setNode(wakuApp.node)
@@ -136,11 +136,11 @@ when isMainModule:
error "Starting waku failed", error = error
quit(QuitFailure)
- # rest_server_builder.startRestServerProtocolSupport(
- # restServer, wakuApp.node, wakuApp.wakuDiscv5, wakuConf
- # ).isOkOr:
- # error "Starting protocols support REST server failed.", error = $error
- # quit(QuitFailure)
+ rest_server_builder.startRestServerProtocolSupport(
+ restServer, wakuApp.node, wakuApp.wakuDiscv5, wakuConf
+ ).isOkOr:
+ error "Starting protocols support REST server failed.", error = $error
+ quit(QuitFailure)
wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging(wakuConf).valueOr:
error "Starting monitoring and external interfaces failed", error = error
diff --git a/apps/liteprotocoltester/run_service_node.sh b/apps/liteprotocoltester/run_service_node.sh
index b0d22b6f15..8397bb6113 100755
--- a/apps/liteprotocoltester/run_service_node.sh
+++ b/apps/liteprotocoltester/run_service_node.sh
@@ -52,6 +52,6 @@ exec /usr/bin/wakunode\
--log-level=INFO\
--metrics-server=True\
--metrics-server-address=0.0.0.0\
- --nat=extip:${IP}\
- --pubsub-topic=${PUBSUB}\
- --cluster-id=${CLUSTER_ID}
+ ${PUBSUB}\
+ ${CLUSTER_ID}
+ # --nat=extip:${IP}\
diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim
index ebf1c7f963..811e6cab05 100644
--- a/waku/waku_api/rest/builder.nim
+++ b/waku/waku_api/rest/builder.nim
@@ -33,7 +33,7 @@ proc startRestServerEsentials*(
nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf
): Result[WakuRestServerRef, string] =
if not conf.rest:
- return
+ return ok(nil)
let requestErrorHandler: RestRequestErrorHandler = proc(
error: RestRequestError, request: HttpRequestRef
@@ -113,7 +113,7 @@ proc startRestServerProtocolSupport*(
conf: WakuNodeConf,
): Result[void, string] =
if not conf.rest:
- return
+ return ok()
var router = restServer.router
## Admin REST API
From 69b3f520a1125b14181eb8fd438165fb28e28db9 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Wed, 7 Aug 2024 01:26:55 +0200
Subject: [PATCH 06/12] Added standalon docker image build, fine tune duplicate
detection and logging.
---
apps/liteprotocoltester/.env | 12 +++--
.../Dockerfile.liteprotocoltester | 36 +++++++++++++++
.../Dockerfile.liteprotocoltester.compile | 2 +-
.../Dockerfile.liteprotocoltester.copy | 2 +-
apps/liteprotocoltester/filter_subscriber.nim | 5 +--
apps/liteprotocoltester/run_tester_node.sh | 16 +++----
apps/liteprotocoltester/statistics.nim | 45 ++++++++++++++++---
7 files changed, 94 insertions(+), 24 deletions(-)
create mode 100644 apps/liteprotocoltester/Dockerfile.liteprotocoltester
diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env
index 3503605f2a..95487f87c3 100644
--- a/apps/liteprotocoltester/.env
+++ b/apps/liteprotocoltester/.env
@@ -1,8 +1,12 @@
-#NWAKU_IMAGE=
+START_PUBLISHING_AFTER=10
+# can add some seconds delay before SENDER starts publishing
+
+NUM_MESSAGES=0
+# 0 for infinite number of messages
+
+DELAY_MESSAGES=8000
+# ms delay between messages
-START_PUBLISHING_AFTER=20 # seconds
-NUM_MESSAGES=0 # 0 for infinite
-DELAY_MESSAGES=8000 #1s gap between messages
MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb
diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester b/apps/liteprotocoltester/Dockerfile.liteprotocoltester
new file mode 100644
index 0000000000..11e37078d0
--- /dev/null
+++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester
@@ -0,0 +1,36 @@
+ # TESTING IMAGE --------------------------------------------------------------
+
+ ## NOTICE: This is a short cut build file for ubuntu users who compiles nwaku in ubuntu distro.
+ ## This is used for faster turnaround time for testing the compiled binary.
+ ## Prerequisites: compiled liteprotocoltester binary in build/ directory
+
+ FROM ubuntu:noble AS prod
+
+ LABEL maintainer="zoltan@status.im"
+ LABEL source="https://github.com/waku-org/nwaku"
+ LABEL description="Lite Protocol Tester: Waku light-client"
+ LABEL commit="unknown"
+ LABEL version="unknown"
+
+ # DevP2P, LibP2P, and JSON RPC ports
+ EXPOSE 30303 60000 8545
+
+ # Referenced in the binary
+ RUN apt-get update && apt-get install -y --no-install-recommends \
+ libgcc1 \
+ libpcre3 \
+ libpq-dev \
+ wget \
+ iproute2 \
+ && rm -rf /var/lib/apt/lists/*
+
+ # Fix for 'Error loading shared library libpcre.so.3: No such file or directory'
+ RUN ln -s /usr/lib/libpcre.so /usr/lib/libpcre.so.3
+
+ COPY build/liteprotocoltester /usr/bin/
+ COPY apps/liteprotocoltester/run_tester_node.sh /usr/bin/
+
+ ENTRYPOINT ["/usr/bin/run_tester_node.sh", "/usr/bin/liteprotocoltester"]
+
+ # # By default just show help if called without arguments
+ CMD ["--help"]
diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile
index 5ad5f83852..6e3184c9ac 100644
--- a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile
+++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.compile
@@ -27,7 +27,7 @@
# PRODUCTION IMAGE -------------------------------------------------------------
- FROM alpine:3.18 as prod
+ FROM alpine:3.18 AS prod
ARG MAKE_TARGET=liteprotocoltester
diff --git a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
index 02ef7ad503..bfff8bae70 100644
--- a/apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
+++ b/apps/liteprotocoltester/Dockerfile.liteprotocoltester.copy
@@ -4,7 +4,7 @@
## This is used for faster turnaround time for testing the compiled binary.
## Prerequisites: compiled liteprotocoltester binary in build/ directory
- FROM ubuntu:noble as prod
+ FROM ubuntu:noble AS prod
LABEL maintainer="jakub@status.im"
LABEL source="https://github.com/waku-org/nwaku"
diff --git a/apps/liteprotocoltester/filter_subscriber.nim b/apps/liteprotocoltester/filter_subscriber.nim
index 72b054f588..dca8eb880b 100644
--- a/apps/liteprotocoltester/filter_subscriber.nim
+++ b/apps/liteprotocoltester/filter_subscriber.nim
@@ -79,11 +79,10 @@ proc setupAndSubscribe*(wakuNode: WakuNode, conf: LiteProtocolTesterConf) =
let pushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
let payloadStr = string.fromBytes(message.payload)
let testerMessage = js.Json.decode(payloadStr, ProtocolTesterMessage)
-
- stats.addMessage(testerMessage.sender, testerMessage)
-
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex
+ stats.addMessage(testerMessage.sender, testerMessage, msgHash)
+
notice "message received",
index = testerMessage.index,
count = testerMessage.count,
diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh
index 573a14dff4..4ef911d1a9 100755
--- a/apps/liteprotocoltester/run_tester_node.sh
+++ b/apps/liteprotocoltester/run_tester_node.sh
@@ -11,20 +11,19 @@ IP=$(ip a | grep "inet " | grep -Fv 127.0.0.1 | sed 's/.*inet \([^/]*\).*/\1/')
echo "I am a lite-protocol-tester node"
-# Get an unique node index based on the container's IP
-FOURTH_OCTET=${IP##*.}
-THIRD_OCTET="${IP%.*}"; THIRD_OCTET="${THIRD_OCTET##*.}"
-NODE_INDEX=$((FOURTH_OCTET + 256 * THIRD_OCTET))
-
-echo "NODE_INDEX $NODE_INDEX"
-
BINARY_PATH=$1
if [ ! -x "${BINARY_PATH}" ]; then
- echo "Invalid binary path. Failing"
+ echo "Invalid binary path '${BINARY_PATH}'. Failing"
exit 1
fi
+if [ "${2}" = "--help" ]; then
+ echo "You might want to check nwaku/apps/liteprotocoltester/README.md"
+ exec "${BINARY_PATH}" --help
+ exit 0
+fi
+
FUNCTION=$2
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
@@ -99,7 +98,6 @@ echo "Running binary: ${BINARY_PATH}"
echo "Tester node: ${FUNCTION}"
echo "Using service node: ${SERIVCE_NODE_ADDR}"
-
exec "${BINARY_PATH}"\
--log-level=INFO\
--service-node="${SERIVCE_NODE_ADDR}"\
diff --git a/apps/liteprotocoltester/statistics.nim b/apps/liteprotocoltester/statistics.nim
index cb98be3686..5964f10463 100644
--- a/apps/liteprotocoltester/statistics.nim
+++ b/apps/liteprotocoltester/statistics.nim
@@ -16,6 +16,7 @@ type
prevIndex: uint32
MessageInfo = tuple[msg: ProtocolTesterMessage, info: ArrivalInfo]
+ DupStat = tuple[hash: string, dupCount: int, size: uint64]
StatHelper = object
prevIndex: uint32
@@ -23,6 +24,7 @@ type
lostIndices: HashSet[uint32]
seenIndices: HashSet[uint32]
maxIndex: uint32
+ duplicates: OrderedTable[uint32, DupStat]
Statistics* = object
received: Table[uint32, MessageInfo]
@@ -52,7 +54,7 @@ proc init*(T: type Statistics, expectedMessageCount: int = 1000): T =
result.received = initTable[uint32, MessageInfo](expectedMessageCount)
return result
-proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
+proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage, msgHash: string) =
if self.allMessageCount == 0:
self.allMessageCount = msg.count
self.firstReceivedIdx = msg.index
@@ -70,8 +72,12 @@ proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
)
if self.received.hasKeyOrPut(msg.index, currentArrived):
- warn "Duplicate message", index = msg.index
inc(self.duplicateCount)
+ self.helper.duplicates.mgetOrPut(msg.index, (msgHash, 0, msg.size)).dupCount.inc()
+ warn "Duplicate message",
+ index = msg.index,
+ hash = msgHash,
+ times_duplicated = self.helper.duplicates[msg.index].dupCount
return
## detect misorder arrival and possible lost messages
@@ -89,13 +95,16 @@ proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
inc(self.receivedMessages)
proc addMessage*(
- self: var PerPeerStatistics, peerId: string, msg: ProtocolTesterMessage
+ self: var PerPeerStatistics,
+ peerId: string,
+ msg: ProtocolTesterMessage,
+ msgHash: string,
) =
if not self.contains(peerId):
self[peerId] = Statistics.init()
discard catch:
- self[peerId].addMessage(msg)
+ self[peerId].addMessage(msg, msgHash)
proc lossCount*(self: Statistics): uint32 =
self.helper.maxIndex - self.receivedMessages
@@ -140,19 +149,43 @@ proc missingIndices*(self: Statistics): seq[uint32] =
missing.add(idx)
return missing
+proc distinctDupCount(self: Statistics): int {.inline.} =
+ return self.helper.duplicates.len()
+
+proc allDuplicates(self: Statistics): int {.inline.} =
+ var total = 0
+ for _, (_, dupCount, _) in self.helper.duplicates.pairs:
+ total += dupCount
+ return total
+
+proc dupMsgs(self: Statistics): string =
+ var dupMsgs: string = ""
+ for idx, (hash, dupCount, size) in self.helper.duplicates.pairs:
+ dupMsgs.add(
+ " index: " & $idx & " | hash: " & hash & " | count: " & $dupCount & " | size: " &
+ $size & "\n"
+ )
+ return dupMsgs
+
proc echoStat*(self: Statistics) =
let (minL, maxL, avgL) = self.calcLatency()
let printable = catch:
"""*------------------------------------------------------------------------------------------*
-| Expected | Received | Target | Loss | Misorder | Late | Duplicate |
-|{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} |{self.duplicateCount:>11} |
+| Expected | Received | Target | Loss | Misorder | Late | |
+|{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} | |
*------------------------------------------------------------------------------------------*
| Latency stat: |
| min latency: {$minL:<73}|
| avg latency: {$avgL:<73}|
| max latency: {$maxL:<73}|
*------------------------------------------------------------------------------------------*
+| Duplicate stat: |
+| distinct duplicate messages: {$self.distinctDupCount():<57}|
+| sum duplicates : {$self.allDuplicates():<57}|
+ Duplicated messages:
+ {self.dupMsgs()}
+*------------------------------------------------------------------------------------------*
| Lost indices: |
| {self.missingIndices()} |
*------------------------------------------------------------------------------------------*""".fmt()
From 0831f8cec4b0ee833c88556a9f36a8d789e7ca45 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Wed, 7 Aug 2024 02:41:58 +0200
Subject: [PATCH 07/12] Adjustments for waku-simulator runs
---
apps/liteprotocoltester/.env | 15 ++++++++++++---
.../docker-compose-on-simularor.yml | 11 +++++++----
apps/liteprotocoltester/run_tester_node.sh | 4 ++--
3 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/apps/liteprotocoltester/.env b/apps/liteprotocoltester/.env
index 95487f87c3..b389cc3cfc 100644
--- a/apps/liteprotocoltester/.env
+++ b/apps/liteprotocoltester/.env
@@ -11,8 +11,17 @@ DELAY_MESSAGES=8000
MIN_MESSAGE_SIZE=15Kb
MAX_MESSAGE_SIZE=145Kb
-PUBSUB=/waku/2/rs/1/4
+## for wakusim
+PUBSUB=/waku/2/rs/66/0
+CONTENT_TOPIC=/tester/2/light-pubsub-test/wakusim
+CLUSTER_ID=66
+
+## for status.prod
#PUBSUB=/waku/2/rs/16/32
#CONTENT_TOPIC=/tester/2/light-pubsub-test/fleet
-CONTENT_TOPIC=/tester/2/light-pubsub-test/twn
-CLUSTER_ID=1
+#CLUSTER_ID=16
+
+## for TWN
+#PUBSUB=/waku/2/rs/1/4
+#CONTENT_TOPIC=/tester/2/light-pubsub-test/twn
+#CLUSTER_ID=1
diff --git a/apps/liteprotocoltester/docker-compose-on-simularor.yml b/apps/liteprotocoltester/docker-compose-on-simularor.yml
index 58191d1302..24145801fe 100644
--- a/apps/liteprotocoltester/docker-compose-on-simularor.yml
+++ b/apps/liteprotocoltester/docker-compose-on-simularor.yml
@@ -16,8 +16,9 @@ x-rln-environment: &rln_env
x-test-running-conditions: &test_running_conditions
NUM_MESSAGES: ${NUM_MESSAGES:-120}
DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}"
- PUBSUB: ${PUBSUB:-}
- CONTENT_TOPIC: ${CONTENT_TOPIC:-}
+ PUBSUB: ${PUBSUB:-/waku/2/rs/66/0}
+ CONTENT_TOPIC: ${CONTENT_TOPIC:-/tester/2/light-pubsub-test/wakusim}
+ CLUSTER_ID: ${CLUSTER_ID:-66}
MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb}
MAX_MESSAGE_SIZE: ${MAX_MESSAGE_SIZE:-150Kb}
START_PUBLISHING_AFTER: ${START_PUBLISHING_AFTER:-5} # seconds
@@ -26,7 +27,7 @@ x-test-running-conditions: &test_running_conditions
# Services definitions
services:
lightpush-service:
- image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest}
+ image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest-release}
# ports:
# - 30304:30304/tcp
# - 30304:30304/udp
@@ -44,6 +45,7 @@ services:
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
+ - *test_running_conditions
volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
@@ -101,7 +103,7 @@ services:
- waku-simulator_simulation
filter-service:
- image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest}
+ image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest-release}
# ports:
# - 30304:30305/tcp
# - 30304:30305/udp
@@ -119,6 +121,7 @@ services:
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
+ - *test_running_conditions
volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh
index 4ef911d1a9..daa621490b 100755
--- a/apps/liteprotocoltester/run_tester_node.sh
+++ b/apps/liteprotocoltester/run_tester_node.sh
@@ -1,6 +1,6 @@
#!/bin/sh
-# set -x
+#set -x
if test -f .env; then
echo "Using .env file"
@@ -42,7 +42,7 @@ if [ -z "${SERIVCE_NODE_ADDR}" ]; then
fi
if [ "${SERIVCE_NODE_ADDR}" = "waku-sim" ]; then
-
+ SERIVCE_NODE_ADDR=""
RETRIES=${RETRIES:=10}
while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do
From 0b1892bfd480bc2cf9bd55e0b9ba94059decb829 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Wed, 7 Aug 2024 02:53:10 +0200
Subject: [PATCH 08/12] Extended liteprotocoltester README.md with docker build
---
apps/liteprotocoltester/README.md | 28 ++++++++++++++++++++++++++++
1 file changed, 28 insertions(+)
diff --git a/apps/liteprotocoltester/README.md b/apps/liteprotocoltester/README.md
index 8c6a1f1236..9aef6824c0 100644
--- a/apps/liteprotocoltester/README.md
+++ b/apps/liteprotocoltester/README.md
@@ -174,6 +174,34 @@ Run a SENDER role liteprotocoltester and a RECEIVER role one on different termin
### Docker image notice
+#### Building for docker compose runs on simulator or standalone
Please note that currently to ease testing and development tester application docker image is based on ubuntu and uses the externally pre-built binary of 'liteprotocoltester'.
This speeds up image creation. Another dokcer build file is provided for proper build of boundle image.
+> `Dockerfile.liteprotocoltester.copy` will create an image with the binary copied from the build directory.
+
+> `Dockerfile.liteprotocoltester.compile` will create an image completely compiled from source. This can be quite slow.
+
+#### Creating standalone runner docker image
+
+To ease the work with lite-proto-tester, a docker image is possible to build.
+With that image it is easy to run the application in a container.
+
+> `Dockerfile.liteprotocoltester` will create an ubuntu image with the binary copied from the build directory. You need to pre-build the application.
+
+Here is how to build and run:
+```bash
+cd
+make liteprotocoltester
+
+cd apps/liteprotocoltester
+docker build -t liteprotocoltester:latest -f Dockerfile.liteprotocoltester ../..
+
+# alternatively you can push it to a registry
+
+# edit and adjust .env file to your needs and for the network configuration
+
+docker run --env-file .env liteprotocoltester:latest RECEIVER
+
+docker run --env-file .env liteprotocoltester:latest SENDER
+```
From d2afb81aa8ded2bcdd6af0fe07b192f104be1bec Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Tue, 20 Aug 2024 03:37:42 +0200
Subject: [PATCH 09/12] Fix test inside docker service node connectivity
failure
---
apps/liteprotocoltester/docker-compose.yml | 14 ++++++++--
apps/liteprotocoltester/run_service_node.sh | 31 ++++++++++++---------
apps/liteprotocoltester/run_tester_node.sh | 15 ++++++++++
3 files changed, 44 insertions(+), 16 deletions(-)
diff --git a/apps/liteprotocoltester/docker-compose.yml b/apps/liteprotocoltester/docker-compose.yml
index 14b1e80065..c13bdd973e 100644
--- a/apps/liteprotocoltester/docker-compose.yml
+++ b/apps/liteprotocoltester/docker-compose.yml
@@ -16,13 +16,18 @@ x-rln-environment: &rln_env
x-test-running-conditions: &test_running_conditions
NUM_MESSAGES: ${NUM_MESSAGES:-120}
DELAY_MESSAGES: "${DELAY_MESSAGES:-1000}"
- PUBSUB: ${PUBSUB:-}
- CONTENT_TOPIC: ${CONTENT_TOPIC:-}
+ PUBSUB: ${PUBSUB:-/waku/2/rs/66/0}
+ CONTENT_TOPIC: ${CONTENT_TOPIC:-/tester/2/light-pubsub-test/wakusim}
+ CLUSTER_ID: ${CLUSTER_ID:-66}
+ MIN_MESSAGE_SIZE: ${MIN_MESSAGE_SIZE:-1Kb}
+ MAX_MESSAGE_SIZE: ${MAX_MESSAGE_SIZE:-150Kb}
+ START_PUBLISHING_AFTER: ${START_PUBLISHING_AFTER:-5} # seconds
+ STANDALONE: ${STANDALONE:-1}
# Services definitions
services:
servicenode:
- image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest}
+ image: ${NWAKU_IMAGE:-harbor.status.im/wakuorg/nwaku:latest-release}
ports:
- 30304:30304/tcp
- 30304:30304/udp
@@ -40,6 +45,7 @@ services:
EXTRA_ARGS: ${EXTRA_ARGS}
<<:
- *rln_env
+ - *test_running_conditions
volumes:
- ./run_service_node.sh:/opt/run_service_node.sh:Z
- ${CERTS_DIR:-./certs}:/etc/letsencrypt/:Z
@@ -82,6 +88,7 @@ services:
- /opt/run_tester_node.sh
- /usr/bin/liteprotocoltester
- SENDER
+ - servicenode
depends_on:
- servicenode
configs:
@@ -121,6 +128,7 @@ services:
- /opt/run_tester_node.sh
- /usr/bin/liteprotocoltester
- RECEIVER
+ - servicenode
depends_on:
- servicenode
- publishernode
diff --git a/apps/liteprotocoltester/run_service_node.sh b/apps/liteprotocoltester/run_service_node.sh
index 8397bb6113..fdbbf6cdf3 100755
--- a/apps/liteprotocoltester/run_service_node.sh
+++ b/apps/liteprotocoltester/run_service_node.sh
@@ -15,22 +15,27 @@ if [ -n "${CLUSTER_ID}" ]; then
CLUSTER_ID=--cluster-id="${CLUSTER_ID}"
fi
+echo "STANDALONE: ${STANDALONE}"
-RETRIES=${RETRIES:=10}
+if [ -z "${STANDALONE}" ]; then
-while [ -z "${BOOTSTRAP_ENR}" ] && [ ${RETRIES} -ge 0 ]; do
- BOOTSTRAP_ENR=$(wget -qO- http://bootstrap:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"enrUri":"\([^"]*\)".*/\1/');
- echo "Bootstrap node not ready, retrying (retries left: ${RETRIES})"
- sleep 1
- RETRIES=$(( $RETRIES - 1 ))
-done
+ RETRIES=${RETRIES:=10}
-if [ -z "${BOOTSTRAP_ENR}" ]; then
- echo "Could not get BOOTSTRAP_ENR and none provided. Failing"
- exit 1
-fi
+ while [ -z "${BOOTSTRAP_ENR}" ] && [ ${RETRIES} -ge 0 ]; do
+ BOOTSTRAP_ENR=$(wget -qO- http://bootstrap:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"enrUri":"\([^"]*\)".*/\1/');
+ echo "Bootstrap node not ready, retrying (retries left: ${RETRIES})"
+ sleep 1
+ RETRIES=$(( $RETRIES - 1 ))
+ done
+
+ if [ -z "${BOOTSTRAP_ENR}" ]; then
+ echo "Could not get BOOTSTRAP_ENR and none provided. Failing"
+ exit 1
+ fi
-echo "Using bootstrap node: ${BOOTSTRAP_ENR}"
+ echo "Using bootstrap node: ${BOOTSTRAP_ENR}"
+
+fi
exec /usr/bin/wakunode\
@@ -52,6 +57,6 @@ exec /usr/bin/wakunode\
--log-level=INFO\
--metrics-server=True\
--metrics-server-address=0.0.0.0\
+ --nat=extip:${IP}\
${PUBSUB}\
${CLUSTER_ID}
- # --nat=extip:${IP}\
diff --git a/apps/liteprotocoltester/run_tester_node.sh b/apps/liteprotocoltester/run_tester_node.sh
index daa621490b..c5a5355c6d 100755
--- a/apps/liteprotocoltester/run_tester_node.sh
+++ b/apps/liteprotocoltester/run_tester_node.sh
@@ -41,11 +41,26 @@ if [ -z "${SERIVCE_NODE_ADDR}" ]; then
exit 1
fi
+DO_DETECT_SERVICENODE=0
+
+if [ "${SERIVCE_NODE_ADDR}" = "servicenode" ]; then
+ DO_DETECT_SERVICENODE=1
+ SERIVCE_NODE_ADDR=""
+ SERVICENAME=servicenode
+fi
+
if [ "${SERIVCE_NODE_ADDR}" = "waku-sim" ]; then
+ DO_DETECT_SERVICENODE=1
SERIVCE_NODE_ADDR=""
+fi
+
+if [ $DO_DETECT_SERVICENODE -eq 1 ]; then
RETRIES=${RETRIES:=10}
while [ -z "${SERIVCE_NODE_ADDR}" ] && [ ${RETRIES} -ge 0 ]; do
+ SERVICE_DEBUG_INFO=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null);
+ echo "SERVICE_DEBUG_INFO: ${SERVICE_DEBUG_INFO}"
+
SERIVCE_NODE_ADDR=$(wget -qO- http://${SERVICENAME}:8645/debug/v1/info --header='Content-Type:application/json' 2> /dev/null | sed 's/.*"listenAddresses":\["\([^"]*\)".*/\1/');
echo "Service node not ready, retrying (retries left: ${RETRIES})"
sleep 1
From c6b201526b280ec7887b952715eb14693152dadb Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Wed, 21 Aug 2024 11:37:06 +0200
Subject: [PATCH 10/12] Update apps/liteprotocoltester/README.md
Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
---
apps/liteprotocoltester/README.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/apps/liteprotocoltester/README.md b/apps/liteprotocoltester/README.md
index 9aef6824c0..719616d630 100644
--- a/apps/liteprotocoltester/README.md
+++ b/apps/liteprotocoltester/README.md
@@ -55,7 +55,7 @@ docker compose logs -f receivernode
> Integration with waku-simulator!
-- For convenient integration is done in cooperation with waku-simulator repository, but nothing is tightly coupled.
+- For convenience, integration is done in cooperation with waku-simulator repository, but nothing is tightly coupled.
- waku-simulator must be started separately with its own configuration.
- To enable waku-simulator working without RLN currently a separate branch is to be used.
- When waku-simulator is configured and up and running, lite-protocol-tester composite docker setup can be started.
From b388bf54ab597c546a657082363dcf1f3b5a7b94 Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Wed, 21 Aug 2024 11:47:23 +0200
Subject: [PATCH 11/12] Apply suggestions from code review
Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
---
apps/liteprotocoltester/diagnose_connections.nim | 2 +-
apps/liteprotocoltester/statistics.nim | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/apps/liteprotocoltester/diagnose_connections.nim b/apps/liteprotocoltester/diagnose_connections.nim
index 0f198d63e6..d5b2cca0ba 100644
--- a/apps/liteprotocoltester/diagnose_connections.nim
+++ b/apps/liteprotocoltester/diagnose_connections.nim
@@ -35,7 +35,7 @@ logScope:
topics = "diagnose connections"
proc logSelfPeersLoop(pm: PeerManager, interval: Duration) {.async.} =
- trace "Starting logSelfPeersLoop diagnosys loop"
+ trace "Starting logSelfPeersLoop diagnosis loop"
while true:
let selfLighpushPeers = pm.peerStore.getPeersByProtocol(WakuLightPushCodec)
let selfRelayPeers = pm.peerStore.getPeersByProtocol(WakuRelayCodec)
diff --git a/apps/liteprotocoltester/statistics.nim b/apps/liteprotocoltester/statistics.nim
index 5964f10463..634622e193 100644
--- a/apps/liteprotocoltester/statistics.nim
+++ b/apps/liteprotocoltester/statistics.nim
@@ -138,7 +138,7 @@ proc calcLatency*(self: Statistics): tuple[min, max, avg: Duration] =
avgLatency = avgLatency div (self.receivedMessages - 1)
except KeyError:
- error "Error while calculating latency"
+ error "Error while calculating latency: " & getCurrentExceptionMsg()
return (minLatency, maxLatency, avgLatency)
From 6be5d95c346b13f00d86198084921211c340dcda Mon Sep 17 00:00:00 2001
From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Date: Wed, 21 Aug 2024 11:52:26 +0200
Subject: [PATCH 12/12] Explain minLatency calculation in code comment
---
apps/liteprotocoltester/statistics.nim | 2 ++
1 file changed, 2 insertions(+)
diff --git a/apps/liteprotocoltester/statistics.nim b/apps/liteprotocoltester/statistics.nim
index 634622e193..1397266c53 100644
--- a/apps/liteprotocoltester/statistics.nim
+++ b/apps/liteprotocoltester/statistics.nim
@@ -127,6 +127,8 @@ proc calcLatency*(self: Statistics): tuple[min, max, avg: Duration] =
## latency will be 0 if arrived in shorter time than expected
var latency = arrival.arrivedAt - arrival.prevArrivedAt - expectedDelay
+ ## will not measure zero latency, it is unlikely to happen but in case happens could
+ ## ditort the min latency calulculation as we want to calculate the feasible minimum.
if latency > nanos(0):
if minLatency == nanos(0):
minLatency = latency